diff --git a/sdk/core/core-http/src/fetchHttpClient.ts b/sdk/core/core-http/src/fetchHttpClient.ts index 30ce58aadb7f..eb84c0d4479b 100644 --- a/sdk/core/core-http/src/fetchHttpClient.ts +++ b/sdk/core/core-http/src/fetchHttpClient.ts @@ -10,6 +10,7 @@ import { HttpOperationResponse } from "./httpOperationResponse"; import { HttpHeaders, HttpHeadersLike } from "./httpHeaders"; import { RestError } from "./restError"; import { Readable, Transform } from "stream"; +import { logger } from "./log"; interface FetchError extends Error { code?: string; @@ -149,11 +150,12 @@ export abstract class FetchHttpClient implements HttpClient { ...platformSpecificRequestInit }; + let operationResponse: HttpOperationResponse | undefined; try { const response: CommonResponse = await this.fetch(httpRequest.url, requestInit); const headers = parseHeaders(response.headers); - const operationResponse: HttpOperationResponse = { + operationResponse = { headers: headers, request: httpRequest, status: response.status, @@ -200,7 +202,23 @@ export abstract class FetchHttpClient implements HttpClient { } finally { // clean up event listener if (httpRequest.abortSignal && abortListener) { - httpRequest.abortSignal.removeEventListener("abort", abortListener); + let uploadStreamDone = Promise.resolve(); + if (isReadableStream(body)) { + uploadStreamDone = isStreamComplete(body); + } + let downloadStreamDone = Promise.resolve(); + if (isReadableStream(operationResponse?.readableStreamBody)) { + downloadStreamDone = isStreamComplete(operationResponse!.readableStreamBody); + } + + Promise.all([uploadStreamDone, downloadStreamDone]) + .then(() => { + httpRequest.abortSignal?.removeEventListener("abort", abortListener!); + return; + }) + .catch((e) => { + logger.warning("Error when cleaning up abortListener on httpRequest", e); + }); } } } @@ -214,6 +232,14 @@ function isReadableStream(body: any): body is Readable { return body && typeof body.pipe === "function"; } +function isStreamComplete(stream: Readable): Promise { + return new Promise((resolve) => { + stream.on("close", resolve); + stream.on("end", resolve); + stream.on("error", resolve); + }); +} + export function parseHeaders(headers: Headers): HttpHeadersLike { const httpHeaders = new HttpHeaders(); diff --git a/sdk/core/core-https/src/nodeHttpsClient.ts b/sdk/core/core-https/src/nodeHttpsClient.ts index 52faa3ac74f2..431d14531e6e 100644 --- a/sdk/core/core-https/src/nodeHttpsClient.ts +++ b/sdk/core/core-https/src/nodeHttpsClient.ts @@ -18,11 +18,20 @@ import { createHttpHeaders } from "./httpHeaders"; import { RestError } from "./restError"; import { URL } from "./util/url"; import { IncomingMessage } from "http"; +import { logger } from "./log"; function isReadableStream(body: any): body is NodeJS.ReadableStream { return body && typeof body.pipe === "function"; } +function isStreamComplete(stream: NodeJS.ReadableStream): Promise { + return new Promise((resolve) => { + stream.on("close", resolve); + stream.on("end", resolve); + stream.on("error", resolve); + }); +} + function isArrayBuffer(body: any): body is ArrayBuffer | ArrayBufferView { return body && typeof body.byteLength === "number"; } @@ -92,6 +101,7 @@ export class NodeHttpsClient implements HttpsClient { } } + let responseStream: NodeJS.ReadableStream | undefined; try { const result = await new Promise((resolve, reject) => { if (body && request.onUploadProgress) { @@ -157,7 +167,23 @@ export class NodeHttpsClient implements HttpsClient { } finally { // clean up event listener if (request.abortSignal && abortListener) { - request.abortSignal.removeEventListener("abort", abortListener); + let uploadStreamDone = Promise.resolve(); + if (isReadableStream(body)) { + uploadStreamDone = isStreamComplete(body as NodeJS.ReadableStream); + } + let downloadStreamDone = Promise.resolve(); + if (isReadableStream(responseStream)) { + downloadStreamDone = isStreamComplete(responseStream); + } + + Promise.all([uploadStreamDone, downloadStreamDone]) + .then(() => { + request.abortSignal?.removeEventListener("abort", abortListener!); + return; + }) + .catch((e) => { + logger.warning("Error when cleaning up abortListener on httpRequest", e); + }); } } } diff --git a/sdk/storage/storage-file-share/test/fileclient.spec.ts b/sdk/storage/storage-file-share/test/fileclient.spec.ts index 27573cb80244..3b895e59285e 100644 --- a/sdk/storage/storage-file-share/test/fileclient.spec.ts +++ b/sdk/storage/storage-file-share/test/fileclient.spec.ts @@ -734,7 +734,6 @@ describe("FileClient", () => { // tslint:disable-next-line:no-empty } catch (err) { assert.equal(err.name, "AbortError"); - assert.equal(err.message, "The operation was aborted.", "Unexpected error caught: " + err); } assert.ok(eventTriggered); });