From 0072f426732e3e361839ea0b34545ff1f6e800da Mon Sep 17 00:00:00 2001 From: George Fu Date: Wed, 24 Jul 2024 21:55:41 -0400 Subject: [PATCH] fix(eventstream-handler-node): start streaming without waiting for response (#6311) * fix(eventstream-handler-node): start streaming without waiting for response * test(middleware-eventstream): use dummy asynciterator in integ tests --- .../src/EventStreamPayloadHandler.spec.ts | 55 ++++++++++++++++--- .../src/EventStreamPayloadHandler.ts | 25 ++++----- .../src/middleware-eventstream.integ.spec.ts | 30 +--------- .../src/middleware-websocket.integ.spec.ts | 6 +- 4 files changed, 62 insertions(+), 54 deletions(-) diff --git a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts index d2a774126c6a..a2b8f25d0edd 100644 --- a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts +++ b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts @@ -9,6 +9,15 @@ jest.mock("./EventSigningStream"); jest.mock("@smithy/eventstream-codec"); describe(EventStreamPayloadHandler.name, () => { + const collectData = (stream: Readable) => { + const chunks: any = []; + return new Promise((resolve, reject) => { + stream.on("data", (chunk) => chunks.push(chunk)); + stream.on("error", reject); + stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8"))); + }); + }; + const mockMessageSigner: MessageSigner = { sign: jest.fn(), signMessage: jest.fn(), @@ -49,7 +58,7 @@ describe(EventStreamPayloadHandler.name, () => { utf8Decoder: mockUtf8Decoder, utf8Encoder: mockUtf8encoder, }); - const mockRequest = { body: new Readable() } as HttpRequest; + const mockRequest = { body: new PassThrough() } as HttpRequest; try { await handler.handle(mockNextHandler, { @@ -126,6 +135,42 @@ describe(EventStreamPayloadHandler.name, () => { }); }); + it("should start piping regardless of whether the downstream resolves", async () => { + const authorization = + "AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890"; + const originalPayload = new PassThrough(); + const mockRequest = { + body: originalPayload, + headers: { authorization }, + } as any; + const handler = new EventStreamPayloadHandler({ + messageSigner: () => Promise.resolve(mockMessageSigner), + utf8Decoder: mockUtf8Decoder, + utf8Encoder: mockUtf8encoder, + }); + + (mockNextHandler as any).mockImplementationOnce(async (args: FinalizeHandlerArguments) => { + const handledRequest = args.request as HttpRequest; + + originalPayload.end("Some Data"); + const collected = await collectData(handledRequest.body); + + // this means the stream is flowing without this downstream middleware + // having resolved yet. + expect(collected).toEqual("Some Data"); + + return Promise.resolve({ output: { handledRequest } }); + }); + + const { + output: { handledRequest }, + } = await handler.handle(mockNextHandler, { + request: mockRequest, + input: {}, + }); + expect(handledRequest.body).not.toBe(originalPayload); + }); + it("should start piping to request payload through event signer if downstream middleware returns", async () => { const authorization = "AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890"; @@ -155,14 +200,6 @@ describe(EventStreamPayloadHandler.name, () => { expect(handledRequest.body).not.toBe(originalPayload); // Expect the data from the output payload from eventstream payload handler the same as from the // stream supplied to the handler. - const collectData = (stream: Readable) => { - const chunks: any = []; - return new Promise((resolve, reject) => { - stream.on("data", (chunk) => chunks.push(chunk)); - stream.on("error", reject); - stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8"))); - }); - }; originalPayload.end("Some Data"); const collected = await collectData(handledRequest.body); expect(collected).toEqual("Some Data"); diff --git a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts index c750f9158fa6..e17d6c6aa538 100644 --- a/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts +++ b/packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts @@ -64,20 +64,9 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler { objectMode: true, }); - let result: FinalizeHandlerOutput; - try { - result = await next(args); - } catch (e) { - // Close the payload stream otherwise the retry would hang - // because of the previous connection. - request.body.end(); - throw e; - } - - // If response is successful, start piping the payload stream - const match = (request.headers["authorization"] || "").match(/Signature=([\w]+)$/); + const match = request.headers?.authorization?.match(/Signature=([\w]+)$/); // Sign the eventstream based on the signature from initial request. - const priorSignature = (match || [])[1] || (query && (query["X-Amz-Signature"] as string)) || ""; + const priorSignature = match?.[1] ?? (query?.["X-Amz-Signature"] as string) ?? ""; const signingStream = new EventSigningStream({ priorSignature, eventStreamCodec: this.eventStreamCodec, @@ -91,6 +80,16 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler { } }); + let result: FinalizeHandlerOutput; + try { + result = await next(args); + } catch (e) { + // Close the payload stream otherwise the retry would hang + // because of the previous connection. + request.body.end(); + throw e; + } + return result; } } diff --git a/packages/middleware-eventstream/src/middleware-eventstream.integ.spec.ts b/packages/middleware-eventstream/src/middleware-eventstream.integ.spec.ts index 2216bdda63bc..b412e9e98c68 100644 --- a/packages/middleware-eventstream/src/middleware-eventstream.integ.spec.ts +++ b/packages/middleware-eventstream/src/middleware-eventstream.integ.spec.ts @@ -30,15 +30,7 @@ describe("middleware-eventstream", () => { botAliasId: "undefined", localeId: "undefined", sessionId: "undefined", - requestEventStream: { - [Symbol.asyncIterator]() { - return { - next() { - return this as any; - }, - }; - }, - }, + requestEventStream: (async function* () {})(), }); expect.assertions(2); @@ -61,15 +53,7 @@ describe("middleware-eventstream", () => { VideoWidth: "undefined", VideoHeight: "undefined", ChallengeVersions: "undefined", - LivenessRequestStream: { - [Symbol.asyncIterator]() { - return { - next() { - return this as any; - }, - }; - }, - }, + LivenessRequestStream: (async function* () {})(), }); expect.assertions(2); @@ -91,15 +75,7 @@ describe("middleware-eventstream", () => { await client.startStreamTranscription({ MediaSampleRateHertz: 144, MediaEncoding: "ogg-opus", - AudioStream: { - [Symbol.asyncIterator]() { - return { - next() { - return this as any; - }, - }; - }, - }, + AudioStream: (async function* () {})(), }); expect.assertions(2); diff --git a/packages/middleware-websocket/src/middleware-websocket.integ.spec.ts b/packages/middleware-websocket/src/middleware-websocket.integ.spec.ts index 7155b92c4979..26ce910f83f5 100644 --- a/packages/middleware-websocket/src/middleware-websocket.integ.spec.ts +++ b/packages/middleware-websocket/src/middleware-websocket.integ.spec.ts @@ -47,11 +47,7 @@ describe("middleware-websocket", () => { VideoWidth: "1024", VideoHeight: "1024", ChallengeVersions: "a,b,c", - LivenessRequestStream: { - [Symbol.asyncIterator]() { - return this as any; - }, - }, + LivenessRequestStream: (async function* () {})(), }); }); });