Skip to content

Commit

Permalink
Only remove abort listener on the last active stream
Browse files Browse the repository at this point in the history
Technically there's a possibility that uploading streaming and download
streaming are happening concurrently.  We only want to remove the
abort controller at the end of last active streaming
  • Loading branch information
jeremymeng committed Jan 6, 2021
1 parent b391eb1 commit fc93036
Showing 1 changed file with 20 additions and 39 deletions.
59 changes: 20 additions & 39 deletions sdk/core/core-http/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ export abstract class FetchHttpClient implements HttpClient {

const abortController = new AbortController();
let abortListener: ((event: any) => void) | undefined;
let registerCleanUpListenerOnStream:
| ((stream: NodeJS.ReadableStream, isDownload: boolean) => void)
| undefined;
if (httpRequest.abortSignal) {
if (httpRequest.abortSignal.aborted) {
throw new AbortError("The operation was aborted.");
Expand All @@ -69,28 +66,6 @@ export abstract class FetchHttpClient implements HttpClient {
}
};
httpRequest.abortSignal.addEventListener("abort", abortListener);

registerCleanUpListenerOnStream = (stream: NodeJS.ReadableStream, isDownload: boolean) => {
const cleanupCallback = () => {
if (isDownload) {
streamingDownload = false;
} else {
streamingUpload = false;
}
stream.removeListener("close", cleanupCallback);
stream.removeListener("end", cleanupCallback);
stream.removeListener("error", cleanupCallback);

if (abortListener) {
// unregister only when the other stream is not active
if ((isDownload && !streamingUpload) || (!isDownload && !streamingDownload))
httpRequest.abortSignal?.removeEventListener("abort", abortListener);
}
};
stream.once("close", cleanupCallback);
stream.once("end", cleanupCallback);
stream.once("error", cleanupCallback);
};
}

if (httpRequest.timeout) {
Expand Down Expand Up @@ -149,7 +124,6 @@ export abstract class FetchHttpClient implements HttpClient {
? httpRequest.body()
: httpRequest.body
: undefined;
let streamingUpload = false;
if (httpRequest.onUploadProgress && httpRequest.body) {
const onUploadProgress = httpRequest.onUploadProgress;
const uploadReportStream = new ReportTransform(onUploadProgress);
Expand All @@ -161,10 +135,6 @@ export abstract class FetchHttpClient implements HttpClient {

body = uploadReportStream;
}
if (registerCleanUpListenerOnStream && isReadableStream(body)) {
streamingUpload = true;
registerCleanUpListenerOnStream(body, false);
}

const platformSpecificRequestInit: Partial<RequestInit> = await this.prepareRequest(
httpRequest
Expand All @@ -180,7 +150,6 @@ export abstract class FetchHttpClient implements HttpClient {
};

let operationResponse: HttpOperationResponse | undefined;
let streamingDownload = false;
try {
const response: CommonResponse = await this.fetch(httpRequest.url, requestInit);

Expand Down Expand Up @@ -211,13 +180,6 @@ export abstract class FetchHttpClient implements HttpClient {
}
}
}
if (
registerCleanUpListenerOnStream &&
isReadableStream(operationResponse.readableStreamBody)
) {
streamingDownload = true;
registerCleanUpListenerOnStream(operationResponse.readableStreamBody, true);
}

await this.processRequest(operationResponse);

Expand All @@ -237,7 +199,18 @@ export abstract class FetchHttpClient implements HttpClient {

throw fetchError;
} finally {
if (!(streamingUpload || streamingDownload) && abortListener) {
// clean up event listener
if (httpRequest.abortSignal && abortListener) {
let uploadStreamDone = Promise.resolve();
if (httpRequest.abortSignal && isReadableStream(body)) {
uploadStreamDone = isStreamComplete(body);
}
let downloadStreamDone = Promise.resolve();
if (httpRequest.abortSignal && isReadableStream(operationResponse?.readableStreamBody)) {
downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody);
}

await Promise.all([uploadStreamDone, downloadStreamDone]);
httpRequest.abortSignal?.removeEventListener("abort", abortListener);
}
}
Expand All @@ -252,6 +225,14 @@ function isReadableStream(body: any): body is Readable {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: Readable): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
});
}

export function parseHeaders(headers: Headers): HttpHeadersLike {
const httpHeaders = new HttpHeaders();

Expand Down

0 comments on commit fc93036

Please sign in to comment.