Skip to content

Commit

Permalink
[core-http] Clean up event listener when streaming is done (#12038)
Browse files Browse the repository at this point in the history
Currently our cleanup code removes the abort event listener when the response is
returned. In streaming case even the response is returned, the work is not done
yet, however, with the abort listener removed, we lost the ability to cancel the
streaming. This change fixes the issue by unregistering the abort listener for
streaming when the stream ends.

Now that aborting streaming is working, we are getting AbortError from
node-fetch which use a different message "The user aborted a request."
than the browser fetch API does. so stop verifying error.message.

Port fix to core-https
  • Loading branch information
jeremymeng authored Jan 20, 2021
1 parent 8c7ba16 commit f8e023f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 4 deletions.
30 changes: 28 additions & 2 deletions sdk/core/core-http/src/fetchHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { HttpOperationResponse } from "./httpOperationResponse";
import { HttpHeaders, HttpHeadersLike } from "./httpHeaders";
import { RestError } from "./restError";
import { Readable, Transform } from "stream";
import { logger } from "./log";

interface FetchError extends Error {
code?: string;
Expand Down Expand Up @@ -149,11 +150,12 @@ export abstract class FetchHttpClient implements HttpClient {
...platformSpecificRequestInit
};

let operationResponse: HttpOperationResponse | undefined;
try {
const response: CommonResponse = await this.fetch(httpRequest.url, requestInit);

const headers = parseHeaders(response.headers);
const operationResponse: HttpOperationResponse = {
operationResponse = {
headers: headers,
request: httpRequest,
status: response.status,
Expand Down Expand Up @@ -200,7 +202,23 @@ export abstract class FetchHttpClient implements HttpClient {
} finally {
// clean up event listener
if (httpRequest.abortSignal && abortListener) {
httpRequest.abortSignal.removeEventListener("abort", abortListener);
let uploadStreamDone = Promise.resolve();
if (isReadableStream(body)) {
uploadStreamDone = isStreamComplete(body);
}
let downloadStreamDone = Promise.resolve();
if (isReadableStream(operationResponse?.readableStreamBody)) {
downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody);
}

Promise.all([uploadStreamDone, downloadStreamDone])
.then(() => {
httpRequest.abortSignal?.removeEventListener("abort", abortListener!);
return;
})
.catch((e) => {
logger.warning("Error when cleaning up abortListener on httpRequest", e);
});
}
}
}
Expand All @@ -214,6 +232,14 @@ function isReadableStream(body: any): body is Readable {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: Readable): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
});
}

export function parseHeaders(headers: Headers): HttpHeadersLike {
const httpHeaders = new HttpHeaders();

Expand Down
28 changes: 27 additions & 1 deletion sdk/core/core-https/src/nodeHttpsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@ import { createHttpHeaders } from "./httpHeaders";
import { RestError } from "./restError";
import { URL } from "./util/url";
import { IncomingMessage } from "http";
import { logger } from "./log";

function isReadableStream(body: any): body is NodeJS.ReadableStream {
return body && typeof body.pipe === "function";
}

function isStreamComplete(stream: NodeJS.ReadableStream): Promise<void> {
return new Promise((resolve) => {
stream.on("close", resolve);
stream.on("end", resolve);
stream.on("error", resolve);
});
}

function isArrayBuffer(body: any): body is ArrayBuffer | ArrayBufferView {
return body && typeof body.byteLength === "number";
}
Expand Down Expand Up @@ -92,6 +101,7 @@ export class NodeHttpsClient implements HttpsClient {
}
}

let responseStream: NodeJS.ReadableStream | undefined;
try {
const result = await new Promise<PipelineResponse>((resolve, reject) => {
if (body && request.onUploadProgress) {
Expand Down Expand Up @@ -157,7 +167,23 @@ export class NodeHttpsClient implements HttpsClient {
} finally {
// clean up event listener
if (request.abortSignal && abortListener) {
request.abortSignal.removeEventListener("abort", abortListener);
let uploadStreamDone = Promise.resolve();
if (isReadableStream(body)) {
uploadStreamDone = isStreamComplete(body as NodeJS.ReadableStream);
}
let downloadStreamDone = Promise.resolve();
if (isReadableStream(responseStream)) {
downloadStreamDone = isStreamComplete(responseStream);
}

Promise.all([uploadStreamDone, downloadStreamDone])
.then(() => {
request.abortSignal?.removeEventListener("abort", abortListener!);
return;
})
.catch((e) => {
logger.warning("Error when cleaning up abortListener on httpRequest", e);
});
}
}
}
Expand Down
1 change: 0 additions & 1 deletion sdk/storage/storage-file-share/test/fileclient.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ describe("FileClient", () => {
// tslint:disable-next-line:no-empty
} catch (err) {
assert.equal(err.name, "AbortError");
assert.equal(err.message, "The operation was aborted.", "Unexpected error caught: " + err);
}
assert.ok(eventTriggered);
});
Expand Down

0 comments on commit f8e023f

Please sign in to comment.