Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(util-stream): create checksum stream adapters #1409

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/red-cameras-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/util-stream": minor
---

create checksum stream adapter
3 changes: 3 additions & 0 deletions packages/util-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,19 @@
"dist-*/**"
],
"browser": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser"
},
"react-native": {
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
"./dist-es/headStream": "./dist-es/headStream.browser",
"./dist-es/splitStream": "./dist-es/splitStream.browser",
"./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser",
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
Expand Down
119 changes: 119 additions & 0 deletions packages/util-stream/src/checksum/createChecksumStream.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Checksum, Encoder } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";

import { isReadableStream } from "../stream-type-check";

/**
* @internal
*/
export interface ChecksumStreamInit {
/**
* Base64 value of the expected checksum.
*/
expectedChecksum: string;
/**
* For error messaging, the location from which the checksum value was read.
*/
checksumSourceLocation: string;
/**
* The checksum calculator.
*/
checksum: Checksum;
/**
* The stream to be checked.
*/
source: ReadableStream;

/**
* Optional base 64 encoder if calling from a request context.
*/
base64Encoder?: Encoder;
}

/**
* Alias prevents compiler from turning
* ReadableStream into ReadableStream<any>, which is incompatible
* with the NodeJS.ReadableStream global type.
*/
export type ReadableStreamType = ReadableStream;

/**
* This is a local copy of
* https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController
* in case users do not have this type.
*/
interface TransformStreamDefaultController {
enqueue(chunk: any): void;
error(error: unknown): void;
terminate(): void;
}

/**
* @internal
*
* Creates a stream adapter for throwing checksum errors for streams without
* buffering the stream.
*/
export const createChecksumStream = ({
expectedChecksum,
checksum,
source,
checksumSourceLocation,
base64Encoder,
}: ChecksumStreamInit): ReadableStreamType => {
if (!isReadableStream(source)) {
throw new Error(
`@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.`
);
}

const encoder = base64Encoder ?? toBase64;

if (typeof TransformStream !== "function") {
throw new Error(
"@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream."
);
}

const transform = new TransformStream({
start() {},
async transform(chunk: any, controller: TransformStreamDefaultController) {
/**
* When the upstream source flows data to this stream,
* calculate a step update of the checksum.
*/
checksum.update(chunk);
controller.enqueue(chunk);
},
async flush(controller: TransformStreamDefaultController) {
const digest: Uint8Array = await checksum.digest();
const received = encoder(digest);

if (expectedChecksum !== received) {
const error = new Error(
`Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` +
` in response header "${checksumSourceLocation}".`
);
controller.error(error);
} else {
controller.terminate();
}
},
});

source.pipeThrough(transform);
const readable = transform.readable;
Object.setPrototypeOf(readable, ChecksumStream.prototype);
return readable;
};

const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {};

/**
* This stub exists so that the readable returned by createChecksumStream
* identifies as "ChecksumStream" in alignment with the Node.js
* implementation.
*
* @extends ReadableStream
*/
export class ChecksumStream extends (ReadableStreamRef as any) {}
135 changes: 135 additions & 0 deletions packages/util-stream/src/checksum/createChecksumStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { Checksum } from "@smithy/types";
import { toBase64 } from "@smithy/util-base64";
import { toUtf8 } from "@smithy/util-utf8";
import { Readable } from "stream";

import { headStream } from "../headStream";
import { ChecksumStream, createChecksumStream } from "./createChecksumStream";
import { ChecksumStream as ChecksumStreamWeb } from "./createChecksumStream.browser";

describe("Checksum streams", () => {
/**
* Hash "algorithm" that appends all data together.
*/
class Appender implements Checksum {
public hash = "";
async digest(): Promise<Uint8Array> {
return Buffer.from(this.hash);
}
reset(): void {
throw new Error("Function not implemented.");
}
update(chunk: Uint8Array): void {
this.hash += toUtf8(chunk);
}
}

const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0)));

const canonicalUtf8 = toUtf8(canonicalData);
const canonicalBase64 = toBase64(canonicalUtf8);

describe(createChecksumStream.name, () => {
const makeStream = () => {
return Readable.from(Buffer.from(canonicalData.buffer, 0, 26));
};

it("should extend a Readable stream", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: canonicalBase64,
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

expect(checksumStream).toBeInstanceOf(Readable);
expect(checksumStream).toBeInstanceOf(ChecksumStream);

const collected = toUtf8(await headStream(checksumStream, Infinity));
expect(collected).toEqual(canonicalUtf8);
expect(stream.readableEnded).toEqual(true);
expect(checksumStream.readableEnded).toEqual(true);
});

it("should throw during stream read if the checksum does not match", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: "different-expected-checksum",
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

try {
toUtf8(await headStream(checksumStream, Infinity));
throw new Error("stream was read successfully");
} catch (e: unknown) {
expect(String(e)).toEqual(
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
` received "${canonicalBase64}"` +
` in response header "my-header".`
);
}
});
});

describe(createChecksumStream.name + " webstreams API", () => {
if (typeof ReadableStream !== "function") {
// test not applicable to Node.js 16.
return;
}

const makeStream = () => {
return new ReadableStream({
start(controller) {
canonicalData.forEach((byte) => {
controller.enqueue(new Uint8Array([byte]));
});
controller.close();
},
});
};

it("should extend a ReadableStream", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: canonicalBase64,
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

expect(checksumStream).toBeInstanceOf(ReadableStream);
expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb);

const collected = toUtf8(await headStream(checksumStream, Infinity));
expect(collected).toEqual(canonicalUtf8);
expect(stream.locked).toEqual(true);

// expectation is that it is resolved.
expect(await checksumStream.getReader().closed);
});

it("should throw during stream read if the checksum does not match", async () => {
const stream = makeStream();
const checksumStream = createChecksumStream({
expectedChecksum: "different-expected-checksum",
checksum: new Appender(),
checksumSourceLocation: "my-header",
source: stream,
});

try {
toUtf8(await headStream(checksumStream, Infinity));
throw new Error("stream was read successfully");
} catch (e: unknown) {
expect(String(e)).toEqual(
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
` received "${canonicalBase64}"` +
` in response header "my-header".`
);
}
});
});
});
Loading
Loading