Skip to content

Commit

Permalink
Only remove abort listener on the last active stream
Browse files Browse the repository at this point in the history
Technically there's a possibility that uploading streaming and download
streaming are happening concurrently.  We only want to remove the
abort controller at the end of last active streaming
  • Loading branch information
jeremymeng committed Jan 6, 2021
1 parent dd2c826 commit 9b63e8d
Showing 1 changed file with 29 additions and 45 deletions.
74 changes: 29 additions & 45 deletions sdk/core/core-http/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export type CommonResponse = Omit<Response, "body" | "trailer" | "formData"> & {

export class ReportTransform extends Transform {
private loadedBytes: number = 0;
_transform(chunk: string | Buffer, _encoding: string, callback: Function): void {
_transform(chunk: string | Buffer, _encoding: string, callback: (arg: any) => void): void {
this.push(chunk);
this.loadedBytes += chunk.length;
this.progressCallback!({ loadedBytes: this.loadedBytes });
Expand All @@ -55,9 +55,6 @@ export abstract class FetchHttpClient implements HttpClient {

const abortController = new AbortController();
let abortListener: ((event: any) => void) | undefined;
let registerCleanUpListenerOnStream:
| ((stream: NodeJS.ReadableStream, isDownload: boolean) => void)
| undefined;
if (httpRequest.abortSignal) {
if (httpRequest.abortSignal.aborted) {
throw new AbortError("The operation was aborted.");
Expand All @@ -69,28 +66,6 @@ export abstract class FetchHttpClient implements HttpClient {
}
};
httpRequest.abortSignal.addEventListener("abort", abortListener);

registerCleanUpListenerOnStream = (stream: NodeJS.ReadableStream, isDownload: boolean) => {
const cleanupCallback = () => {
if (isDownload) {
streamingDownload = false;
} else {
streamingUpload = false;
}
stream.removeListener("close", cleanupCallback);
stream.removeListener("end", cleanupCallback);
stream.removeListener("error", cleanupCallback);

if (abortListener) {
// unregister only when the other stream is not active
if ((isDownload && !streamingUpload) || (!isDownload && !streamingDownload))
httpRequest.abortSignal?.removeEventListener("abort", abortListener);
}
};
stream.once("close", cleanupCallback);
stream.once("end", cleanupCallback);
stream.once("error", cleanupCallback);
};
}

if (httpRequest.timeout) {
Expand All @@ -107,8 +82,11 @@ export abstract class FetchHttpClient implements HttpClient {
if (typeof value === "function") {
value = value();
}
// eslint-disable-next-line no-prototype-builtins
if (value && value.hasOwnProperty("value") && value.hasOwnProperty("options")) {
if (
value &&
Object.prototype.hasOwnProperty.call(value, "value") &&
Object.prototype.hasOwnProperty.call(value, "options")
) {
requestForm.append(key, value.value, value.options);
} else {
requestForm.append(key, value);
Expand Down Expand Up @@ -146,7 +124,6 @@ export abstract class FetchHttpClient implements HttpClient {
? httpRequest.body()
: httpRequest.body
: undefined;
let streamingUpload = false;
if (httpRequest.onUploadProgress && httpRequest.body) {
const onUploadProgress = httpRequest.onUploadProgress;
const uploadReportStream = new ReportTransform(onUploadProgress);
Expand All @@ -158,10 +135,6 @@ export abstract class FetchHttpClient implements HttpClient {

body = uploadReportStream;
}
if (registerCleanUpListenerOnStream && isReadableStream(body)) {
streamingUpload = true;
registerCleanUpListenerOnStream(body, false);
}

const platformSpecificRequestInit: Partial<RequestInit> = await this.prepareRequest(
httpRequest
Expand All @@ -177,7 +150,6 @@ export abstract class FetchHttpClient implements HttpClient {
};

let operationResponse: HttpOperationResponse | undefined;
let streamingDownload = false;
try {
const response: CommonResponse = await this.fetch(httpRequest.url, requestInit);

Expand Down Expand Up @@ -208,13 +180,6 @@ export abstract class FetchHttpClient implements HttpClient {
}
}
}
if (
registerCleanUpListenerOnStream &&
isReadableStream(operationResponse.readableStreamBody)
) {
streamingDownload = true;
registerCleanUpListenerOnStream(operationResponse.readableStreamBody, true);
}

await this.processRequest(operationResponse);

Expand All @@ -234,21 +199,40 @@ export abstract class FetchHttpClient implements HttpClient {

throw fetchError;
} finally {
if (!(streamingUpload || streamingDownload) && abortListener) {
// clean up event listener
if (httpRequest.abortSignal && abortListener) {
let uploadStreamDone = Promise.resolve();
if (httpRequest.abortSignal && isReadableStream(body)) {
uploadStreamDone = isStreamComplete(body);
}
let downloadStreamDone = Promise.resolve();
if (httpRequest.abortSignal && isReadableStream(operationResponse?.readableStreamBody)) {
downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody);
}

await Promise.all([uploadStreamDone, downloadStreamDone]);
httpRequest.abortSignal?.removeEventListener("abort", abortListener);
}
}
}

abstract async prepareRequest(httpRequest: WebResourceLike): Promise<Partial<RequestInit>>;
abstract async processRequest(operationResponse: HttpOperationResponse): Promise<void>;
abstract async fetch(input: CommonRequestInfo, init?: CommonRequestInit): Promise<CommonResponse>;
abstract prepareRequest(httpRequest: WebResourceLike): Promise<Partial<RequestInit>>;
abstract processRequest(operationResponse: HttpOperationResponse): Promise<void>;
abstract fetch(input: CommonRequestInfo, init?: CommonRequestInit): Promise<CommonResponse>;
}

function isReadableStream(body: any): body is Readable {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: Readable): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
});
}

export function parseHeaders(headers: Headers): HttpHeadersLike {
const httpHeaders = new HttpHeaders();

Expand Down

0 comments on commit 9b63e8d

Please sign in to comment.