From 14e4b6184f00735109647a52ddd71e9119e192c0 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Fri, 7 Apr 2023 14:18:46 -0700 Subject: [PATCH 01/21] chore: WIP --- src/file.ts | 49 +++++++++++++++++++++++++++++++---------- src/resumable-upload.ts | 40 ++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/src/file.ts b/src/file.ts index 3813e32f1..136a15a19 100644 --- a/src/file.ts +++ b/src/file.ts @@ -1882,8 +1882,13 @@ class File extends ServiceObject { md5 = options.validation === 'md5'; } else if (options.validation === false) { crc32c = false; + } else if (options.offset) { + // TODO: check for provided offset + // if available, } + // offset + /** * A callback for determining when the underlying pipeline is complete. * It's possible the pipeline callback could error before the write stream @@ -1910,13 +1915,6 @@ class File extends ServiceObject { }); const emitStream = new PassThroughShim(); - const hashCalculatingStream = new HashStreamValidator({ - crc32c, - md5, - crc32cGenerator: this.crc32cGenerator, - updateHashesOnly: true, - }); - const fileWriteStream = duplexify(); let fileWriteStreamMetadataReceived = false; @@ -1929,17 +1927,38 @@ class File extends ServiceObject { fileWriteStreamMetadataReceived = true; }); - writeStream.on('writing', () => { + writeStream.on('writing', async () => { if (options.resumable === false) { this.startSimpleUpload_(fileWriteStream, options); } else { this.startResumableUpload_(fileWriteStream, options); } + const transformStreams: Transform[] = []; + if (gzip) { + transformStreams.push(new PassThrough()); + } + + let hashCalculatingStream: HashStreamValidator | null = null; + if (crc32c || md5) { + // if offset + // - if md5 -> error + // - if crc32cResume -> set for init HSV value + // - if !crc32cResume -> query for current crc32c for init HSV value + + hashCalculatingStream = new HashStreamValidator({ + crc32c, + md5, + crc32cGenerator: this.crc32cGenerator, + updateHashesOnly: true, + }); + + transformStreams.push(hashCalculatingStream); + } + pipeline( emitStream, - gzip ? zlib.createGzip() : new PassThrough(), - hashCalculatingStream, + ...(transformStreams as [Transform]), fileWriteStream, async e => { if (e) { @@ -1961,7 +1980,13 @@ class File extends ServiceObject { } try { - await this.#validateIntegrity(hashCalculatingStream, {crc32c, md5}); + if (hashCalculatingStream) { + await this.#validateIntegrity(hashCalculatingStream, { + crc32c, + md5, + }); + } + pipelineCallback(); } catch (e) { pipelineCallback(e as Error); @@ -2040,7 +2065,7 @@ class File extends ServiceObject { * const file = myBucket.file('my-file'); * * //- - * // Download a file into memory. The contents will be available as the + * // Download a file into memory. The contents will be available fhe * second * // argument in the demonstration below, `contents`. * //- diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index 647d37d8a..583bb82a1 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -143,6 +143,10 @@ export interface UploadConfig { /** * The starting byte of the upload stream, for resuming an interrupted upload. + * + * If the provided stream is resuming from where the initial stream left off, consider + * setting `resumeFromOffset: true`. + * * See * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload. 
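 *
 * For example, a minimal sketch of the two resume modes described above
 * (assumes a `uri` captured from an earlier session; `bucket`, `file`, and
 * `retryOptions` are the usual required config):
 *
 * // The stream contains only the remaining bytes:
 * upload({bucket, file, retryOptions, uri, resumeFromOffset: true});
 *
 * // The stream contains the entire object; bytes already persisted
 * // on the server are skipped:
 * upload({bucket, file, retryOptions, uri});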
*/ @@ -176,6 +180,16 @@ export interface UploadConfig { */ public?: boolean; + /** + * Determines if the provided stream to upload is a continuation of a previous + * upload (e.g. partial/remaining stream) or a restart of a previous upload + * (a stream containing the entire object). + * + * If this is `true` and if the `offset` is not provided, it will be calculated + * by checking the `uri`. + */ + resumeFromOffset?: boolean; + /** * If you already have a resumable URI from a previously-created resumable * upload, just pass it in here and we'll use that. @@ -331,7 +345,11 @@ export class Upload extends Writable { const autoRetry = cfg.retryOptions.autoRetry; this.uriProvidedManually = !!cfg.uri; this.uri = cfg.uri; - this.numBytesWritten = 0; + + if (cfg.resumeFromOffset && this.offset) { + this.numBytesWritten = this.offset; + } + this.numRetries = 0; // counter for number of retries currently executed if (!autoRetry) { cfg.retryOptions.maxRetries = 0; @@ -346,7 +364,7 @@ export class Upload extends Writable { this.once('writing', () => { if (this.uri) { - this.continueUploading(); + this.continueUploading(cfg.resumeFromOffset); } else { this.createURI(err => { if (err) { @@ -623,13 +641,19 @@ export class Upload extends Writable { return uri; } - private async continueUploading() { - if (typeof this.offset === 'number') { - this.startUploading(); - return; + /** + * + * @param resumeFromOffset if `true`, set `numBytesWritten` to offset + * @returns + */ + private async continueUploading(resumeFromOffset = false) { + this.offset ?? (await this.getAndSetOffset()); + + if (resumeFromOffset) { + this.numBytesWritten = this.offset ?? 0; } - await this.getAndSetOffset(); - this.startUploading(); + + return this.startUploading(); } async startUploading() { From 4d96da34c4bff85e0236d5bb51ce6e52fac47e59 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Fri, 21 Apr 2023 15:21:42 -0700 Subject: [PATCH 02/21] chore: typo --- src/file.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file.ts b/src/file.ts index 136a15a19..fd768b3af 100644 --- a/src/file.ts +++ b/src/file.ts @@ -2065,7 +2065,7 @@ class File extends ServiceObject { * const file = myBucket.file('my-file'); * * //- - * // Download a file into memory. The contents will be available fhe + * // Download a file into memory. The contents will be available as the * second * // argument in the demonstration below, `contents`. 
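 * // A sketch of that callback form, assuming `file` from this example:
 * file.download((err, contents) => {
 *   if (err) throw err;
 *   // `contents` is a Buffer with the object's data.
 *   console.log(contents.toString());
 * });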
* //- From a12aede96a2b857bd85fc7420e27f0e05483e80f Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Tue, 17 Oct 2023 16:12:19 -0700 Subject: [PATCH 03/21] feat: WIP --- src/file.ts | 41 ++++++++++++----- src/hash-stream-validator.ts | 16 ++++--- src/resumable-upload.ts | 86 +++++++++++++++++++++--------------- test/resumable-upload.ts | 27 +++++++++++ 4 files changed, 119 insertions(+), 51 deletions(-) diff --git a/src/file.ts b/src/file.ts index 2c2f6431f..efc8b3682 100644 --- a/src/file.ts +++ b/src/file.ts @@ -72,7 +72,7 @@ import { formatAsUTCISO, PassThroughShim, } from './util'; -import {CRC32CValidatorGenerator} from './crc32c'; +import {CRC32C, CRC32CValidatorGenerator} from './crc32c'; import {HashStreamValidator} from './hash-stream-validator'; import {URL} from 'url'; @@ -210,6 +210,7 @@ export type PredefinedAcl = type PublicResumableUploadOptions = | 'chunkSize' | 'highWaterMark' + | 'isPartialUpload' | 'metadata' | 'origin' | 'offset' @@ -221,6 +222,11 @@ type PublicResumableUploadOptions = export interface CreateResumableUploadOptions extends Pick { + /** + * A CRC32C to resume from when resuming a previous upload. + * @see {@link CRC32C.from} for possible values. + */ + resumeCrc32c?: Parameters<(typeof CRC32C)['from']>[0]; preconditionOpts?: PreconditionOptions; [GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY]; } @@ -469,6 +475,7 @@ export enum FileExceptionMessages { UPLOAD_MISMATCH = `The uploaded data did not match the data from the server. As a precaution, the file has been deleted. To be sure the content is the same, you should try uploading the file again.`, + MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a resumed resumable upload as MD5 cannot be extended from an existing value', } /** @@ -1936,12 +1943,12 @@ class File extends ServiceObject { md5 = options.validation === 'md5'; } else if (options.validation === false) { crc32c = false; - } else if (options.offset) { - // TODO: check for provided offset - // if available, + md5 = false; } - // offset + if (options.offset && md5) { + throw new RangeError(FileExceptionMessages.MD5_RESUMED_UPLOAD); + } /** * A callback for determining when the underlying pipeline is complete. @@ -1981,7 +1988,7 @@ class File extends ServiceObject { fileWriteStreamMetadataReceived = true; }); - writeStream.on('writing', async () => { + const writingCallback = async () => { if (options.resumable === false) { this.startSimpleUpload_(fileWriteStream, options); } else { @@ -1995,13 +2002,20 @@ class File extends ServiceObject { let hashCalculatingStream: HashStreamValidator | null = null; if (crc32c || md5) { - // if offset - // - if md5 -> error - // - if crc32cResume -> set for init HSV value - // - if !crc32cResume -> query for current crc32c for init HSV value + let crc32cInstance: CRC32C | undefined = undefined; + + if (options.offset) { + // set the initial CRC32C value for the resumed upload. 
+ if (options.resumeCrc32c) { + crc32cInstance = CRC32C.from(options.resumeCrc32c); + } else { + // TODO: query for current crc32c for init HSV value + } + } hashCalculatingStream = new HashStreamValidator({ crc32c, + crc32cInstance, md5, crc32cGenerator: this.crc32cGenerator, updateHashesOnly: true, @@ -2047,7 +2061,11 @@ class File extends ServiceObject { } } ); - }); + }; + + writeStream.once('writing', () => + writingCallback().catch(pipelineCallback) + ); return writeStream; } @@ -3927,6 +3945,7 @@ class File extends ServiceObject { ), file: this.name, generation: this.generation, + isPartialUpload: options.isPartialUpload, key: this.encryptionKey, kmsKeyName: this.kmsKeyName, metadata: options.metadata, diff --git a/src/hash-stream-validator.ts b/src/hash-stream-validator.ts index 300ada7d9..5e70f0f21 100644 --- a/src/hash-stream-validator.ts +++ b/src/hash-stream-validator.ts @@ -27,7 +27,9 @@ interface HashStreamValidatorOptions { crc32c: boolean; /** Enables MD5 calculation. To validate a provided value use `md5Expected`. */ md5: boolean; - /** Set a custom CRC32C generator */ + /** A CRC32C instance for validation. To validate a provided value use `crc32cExpected`. */ + crc32cInstance: CRC32CValidator; + /** Set a custom CRC32C generator. Used if `crc32cInstance` has not been provided. */ crc32cGenerator: CRC32CValidatorGenerator; /** Sets the expected CRC32C value to verify once all data has been consumed. Also sets the `crc32c` option to `true` */ crc32cExpected?: string; @@ -57,10 +59,14 @@ class HashStreamValidator extends Transform { this.md5Expected = options.md5Expected; if (this.crc32cEnabled) { - const crc32cGenerator = - options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR; - - this.#crc32cHash = crc32cGenerator(); + if (options.crc32cInstance) { + this.#crc32cHash = options.crc32cInstance; + } else { + const crc32cGenerator = + options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR; + + this.#crc32cHash = crc32cGenerator(); + } } if (this.md5Enabled) { diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index c64095d15..161b3700d 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -124,6 +124,14 @@ export interface UploadConfig extends Pick { */ generation?: number; + /** + * Set to `true` if the upload is only a subset of the overall object to upload. + * This can be used when planning to continue upload an object in another session. + * + * Must be used with {@link UploadConfig.chunkSize} != `0`. + */ + isPartialUpload?: boolean; + /** * A customer-supplied encryption key. See * https://cloud.google.com/storage/docs/encryption#customer-supplied. @@ -146,11 +154,9 @@ export interface UploadConfig extends Pick { /** * The starting byte of the upload stream, for resuming an interrupted upload. * - * If the provided stream is resuming from where the initial stream left off, consider - * setting `resumeFromOffset: true`. + * If the provided stream should start at the * - * See - * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload. + * @see {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload.} */ offset?: number; @@ -182,16 +188,6 @@ export interface UploadConfig extends Pick { */ public?: boolean; - /** - * Determines if the provided stream to upload is a continuation of a previous - * upload (e.g. partial/remaining stream) or a restart of a previous upload - * (a stream containing the entire object). 
- * - * If this is `true` and if the `offset` is not provided, it will be calculated - * by checking the `uri`. - */ - resumeFromOffset?: boolean; - /** * If you already have a resumable URI from a previously-created resumable * upload, just pass it in here and we'll use that. @@ -217,7 +213,8 @@ export interface ConfigMetadata { [key: string]: any; /** - * Set the length of the file being uploaded. + * Set the length of the object being uploaded. If uploading a partial + * object, this is the overall size of the finalized object. */ contentLength?: number; @@ -273,6 +270,8 @@ export class Upload extends Writable { contentLength: number | '*'; retryOptions: RetryOptions; timeOfFirstRequest: number; + isPartialUpload: boolean; + private currentInvocationId = { chunk: uuid.v4(), uri: uuid.v4(), @@ -301,6 +300,12 @@ export class Upload extends Writable { throw new Error('A bucket and file name are required'); } + if (cfg.offset && !cfg.uri) { + throw new RangeError( + 'Cannot provide an `offset` without providing a `uri`' + ); + } + cfg.authConfig = cfg.authConfig || {}; cfg.authConfig.scopes = [ 'https://www.googleapis.com/auth/devstorage.full_control', @@ -336,6 +341,7 @@ export class Upload extends Writable { this.userProject = cfg.userProject; this.chunkSize = cfg.chunkSize; this.retryOptions = cfg.retryOptions; + this.isPartialUpload = cfg.isPartialUpload ?? false; if (cfg.key) { const base64Key = Buffer.from(cfg.key).toString('base64'); @@ -353,7 +359,8 @@ export class Upload extends Writable { this.uriProvidedManually = !!cfg.uri; this.uri = cfg.uri; - if (cfg.resumeFromOffset && this.offset) { + if (this.offset) { + // we're resuming an incomplete upload this.numBytesWritten = this.offset; } @@ -373,7 +380,7 @@ export class Upload extends Writable { this.once('writing', () => { if (this.uri) { - this.continueUploading(cfg.resumeFromOffset); + this.continueUploading(); } else { this.createURI(err => { if (err) { @@ -491,7 +498,6 @@ export class Upload extends Writable { * Retrieves data from upstream's buffer. * * @param limit The maximum amount to return from the buffer. - * @returns The data requested. */ private *pullFromChunkBuffer(limit: number) { while (limit) { @@ -698,21 +704,13 @@ export class Upload extends Writable { this.uri = uri; this.offset = 0; + return uri; } - /** - * - * @param resumeFromOffset if `true`, set `numBytesWritten` to offset - * @returns - */ - private async continueUploading(resumeFromOffset = false) { + private async continueUploading() { this.offset ?? (await this.getAndSetOffset()); - if (resumeFromOffset) { - this.numBytesWritten = this.offset ?? 
0; - } - return this.startUploading(); } @@ -820,10 +818,11 @@ export class Upload extends Writable { // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload for await (const chunk of this.upstreamIterator(expectedUploadSize)) { // This will conveniently track and keep the size of the buffers + // We will hit either the expected upload size or the remainder this.#addLocalBufferCache(chunk); } - // We hit either the expected upload size or the remainder + // This is the sum from the `#addLocalBufferCache` calls const bytesToUpload = this.localWriteCacheByteLength; // Important: we want to know if the upstream has ended and the queue is empty before @@ -835,7 +834,11 @@ export class Upload extends Writable { let totalObjectSize = this.contentLength; - if (typeof this.contentLength !== 'number' && isLastChunkOfUpload) { + if ( + typeof this.contentLength !== 'number' && + isLastChunkOfUpload && + !this.isPartialUpload + ) { // Let's let the server know this is the last chunk since // we didn't know the content-length beforehand. totalObjectSize = bytesToUpload + this.numBytesWritten; @@ -865,7 +868,7 @@ export class Upload extends Writable { const resp = await this.makeRequestStream(reqOpts); if (resp) { responseReceived = true; - this.responseHandler(resp); + await this.responseHandler(resp); } } catch (e) { const err = e as ApiError; @@ -884,7 +887,7 @@ export class Upload extends Writable { // Process the API response to look for errors that came in // the response body. - private responseHandler(resp: GaxiosResponse) { + private async responseHandler(resp: GaxiosResponse) { if (resp.data.error) { this.destroy(resp.data.error); return; @@ -893,10 +896,20 @@ export class Upload extends Writable { // At this point we can safely create a new id for the chunk this.currentInvocationId.chunk = uuid.v4(); + const moreDataToUpload = await this.waitForNextChunk(); + const shouldContinueWithNextMultiChunkRequest = this.chunkSize && resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE && - resp.headers.range; + resp.headers.range && + moreDataToUpload; + + /** + * This is true when we're expecting to upload more data, yet + * + */ + const shouldContinueUploadInAnotherRequest = + resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE && !moreDataToUpload; if (shouldContinueWithNextMultiChunkRequest) { // Use the upper value in this header to determine where to start the next chunk. 
@@ -921,7 +934,10 @@ export class Upload extends Writable { // continue uploading next chunk this.continueUploading(); - } else if (!this.isSuccessfulResponse(resp.status)) { + } else if ( + !this.isSuccessfulResponse(resp.status) && + !shouldContinueUploadInAnotherRequest + ) { const err: ApiError = new Error('Upload failed'); err.code = resp.status; err.name = 'Upload failed'; @@ -1147,7 +1163,7 @@ export class Upload extends Writable { * Check if a given status code is 2xx * * @param status The status code to check - * @returns if the status is 2xx + * @returns {boolean} if the status is 2xx */ public isSuccessfulResponse(status: number): boolean { return status >= 200 && status < 300; diff --git a/test/resumable-upload.ts b/test/resumable-upload.ts index 326a63fc3..0d5b89197 100644 --- a/test/resumable-upload.ts +++ b/test/resumable-upload.ts @@ -240,6 +240,7 @@ describe('resumable-upload', () => { bucket: BUCKET, file: FILE, offset, + uri: 'https://example.com', retryOptions: RETRY_OPTIONS, }); @@ -1288,6 +1289,7 @@ describe('resumable-upload', () => { assert.strictEqual(body, BODY); done(); }); + up.upstreamEnded = true; up.responseHandler(RESP); }); @@ -1302,6 +1304,7 @@ describe('resumable-upload', () => { assert.strictEqual(typeof data.size, 'number'); done(); }); + up.upstreamEnded = true; up.responseHandler(RESP); }); @@ -1314,6 +1317,7 @@ describe('resumable-upload', () => { assert.strictEqual(err, RESP.data.error); done(); }; + up.upstreamEnded = true; up.responseHandler(RESP); }); @@ -1325,6 +1329,7 @@ describe('resumable-upload', () => { assert.strictEqual(err.message, 'Upload failed'); done(); }; + up.upstreamEnded = true; up.responseHandler(RESP); }); @@ -1340,6 +1345,7 @@ describe('resumable-upload', () => { }; up.chunkSize = 1; + up.writeBuffers = [Buffer.alloc(0)]; up.continueUploading = () => { assert.equal(up.offset, lastByteReceived + 1); @@ -1350,6 +1356,25 @@ describe('resumable-upload', () => { up.responseHandler(RESP); }); + it('should not continue with multi-chunk upload when incomplete if the upstream has finished', done => { + const lastByteReceived = 9; + + const RESP = { + data: '', + status: RESUMABLE_INCOMPLETE_STATUS_CODE, + headers: { + range: `bytes=0-${lastByteReceived}`, + }, + }; + + up.chunkSize = 1; + up.upstreamEnded = true; + + up.on('uploadFinished', done); + + up.responseHandler(RESP); + }); + it('should unshift missing data if server did not receive the entire chunk', done => { const NUM_BYTES_WRITTEN = 20; const LAST_CHUNK_LENGTH = 256; @@ -1401,6 +1426,8 @@ describe('resumable-upload', () => { it('currentInvocationId.chunk should be different after success', done => { const beforeCallInvocationId = up.currentInvocationId.chunk; const RESP = {data: '', status: 200}; + up.upstreamEnded = true; + up.on('uploadFinished', () => { assert.notEqual(beforeCallInvocationId, up.currentInvocationId.chunk); done(); From bf88a05fbaed5359417e7dfac1a8e24de2c9283b Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Wed, 18 Oct 2023 12:25:39 -0700 Subject: [PATCH 04/21] docs: `returns` clean up --- src/resumable-upload.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index 161b3700d..cdbd38a1a 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -1133,7 +1133,7 @@ export class Upload extends Writable { } /** - * @returns {number} the amount of time to wait before retrying the request + * @returns the amount of time to wait before retrying the request */ private 
getRetryDelay(): number { const randomMs = Math.round(Math.random() * 1000); @@ -1163,7 +1163,7 @@ export class Upload extends Writable { * Check if a given status code is 2xx * * @param status The status code to check - * @returns {boolean} if the status is 2xx + * @returns if the status is 2xx */ public isSuccessfulResponse(status: number): boolean { return status >= 200 && status < 300; From 8c20671112f8b6895ed1a46ca6b35d27a34205d5 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Wed, 18 Oct 2023 13:34:16 -0700 Subject: [PATCH 05/21] fix: missing `gzip` --- src/file.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file.ts b/src/file.ts index efc8b3682..e5be65432 100644 --- a/src/file.ts +++ b/src/file.ts @@ -1997,7 +1997,7 @@ class File extends ServiceObject { const transformStreams: Transform[] = []; if (gzip) { - transformStreams.push(new PassThrough()); + transformStreams.push(zlib.createGzip()); } let hashCalculatingStream: HashStreamValidator | null = null; From 444d721948979dd586c35cfe808d35f6f52210ea Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 11:33:02 -0700 Subject: [PATCH 06/21] test: validate `createURI` --- system-test/kitchen.ts | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index cbeb1c8ad..67a83c645 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -20,7 +20,12 @@ import * as tmp from 'tmp'; import * as crypto from 'crypto'; import * as os from 'os'; import {Readable} from 'stream'; -import {createURI, ErrorWithCode, upload} from '../src/resumable-upload'; +import { + checkUploadStatus, + createURI, + ErrorWithCode, + upload, +} from '../src/resumable-upload'; import { RETRY_DELAY_MULTIPLIER_DEFAULT, TOTAL_TIMEOUT_DEFAULT, @@ -146,16 +151,24 @@ describe('resumable-upload', () => { }); }); - it('should just make an upload URI', done => { - createURI( - { - bucket: bucketName, - file: filePath, - metadata: {contentType: 'image/jpg'}, - retryOptions: retryOptions, - }, - done - ); + it('should create an upload URI', async () => { + const uri = await createURI({ + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + retryOptions: retryOptions, + }); + + const resp = await checkUploadStatus({ + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + retryOptions: retryOptions, + uri, + }); + + assert(!resp.data); + assert.equal(resp.headers['content-length'], 0); }); it('should return a non-resumable failed upload', done => { From f54b25f36438c69cf7017f6d46867206cc2d0387 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 11:40:58 -0700 Subject: [PATCH 07/21] feat: check for invalid offset when `resumeCRC32C` has not been provided --- src/file.ts | 68 ++++++++++++++++++++++++++--------- src/resumable-upload.ts | 79 ++++++++++++++++++++++++++++++++++------- 2 files changed, 117 insertions(+), 30 deletions(-) diff --git a/src/file.ts b/src/file.ts index e5be65432..8254ed903 100644 --- a/src/file.ts +++ b/src/file.ts @@ -226,7 +226,7 @@ export interface CreateResumableUploadOptions * A CRC32C to resume from when resuming a previous upload. * @see {@link CRC32C.from} for possible values. 
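 *
 * For example, a sketch of resuming with a previously captured value (the
 * base64 string here is a placeholder; any `CRC32C.from`-compatible input
 * should work):
 *
 * file.createWriteStream({uri, offset, resumeCRC32C: 'AAAAAA=='});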
*/ - resumeCrc32c?: Parameters<(typeof CRC32C)['from']>[0]; + resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0]; preconditionOpts?: PreconditionOptions; [GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY]; } @@ -478,6 +478,14 @@ export enum FileExceptionMessages { MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a resumed resumable upload as MD5 cannot be extended from an existing value', } +const RESUME_RESUMABLE_UPLOAD_OFFSET_ERROR = ( + actualOffset: number, + providedOffset: number +) => + `The server has ${actualOffset} bytes and while the provided offset is ${providedOffset} - thus ${ + providedOffset - actualOffset + } bytes are missing. Stopping as this could result in data loss. Initiate a new upload to continue.`; + /** * A File object is created from your {@link Bucket} object using * {@link Bucket#file}. @@ -2001,15 +2009,37 @@ class File extends ServiceObject { } let hashCalculatingStream: HashStreamValidator | null = null; + if (crc32c || md5) { let crc32cInstance: CRC32C | undefined = undefined; if (options.offset) { // set the initial CRC32C value for the resumed upload. - if (options.resumeCrc32c) { - crc32cInstance = CRC32C.from(options.resumeCrc32c); + if (options.resumeCRC32C) { + crc32cInstance = CRC32C.from(options.resumeCRC32C); } else { - // TODO: query for current crc32c for init HSV value + const resp = + await this.#createResumableUpload(options).checkUploadStatus(); + + if (resp.data?.crc32c) { + crc32cInstance = CRC32C.from(resp.data.crc32c); + } + + // check if the offset provided is higher than `options.offset` to + // avoid any data loss, otherwise we would have a strange, difficult- + // to-debug validation error later when the upload has completed. + if (typeof resp.headers.range === 'string') { + const actualOffset = Number(resp.headers.range.split('-')[1]) + 1; + + if (actualOffset < options.offset) { + throw new RangeError( + RESUME_RESUMABLE_UPLOAD_OFFSET_ERROR( + actualOffset, + options.offset + ) + ); + } + } } } @@ -3912,18 +3942,7 @@ class File extends ServiceObject { this.bucket.setUserProject.call(this, userProject); } - /** - * This creates a resumable-upload upload stream. - * - * @param {Duplexify} stream - Duplexify stream of data to pipe to the file. - * @param {object=} options - Configuration object. - * - * @private - */ - startResumableUpload_( - dup: Duplexify, - options: CreateResumableUploadOptions = {} - ): void { + #createResumableUpload(options: CreateResumableUploadOptions = {}) { options.metadata ??= {}; const retryOptions = this.storage.retryOptions; @@ -3935,7 +3954,7 @@ class File extends ServiceObject { retryOptions.autoRetry = false; } - const uploadStream = resumableUpload.upload({ + return resumableUpload.upload({ authClient: this.storage.authClient, apiEndpoint: this.storage.apiEndpoint, bucket: this.bucket.name, @@ -3961,6 +3980,21 @@ class File extends ServiceObject { highWaterMark: options?.highWaterMark, [GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY], }); + } + + /** + * This creates a resumable-upload upload stream. + * + * @param {Duplexify} stream - Duplexify stream of data to pipe to the file. + * @param {object=} options - Configuration object. 
+ * + * @private + */ + startResumableUpload_( + dup: Duplexify, + options: CreateResumableUploadOptions = {} + ): void { + const uploadStream = this.#createResumableUpload(options); uploadStream .on('response', resp => { diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index cdbd38a1a..6e22f2cb8 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -28,6 +28,7 @@ import {RetryOptions, PreconditionOptions} from './storage'; import * as uuid from 'uuid'; import {getRuntimeTrackingString} from './util'; import {GCCL_GCS_CMD_KEY} from './nodejs-common/util'; +import {FileMetadata} from './file'; const NOT_FOUND_STATUS_CODE = 404; const RESUMABLE_INCOMPLETE_STATUS_CODE = 308; @@ -42,7 +43,6 @@ export interface ErrorWithCode extends Error { } export type CreateUriCallback = (err: Error | null, uri?: string) => void; - export interface Encryption { key: {}; hash: {}; @@ -233,6 +233,15 @@ export interface ApiError extends Error { errors?: GoogleInnerError[]; } +export interface CheckUploadStatusConfig { + /** + * Set to `false` to disable retries within this method. + * + * @defaultValue `true` + */ + retry?: boolean; +} + export class Upload extends Writable { bucket: string; file: string; @@ -273,9 +282,9 @@ export class Upload extends Writable { isPartialUpload: boolean; private currentInvocationId = { + checkUploadStatus: uuid.v4(), chunk: uuid.v4(), uri: uuid.v4(), - offset: uuid.v4(), }; /** * A cache of buffers written to this instance, ready for consuming @@ -905,11 +914,13 @@ export class Upload extends Writable { moreDataToUpload; /** - * This is true when we're expecting to upload more data, yet - * + * This is true when we're expecting to upload more data, yet the upstream + * has been exhausted. */ const shouldContinueUploadInAnotherRequest = - resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE && !moreDataToUpload; + this.isPartialUpload && + resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE && + !moreDataToUpload; if (shouldContinueWithNextMultiChunkRequest) { // Use the upper value in this header to determine where to start the next chunk. @@ -961,10 +972,12 @@ export class Upload extends Writable { } } - private async getAndSetOffset() { + async checkUploadStatus( + config: CheckUploadStatusConfig = {} + ): Promise> { let googAPIClient = `${getRuntimeTrackingString()} gccl/${ packageJson.version - } gccl-invocation-id/${this.currentInvocationId.offset}`; + } gccl-invocation-id/${this.currentInvocationId.checkUploadStatus}`; if (this.#gcclGcsCmd) { googAPIClient += ` gccl-gcs-cmd/${this.#gcclGcsCmd}`; @@ -972,21 +985,50 @@ export class Upload extends Writable { const opts: GaxiosOptions = { method: 'PUT', - url: this.uri!, + url: this.uri, headers: { 'Content-Length': 0, 'Content-Range': 'bytes */*', 'x-goog-api-client': googAPIClient, }, }; + try { const resp = await this.makeRequest(opts); + // Successfully got the offset we can now create a new offset invocation id - this.currentInvocationId.offset = uuid.v4(); + this.currentInvocationId.checkUploadStatus = uuid.v4(); + + return resp; + } catch (e) { + if ( + config.retry === false || + !(e instanceof Error) || + !this.retryOptions.retryableErrorFn!(e) + ) { + throw e; + } + + const retryDelay = this.getRetryDelay(); + + if (retryDelay <= 0) { + throw e; + } + + await new Promise(res => setTimeout(res, retryDelay)); + + return this.checkUploadStatus(config); + } + } + + private async getAndSetOffset() { + try { + // we want to handle retries in this method. 
+ const resp = await this.checkUploadStatus({retry: false}); + if (resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE) { - if (resp.headers.range) { - const range = resp.headers.range as string; - this.offset = Number(range.split('-')[1]) + 1; + if (typeof resp.headers.range === 'string') { + this.offset = Number(resp.headers.range.split('-')[1]) + 1; return; } } @@ -1133,7 +1175,10 @@ export class Upload extends Writable { } /** - * @returns the amount of time to wait before retrying the request + * The amount of time to wait before retrying the request, in milliseconds. + * If negative, do not retry. + * + * @returns the amount of time to wait, in milliseconds. */ private getRetryDelay(): number { const randomMs = Math.round(Math.random() * 1000); @@ -1186,3 +1231,11 @@ export function createURI( } up.createURI().then(r => callback(null, r), callback); } + +export function checkUploadStatus( + cfg: UploadConfig & Required> +) { + const up = new Upload(cfg); + + return up.checkUploadStatus(); +} From 64a0bfe4fcea5a8d54b188cde0353d75396ce8a3 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 11:41:43 -0700 Subject: [PATCH 08/21] fix: type --- system-test/kitchen.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index 67a83c645..e20bd42e6 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -168,7 +168,7 @@ describe('resumable-upload', () => { }); assert(!resp.data); - assert.equal(resp.headers['content-length'], 0); + assert.equal(resp.headers['content-length'], '0'); }); it('should return a non-resumable failed upload', done => { From cf6b21df78a84975181ca45e948b4222716df322 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 12:20:35 -0700 Subject: [PATCH 09/21] chore: lint --- system-test/kitchen.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index e20bd42e6..a3ffed2df 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -148,7 +148,7 @@ describe('resumable-upload', () => { ); }); }); - }); + }).catch(done); }); it('should create an upload URI', async () => { From 90dea2d7799eec4ff6e443eee5c8ec42af609c11 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 12:24:05 -0700 Subject: [PATCH 10/21] test: lint + cleanup --- test/resumable-upload.ts | 47 +++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/test/resumable-upload.ts b/test/resumable-upload.ts index 0d5b89197..61e962e8c 100644 --- a/test/resumable-upload.ts +++ b/test/resumable-upload.ts @@ -1356,7 +1356,7 @@ describe('resumable-upload', () => { up.responseHandler(RESP); }); - it('should not continue with multi-chunk upload when incomplete if the upstream has finished', done => { + it('should not continue with multi-chunk upload when incomplete if a partial upload has finished', done => { const lastByteReceived = 9; const RESP = { @@ -1369,12 +1369,36 @@ describe('resumable-upload', () => { up.chunkSize = 1; up.upstreamEnded = true; + up.isPartialUpload = true; up.on('uploadFinished', done); up.responseHandler(RESP); }); + it('should error when upload is incomplete and the upstream is not a partial upload', done => { + const lastByteReceived = 9; + + const RESP = { + data: '', + status: RESUMABLE_INCOMPLETE_STATUS_CODE, + headers: { + range: `bytes=0-${lastByteReceived}`, + }, + }; + + up.chunkSize = 1; + up.upstreamEnded = true; + + up.on('error', (e: Error) => { + 
assert.match(e.message, /Upload failed/); + + done(); + }); + + up.responseHandler(RESP); + }); + it('should unshift missing data if server did not receive the entire chunk', done => { const NUM_BYTES_WRITTEN = 20; const LAST_CHUNK_LENGTH = 256; @@ -1446,25 +1470,32 @@ describe('resumable-upload', () => { }); }); - it('currentInvocationId.offset should be different after success', async () => { - const beforeCallInvocationId = up.currentInvocationId.offset; + it('currentInvocationId.checkUploadStatus should be different after success', async () => { + const beforeCallInvocationId = up.currentInvocationId.checkUploadStatus; up.makeRequest = () => { return {}; }; await up.getAndSetOffset(); - assert.notEqual(beforeCallInvocationId, up.currentInvocationId.offset); + assert.notEqual( + beforeCallInvocationId, + up.currentInvocationId.checkUploadStatus + ); }); - it('currentInvocationId.offset should be the same on error', async done => { - const beforeCallInvocationId = up.currentInvocationId.offset; + it('currentInvocationId.checkUploadStatus should be the same on error', done => { + const beforeCallInvocationId = up.currentInvocationId.checkUploadStatus; up.destroy = () => { - assert.equal(beforeCallInvocationId, up.currentInvocationId.offset); + assert.equal( + beforeCallInvocationId, + up.currentInvocationId.checkUploadStatus + ); done(); }; up.makeRequest = () => { throw new Error() as GaxiosError; }; - await up.getAndSetOffset(); + + up.getAndSetOffset().catch(done); }); describe('#getAndSetOffset', () => { From 621b966b0d7871b13cad89521bc141fa7e58ccf2 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 12:31:29 -0700 Subject: [PATCH 11/21] chore: lint --- src/file.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/file.ts b/src/file.ts index 8254ed903..e3d0dc3fe 100644 --- a/src/file.ts +++ b/src/file.ts @@ -29,14 +29,7 @@ import * as crypto from 'crypto'; import * as fs from 'fs'; import * as mime from 'mime'; import * as resumableUpload from './resumable-upload'; -import { - Writable, - Readable, - pipeline, - Transform, - PassThrough, - PipelineSource, -} from 'stream'; +import {Writable, Readable, pipeline, Transform, PipelineSource} from 'stream'; import * as zlib from 'zlib'; import * as http from 'http'; From 4c33f3890a8c37dc8dc209f885db26d8a223cfa3 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 13:30:38 -0700 Subject: [PATCH 12/21] feat: emit `uri` when created --- src/file.ts | 24 +++++++++++++++--- src/resumable-upload.ts | 55 ++++++++++++++++++++++++++++++++++------- 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/src/file.ts b/src/file.ts index e3d0dc3fe..098a702e9 100644 --- a/src/file.ts +++ b/src/file.ts @@ -216,7 +216,9 @@ type PublicResumableUploadOptions = export interface CreateResumableUploadOptions extends Pick { /** - * A CRC32C to resume from when resuming a previous upload. + * An optional CRC32C to resume from when resuming a previous upload. + * When not provided the current CRC32C will be fetched from GCS. + * * @see {@link CRC32C.from} for possible values. */ resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0]; @@ -1840,8 +1842,8 @@ class File extends ServiceObject { * NOTE: Writable streams will emit the `finish` event when the file is fully * uploaded. 
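 *
 * For example, a sketch of waiting on that `finish` event (assuming a `file`
 * instance as in the surrounding examples):
 *
 * fs.createReadStream('/local/file')
 *   .pipe(file.createWriteStream())
 *   .on('error', console.error)
 *   .on('finish', () => {
 *     // The object is now fully written to Cloud Storage.
 *   });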
* - * See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload| Upload Options (Simple or Resumable)} - * See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert| Objects: insert API Documentation} + * See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload Upload Options (Simple or Resumable)} + * See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert Objects: insert API Documentation} * * @param {CreateWriteStreamOptions} [options] Configuration options. * @returns {WritableStream} @@ -1906,6 +1908,18 @@ class File extends ServiceObject { * // The file upload is complete. * }); * ``` + * + * //- + * //
<h4>Resuming a Resumable Upload</h4>
+ * // + * // One can capture a `uri` from a resumable upload to reuse later. + * //- + * let uri: string | undefined = undefined; + * + * fs.createWriteStream().on('uri', (link) => {uri = link}); + * + * // later... + * fs.createWriteStream({uri}); */ // eslint-disable-next-line @typescript-eslint/no-explicit-any createWriteStream(options: CreateWriteStreamOptions = {}): Writable { @@ -1983,6 +1997,7 @@ class File extends ServiceObject { // Handing off emitted events to users emitStream.on('reading', () => writeStream.emit('reading')); emitStream.on('writing', () => writeStream.emit('writing')); + fileWriteStream.on('uri', evt => writeStream.emit('uri', evt)); fileWriteStream.on('progress', evt => writeStream.emit('progress', evt)); fileWriteStream.on('response', resp => writeStream.emit('response', resp)); fileWriteStream.once('metadata', () => { @@ -3993,6 +4008,9 @@ class File extends ServiceObject { .on('response', resp => { dup.emit('response', resp); }) + .on('uri', uri => { + dup.emit('uri', uri); + }) .on('metadata', metadata => { this.metadata = metadata; dup.emit('metadata'); diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index 6e22f2cb8..e15f25ebf 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -128,7 +128,12 @@ export interface UploadConfig extends Pick { * Set to `true` if the upload is only a subset of the overall object to upload. * This can be used when planning to continue upload an object in another session. * - * Must be used with {@link UploadConfig.chunkSize} != `0`. + * **Must be used with {@link UploadConfig.chunkSize} != `0`**. + * + * If this is a continuation of a previous upload, {@link UploadConfig.offset} + * should be set. + * + * @see {@link checkUploadStatus} for checking the status of an existing upload. */ isPartialUpload?: boolean; @@ -152,10 +157,17 @@ export interface UploadConfig extends Pick { metadata?: ConfigMetadata; /** - * The starting byte of the upload stream, for resuming an interrupted upload. + * The starting byte in relation to the final uploaded object. + * **Must be used with {@link UploadConfig.uri}**. * - * If the provided stream should start at the + * If resuming an interrupted stream, do not supply this argument unless you + * know the exact number of bytes the service has AND the provided stream's + * beginning is a continuation from that provided offset. If resuming an + * interrupted stream and this option has not been provided, we will treat + * the provided upload stream as the object to upload; skipping any bytes + * that are already on the server. * + * @see {@link checkUploadStatus} for checking the status of an existing upload. * @see {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload.} */ offset?: number; @@ -191,6 +203,12 @@ export interface UploadConfig extends Pick { /** * If you already have a resumable URI from a previously-created resumable * upload, just pass it in here and we'll use that. + * + * If resuming an interrupted stream and the {@link UploadConfig.offset} + * option has not been provided, we will treat the provided upload stream as + * the object to upload; skipping any bytes that are already on the server. + * + * @see {@link checkUploadStatus} for checking the status of an existing upload. 
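 *
 * A sketch of inspecting an existing session before resuming (the
 * `bytes=0-N` range format follows the handling elsewhere in this module):
 *
 * const status = await checkUploadStatus({bucket, file, retryOptions, uri});
 * // When incomplete, `status.headers.range` is e.g. `bytes=0-262143`,
 * // so the next byte the server expects is:
 * const offset = Number(String(status.headers.range).split('-')[1]) + 1;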
*/ uri?: string; @@ -315,6 +333,12 @@ export class Upload extends Writable { ); } + if (cfg.isPartialUpload && !cfg.chunkSize) { + throw new RangeError( + 'Cannot set `isPartialUpload` without providing a `chunkSize`' + ); + } + cfg.authConfig = cfg.authConfig || {}; cfg.authConfig.scopes = [ 'https://www.googleapis.com/auth/devstorage.full_control', @@ -714,6 +738,8 @@ export class Upload extends Writable { this.uri = uri; this.offset = 0; + this.emit('uri', uri); + return uri; } @@ -826,8 +852,8 @@ export class Upload extends Writable { // We need to know how much data is available upstream to set the `Content-Range` header. // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload for await (const chunk of this.upstreamIterator(expectedUploadSize)) { - // This will conveniently track and keep the size of the buffers - // We will hit either the expected upload size or the remainder + // This will conveniently track and keep the size of the buffers. + // We will reach either the expected upload size or the remainder of the stream. this.#addLocalBufferCache(chunk); } @@ -848,8 +874,7 @@ export class Upload extends Writable { isLastChunkOfUpload && !this.isPartialUpload ) { - // Let's let the server know this is the last chunk since - // we didn't know the content-length beforehand. + // Let's let the server know this is the last chunk of the object since we didn't set it before. totalObjectSize = bytesToUpload + this.numBytesWritten; } @@ -914,8 +939,8 @@ export class Upload extends Writable { moreDataToUpload; /** - * This is true when we're expecting to upload more data, yet the upstream - * has been exhausted. + * This is true when we're expecting to upload more data in a future request, + * yet the upstream for the upload session has been exhausted. */ const shouldContinueUploadInAnotherRequest = this.isPartialUpload && @@ -972,6 +997,12 @@ export class Upload extends Writable { } } + /** + * Check the status of an existing resumable upload. + * + * @param cfg A configuration to use. `uri` is required. + * @returns the current upload status + */ async checkUploadStatus( config: CheckUploadStatusConfig = {} ): Promise> { @@ -1232,6 +1263,12 @@ export function createURI( up.createURI().then(r => callback(null, r), callback); } +/** + * Check the status of an existing resumable upload. + * + * @param cfg A configuration to use. `uri` is required. + * @returns the current upload status + */ export function checkUploadStatus( cfg: UploadConfig & Required> ) { From ae2402de1fc802c85c9f59a0c97a0181e6ed2b3b Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 13:55:14 -0700 Subject: [PATCH 13/21] docs: clarify --- src/resumable-upload.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index e15f25ebf..fd1c73e2e 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -126,7 +126,8 @@ export interface UploadConfig extends Pick { /** * Set to `true` if the upload is only a subset of the overall object to upload. - * This can be used when planning to continue upload an object in another session. + * This can be used when planning to continue the upload an object in another + * session. * * **Must be used with {@link UploadConfig.chunkSize} != `0`**. 
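 *
 * For example, a sketch of one partial session (the 256 KiB-multiple chunk
 * size is an assumption based on Cloud Storage's chunked-upload rules):
 *
 * upload({bucket, file, retryOptions, chunkSize: 8 * 256 * 1024, isPartialUpload: true});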
* @@ -162,10 +163,11 @@ export interface UploadConfig extends Pick { * * If resuming an interrupted stream, do not supply this argument unless you * know the exact number of bytes the service has AND the provided stream's - * beginning is a continuation from that provided offset. If resuming an + * first byte is a continuation from that provided offset. If resuming an * interrupted stream and this option has not been provided, we will treat - * the provided upload stream as the object to upload; skipping any bytes - * that are already on the server. + * the provided upload stream as the object to upload - where the first byte + * of the upload stream is the first byte of the object to upload; skipping + * any bytes that are already present on the server. * * @see {@link checkUploadStatus} for checking the status of an existing upload. * @see {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload.} @@ -206,7 +208,9 @@ export interface UploadConfig extends Pick { * * If resuming an interrupted stream and the {@link UploadConfig.offset} * option has not been provided, we will treat the provided upload stream as - * the object to upload; skipping any bytes that are already on the server. + * the object to upload - where the first byte of the upload stream is the + * first byte of the object to upload; skipping any bytes that are already + * present on the server. * * @see {@link checkUploadStatus} for checking the status of an existing upload. */ From 7342f9635baafb23c46ef76a9980a1541625ebb7 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 14:05:28 -0700 Subject: [PATCH 14/21] test: Make the interrupt test actually test something --- system-test/kitchen.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index a3ffed2df..e70314acf 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -100,6 +100,7 @@ describe('resumable-upload', () => { assert.ifError(err); const size = fd.size; + let uri: string | undefined = undefined; // eslint-disable-next-line @typescript-eslint/no-explicit-any type DoUploadCallback = (...args: any[]) => void; @@ -111,12 +112,17 @@ describe('resumable-upload', () => { let destroyed = false; const ws = upload({ + uri, bucket: bucketName, file: filePath, metadata: {contentType: 'image/jpg'}, retryOptions: retryOptions, }); + ws.on('uri', (link: string) => { + uri = link; + }); + fs.createReadStream(filePath) .on('error', callback) .on('data', function (this: Readable, chunk) { @@ -141,6 +147,7 @@ describe('resumable-upload', () => { {interrupt: false}, (err: Error, metadata: {size: number}) => { assert.ifError(err); + assert(uri); assert.strictEqual(metadata.size, size); assert.strictEqual(typeof metadata.size, 'number'); done(); From 60abe35e20faf01711e329793e68cce61dc75061 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 18:05:53 -0700 Subject: [PATCH 15/21] feat: expose the `crc32c` if available --- src/hash-stream-validator.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/hash-stream-validator.ts b/src/hash-stream-validator.ts index 5e70f0f21..281970626 100644 --- a/src/hash-stream-validator.ts +++ b/src/hash-stream-validator.ts @@ -74,6 +74,10 @@ class HashStreamValidator extends Transform { } } + get crc32c() { + return this.#crc32cHash?.toString(); + } + _flush(callback: (error?: Error | null | undefined) => void) { if (this.#md5Hash) { this.#md5Digest = this.#md5Hash.digest('base64'); From 
64cfcdb953018470adb3d71f2ff905943c5834f1 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 18:17:39 -0700 Subject: [PATCH 16/21] refactor: clean-up --- src/file.ts | 125 ++++++++++++++++++++++------------------------------ 1 file changed, 53 insertions(+), 72 deletions(-) diff --git a/src/file.ts b/src/file.ts index 86b83b7ff..81fd8ae30 100644 --- a/src/file.ts +++ b/src/file.ts @@ -216,8 +216,8 @@ type PublicResumableUploadOptions = export interface CreateResumableUploadOptions extends Pick { /** - * An optional CRC32C to resume from when resuming a previous upload. - * When not provided the current CRC32C will be fetched from GCS. + * A CRC32C to resume from when resuming a previous upload. + * This is **required** when validating a final portion of a resumed upload. * * @see {@link CRC32C.from} for possible values. */ @@ -471,16 +471,9 @@ export enum FileExceptionMessages { As a precaution, the file has been deleted. To be sure the content is the same, you should try uploading the file again.`, MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a resumed resumable upload as MD5 cannot be extended from an existing value', + MISSING_RESUME_CRC32C_FINAL_UPLOAD = 'The CRC32C is missing for the final portion of a resumed upload, which is required for validation. Please provide `resumeCRC32C` if validation is required, or disable `validation`.', } -const RESUME_RESUMABLE_UPLOAD_OFFSET_ERROR = ( - actualOffset: number, - providedOffset: number -) => - `The server has ${actualOffset} bytes and while the provided offset is ${providedOffset} - thus ${ - providedOffset - actualOffset - } bytes are missing. Stopping as this could result in data loss. Initiate a new upload to continue.`; - /** * A File object is created from your {@link Bucket} object using * {@link Bucket#file}. @@ -1920,13 +1913,17 @@ class File extends ServiceObject { * //
<h4>Resuming a Resumable Upload</h4>
* // * // One can capture a `uri` from a resumable upload to reuse later. + * // Additionally, for validation, one can also capture and pass `crc32c`. * //- * let uri: string | undefined = undefined; + * let crc32: string | undefined = undefined; * - * fs.createWriteStream().on('uri', (link) => {uri = link}); + * fs.createWriteStream() + * .on('uri', link => {uri = link}) + * .on('crc32', crc32c => {resumeCRC32C = crc32c}); * * // later... - * fs.createWriteStream({uri}); + * fs.createWriteStream({resumeCRC32C, uri}); */ // eslint-disable-next-line @typescript-eslint/no-explicit-any createWriteStream(options: CreateWriteStreamOptions = {}): Writable { @@ -1968,8 +1965,16 @@ class File extends ServiceObject { md5 = false; } - if (options.offset && md5) { - throw new RangeError(FileExceptionMessages.MD5_RESUMED_UPLOAD); + if (options.offset) { + if (md5) { + throw new RangeError(FileExceptionMessages.MD5_RESUMED_UPLOAD); + } + + if (crc32c && !options.isPartialUpload && !options.resumeCRC32C) { + throw new RangeError( + FileExceptionMessages.MISSING_RESUME_CRC32C_FINAL_UPLOAD + ); + } } /** @@ -1997,7 +2002,31 @@ class File extends ServiceObject { }, }); + const transformStreams: Transform[] = []; + + if (gzip) { + transformStreams.push(zlib.createGzip()); + } + const emitStream = new PassThroughShim(); + const crc32cInstance = options.resumeCRC32C + ? CRC32C.from(options.resumeCRC32C) + : undefined; + + let hashCalculatingStream: HashStreamValidator | null = null; + + if (crc32c || md5) { + hashCalculatingStream = new HashStreamValidator({ + crc32c, + crc32cInstance, + md5, + crc32cGenerator: this.crc32cGenerator, + updateHashesOnly: true, + }); + + transformStreams.push(hashCalculatingStream); + } + const fileWriteStream = duplexify(); let fileWriteStreamMetadataReceived = false; @@ -2011,64 +2040,13 @@ class File extends ServiceObject { fileWriteStreamMetadataReceived = true; }); - const writingCallback = async () => { + writeStream.once('writing', () => { if (options.resumable === false) { this.startSimpleUpload_(fileWriteStream, options); } else { this.startResumableUpload_(fileWriteStream, options); } - const transformStreams: Transform[] = []; - if (gzip) { - transformStreams.push(zlib.createGzip()); - } - - let hashCalculatingStream: HashStreamValidator | null = null; - - if (crc32c || md5) { - let crc32cInstance: CRC32C | undefined = undefined; - - if (options.offset) { - // set the initial CRC32C value for the resumed upload. - if (options.resumeCRC32C) { - crc32cInstance = CRC32C.from(options.resumeCRC32C); - } else { - const resp = - await this.#createResumableUpload(options).checkUploadStatus(); - - if (resp.data?.crc32c) { - crc32cInstance = CRC32C.from(resp.data.crc32c); - } - - // check if the offset provided is higher than `options.offset` to - // avoid any data loss, otherwise we would have a strange, difficult- - // to-debug validation error later when the upload has completed. 
- if (typeof resp.headers.range === 'string') { - const actualOffset = Number(resp.headers.range.split('-')[1]) + 1; - - if (actualOffset < options.offset) { - throw new RangeError( - RESUME_RESUMABLE_UPLOAD_OFFSET_ERROR( - actualOffset, - options.offset - ) - ); - } - } - } - } - - hashCalculatingStream = new HashStreamValidator({ - crc32c, - crc32cInstance, - md5, - crc32cGenerator: this.crc32cGenerator, - updateHashesOnly: true, - }); - - transformStreams.push(hashCalculatingStream); - } - pipeline( emitStream, ...(transformStreams as [Transform]), @@ -2092,8 +2070,15 @@ class File extends ServiceObject { } } + // Emit the local CRC32C value for future validation, if validation is enabled. + if (hashCalculatingStream?.crc32c) { + writeStream.emit('crc32c', hashCalculatingStream.crc32c); + } + try { - if (hashCalculatingStream) { + const metadataNotReady = options.isPartialUpload && !this.metadata; + + if (hashCalculatingStream && !metadataNotReady) { await this.#validateIntegrity(hashCalculatingStream, { crc32c, md5, @@ -2106,11 +2091,7 @@ class File extends ServiceObject { } } ); - }; - - writeStream.once('writing', () => - writingCallback().catch(pipelineCallback) - ); + }); return writeStream; } From 8d7bc4c30b154604ec13c155a434d6318a9a5f35 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Thu, 19 Oct 2023 18:18:00 -0700 Subject: [PATCH 17/21] test: Add _resumable_ resumable upload tests --- system-test/kitchen.ts | 153 ++++++++++++++++++++++++++++++++++------- 1 file changed, 130 insertions(+), 23 deletions(-) diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index e70314acf..565cc94c8 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -19,7 +19,7 @@ import * as path from 'path'; import * as tmp from 'tmp'; import * as crypto from 'crypto'; import * as os from 'os'; -import {Readable} from 'stream'; +import {pipeline, Readable} from 'stream'; import { checkUploadStatus, createURI, @@ -33,33 +33,37 @@ import { AUTO_RETRY_DEFAULT, MAX_RETRY_DEFAULT, RETRYABLE_ERR_FN_DEFAULT, + Storage, } from '../src/storage'; const bucketName = process.env.BUCKET_NAME || 'gcs-resumable-upload-test'; -tmp.setGracefulCleanup(); -const tmpFileContents = crypto.randomBytes(1024 * 1024 * 20); -const filePath = path.join(os.tmpdir(), '20MB.zip'); -const writeStream = fs.createWriteStream(filePath); -const retryOptions = { - retryDelayMultiplier: RETRY_DELAY_MULTIPLIER_DEFAULT, - totalTimeout: TOTAL_TIMEOUT_DEFAULT, - maxRetryDelay: MAX_RETRY_DELAY_DEFAULT, - autoRetry: AUTO_RETRY_DEFAULT, - maxRetries: MAX_RETRY_DEFAULT, - retryableErrorFn: RETRYABLE_ERR_FN_DEFAULT, -}; -writeStream.write(tmpFileContents); -writeStream.close(); - -async function delay(title: string, retries: number, done: Function) { - if (retries === 0) return done(); // no retry on the first failure. 
- // see: https://cloud.google.com/storage/docs/exponential-backoff: - const ms = Math.pow(2, retries) * 1000 + Math.random() * 2000; - console.info(`retrying "${title}" in ${ms}ms`); - setTimeout(done(), ms); -} + +/** + * The known multiple chunk upload size, in bytes + */ +const KNOWN_MULTI_CHUNK_MULTIPLE_BYTES = 256 * 1024; +const FILE_SIZE = 1024 * 1024 * 20; describe('resumable-upload', () => { + const retryOptions = { + retryDelayMultiplier: RETRY_DELAY_MULTIPLIER_DEFAULT, + totalTimeout: TOTAL_TIMEOUT_DEFAULT, + maxRetryDelay: MAX_RETRY_DELAY_DEFAULT, + autoRetry: AUTO_RETRY_DEFAULT, + maxRetries: MAX_RETRY_DEFAULT, + retryableErrorFn: RETRYABLE_ERR_FN_DEFAULT, + }; + + const bucket = new Storage({retryOptions}).bucket(bucketName); + let filePath: string; + + before(async () => { + tmp.setGracefulCleanup(); + filePath = path.join(os.tmpdir(), '20MB.zip'); + + await fs.promises.writeFile(filePath, crypto.randomBytes(FILE_SIZE)); + }); + beforeEach(() => { upload({ bucket: bucketName, @@ -68,6 +72,33 @@ describe('resumable-upload', () => { }); }); + afterEach(async () => { + await bucket.file(filePath).delete({ignoreNotFound: true}); + }); + + after(async () => { + await fs.promises.rm(filePath, {force: true}); + }); + + function createReadable(amount: number) { + async function* readableGenerator() { + while (amount > 0) { + yield crypto.randomBytes(KNOWN_MULTI_CHUNK_MULTIPLE_BYTES); + amount -= KNOWN_MULTI_CHUNK_MULTIPLE_BYTES; + } + } + + return Readable.from(readableGenerator()); + } + + async function delay(title: string, retries: number, done: Function) { + if (retries === 0) return done(); // no retry on the first failure. + // see: https://cloud.google.com/storage/docs/exponential-backoff: + const ms = Math.pow(2, retries) * 1000 + Math.random() * 2000; + console.info(`retrying "${title}" in ${ms}ms`); + setTimeout(done(), ms); + } + it('should work', done => { let uploadSucceeded = false; fs.createReadStream(filePath) @@ -198,4 +229,80 @@ describe('resumable-upload', () => { done(); }); }); + + it('should upload an object in multiple chunks', async () => { + const file = bucket.file(filePath); + const chunkSize = Math.floor(FILE_SIZE / 4); + + // ensure we're testing a valid size + assert(chunkSize > KNOWN_MULTI_CHUNK_MULTIPLE_BYTES); + + await new Promise((resolve, reject) => + pipeline( + fs.createReadStream(filePath), + file.createWriteStream({ + chunkSize, + }), + e => (e ? reject(e) : resolve()) + ) + ); + + const [results] = await file.getMetadata(); + + assert.equal(results.size, FILE_SIZE); + }); + + it('should complete multiple, partial upload session', async () => { + const count = 4; + const file = bucket.file(filePath); + const chunkSize = Math.floor(FILE_SIZE / count); + + // ensure we're testing a valid size and multiple + assert.equal(chunkSize % KNOWN_MULTI_CHUNK_MULTIPLE_BYTES, 0); + + let uri: string | undefined = undefined; + let uriGenerated = 0; + + let resumeCRC32C = ''; + let crc32cGenerated = 0; + + for (let i = 0; i < count; i++) { + const isPartialUpload = !(i + 1 === count); + const offset = i * chunkSize; + + const readable = createReadable(chunkSize); + const writable = file.createWriteStream({ + uri, + chunkSize, + isPartialUpload, + offset, + resumeCRC32C, + }); + + writable.on('uri', link => { + uri = link; + uriGenerated++; + }); + + writable.on('crc32c', crc32c => { + resumeCRC32C = crc32c; + crc32cGenerated++; + }); + + await new Promise((resolve, reject) => + pipeline(readable, writable, e => (e ? 
+
+  it('should complete multiple partial upload sessions', async () => {
+    const count = 4;
+    const file = bucket.file(filePath);
+    const chunkSize = Math.floor(FILE_SIZE / count);
+
+    // ensure we're testing a valid size and multiple
+    assert.equal(chunkSize % KNOWN_MULTI_CHUNK_MULTIPLE_BYTES, 0);
+
+    let uri: string | undefined = undefined;
+    let uriGenerated = 0;
+
+    let resumeCRC32C = '';
+    let crc32cGenerated = 0;
+
+    for (let i = 0; i < count; i++) {
+      const isPartialUpload = !(i + 1 === count);
+      const offset = i * chunkSize;
+
+      const readable = createReadable(chunkSize);
+      const writable = file.createWriteStream({
+        uri,
+        chunkSize,
+        isPartialUpload,
+        offset,
+        resumeCRC32C,
+      });
+
+      writable.on('uri', link => {
+        uri = link;
+        uriGenerated++;
+      });
+
+      writable.on('crc32c', crc32c => {
+        resumeCRC32C = crc32c;
+        crc32cGenerated++;
+      });
+
+      await new Promise((resolve, reject) =>
+        pipeline(readable, writable, e => (e ? reject(e) : resolve()))
+      );
+    }
+
+    const [results] = await file.getMetadata();
+
+    assert(uri);
+    assert.equal(uriGenerated, 1, 'uri should be generated once');
+    assert.equal(
+      crc32cGenerated,
+      count,
+      'crc32c should be generated on each upload'
+    );
+    assert.equal(results.size, FILE_SIZE);
+  });
 });

From 8deab9bf83859c0b5cf2a44dfc950bff03bf159a Mon Sep 17 00:00:00 2001
From: Daniel Bankhead
Date: Thu, 19 Oct 2023 18:32:20 -0700
Subject: [PATCH 18/21] docs: Clean up and clarify

---
 src/file.ts                  | 13 ++++++++-----
 src/hash-stream-validator.ts |  3 +++
 src/resumable-upload.ts      |  1 +
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/file.ts b/src/file.ts
index 81fd8ae30..df686a55a 100644
--- a/src/file.ts
+++ b/src/file.ts
@@ -1916,14 +1916,14 @@ class File extends ServiceObject {
  * // Additionally, for validation, one can also capture and pass `crc32c`.
  * //-
  * let uri: string | undefined = undefined;
- * let crc32: string | undefined = undefined;
+ * let resumeCRC32C: string | undefined = undefined;
  *
  * file.createWriteStream()
  *   .on('uri', link => {uri = link})
  *   .on('crc32c', crc32c => {resumeCRC32C = crc32c});
  *
  * // later...
- * fs.createWriteStream({resumeCRC32C, uri});
+ * file.createWriteStream({uri, resumeCRC32C});
 */
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 createWriteStream(options: CreateWriteStreamOptions = {}): Writable {
@@ -2009,13 +2009,14 @@ class File extends ServiceObject {
     }
 
     const emitStream = new PassThroughShim();
-    const crc32cInstance = options.resumeCRC32C
-      ? CRC32C.from(options.resumeCRC32C)
-      : undefined;
 
     let hashCalculatingStream: HashStreamValidator | null = null;
 
     if (crc32c || md5) {
+      const crc32cInstance = options.resumeCRC32C
+        ? CRC32C.from(options.resumeCRC32C)
+        : undefined;
+
       hashCalculatingStream = new HashStreamValidator({
         crc32c,
         crc32cInstance,
 
       try {
+        // Metadata may not be ready if the upload is a partial upload;
+        // there is nothing to validate yet.
         const metadataNotReady = options.isPartialUpload && !this.metadata;
 
         if (hashCalculatingStream && !metadataNotReady) {
diff --git a/src/hash-stream-validator.ts b/src/hash-stream-validator.ts
index 281970626..0cc9023ab 100644
--- a/src/hash-stream-validator.ts
+++ b/src/hash-stream-validator.ts
@@ -74,6 +74,9 @@ class HashStreamValidator extends Transform {
     }
   }
 
+  /**
+   * Return the current CRC32C value, if available.
+   */
   get crc32c() {
     return this.#crc32cHash?.toString();
   }
diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts
index 0cda24cb5..1ffb46bd8 100644
--- a/src/resumable-upload.ts
+++ b/src/resumable-upload.ts
@@ -743,6 +743,7 @@ export class Upload extends Writable {
     this.uri = uri;
     this.offset = 0;
 
+    // Emit the newly generated URI for future reuse, if necessary.
     this.emit('uri', uri);
 
     return uri;
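Taken together, the changes above let a caller continue a resumable upload across sessions. What follows is a minimal sketch of that flow, assuming the `uri`, `offset`, `isPartialUpload`, and `resumeCRC32C` options and the `uri`/`crc32c` write-stream events introduced in this series; the bucket and object names and the random payloads are placeholders:

    import * as crypto from 'crypto';
    import {Readable} from 'stream';
    import {pipeline} from 'stream/promises';
    import {Storage} from '@google-cloud/storage';

    const CHUNK_SIZE = 16 * 256 * 1024; // a multiple of 256 KiB
    const file = new Storage().bucket('my-bucket').file('my-object');

    let uri: string | undefined;
    let resumeCRC32C: string | undefined;

    // Session 1: upload the first portion; capture the session URI and the
    // running CRC32C so a later session can pick up where this one left off.
    await pipeline(
      Readable.from([crypto.randomBytes(CHUNK_SIZE)]),
      file
        .createWriteStream({chunkSize: CHUNK_SIZE, isPartialUpload: true, offset: 0})
        .on('uri', link => (uri = link))
        .on('crc32c', crc32c => (resumeCRC32C = crc32c))
    );

    // Session 2 (later, possibly from another process): finish the object.
    // Seeding `resumeCRC32C` lets validation cover the object as a whole.
    await pipeline(
      Readable.from([crypto.randomBytes(CHUNK_SIZE)]),
      file.createWriteStream({
        uri,
        chunkSize: CHUNK_SIZE,
        offset: CHUNK_SIZE,
        resumeCRC32C,
      })
    );

Per the offset check added earlier in the series, the second stream verifies the server's persisted byte count against `offset` before continuing, so the source only needs to supply the remaining bytes.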
From e4126ba79f629930fb4dbc3ef2c9a61836b4b98d Mon Sep 17 00:00:00 2001
From: Daniel Bankhead
Date: Mon, 23 Oct 2023 13:27:07 -0700
Subject: [PATCH 19/21] docs: docs

---
 src/file.ts | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/file.ts b/src/file.ts
index a3d1e07e8..a0fded8ae 100644
--- a/src/file.ts
+++ b/src/file.ts
@@ -216,8 +216,10 @@ type PublicResumableUploadOptions =
 export interface CreateResumableUploadOptions
   extends Pick<UploadConfig, PublicResumableUploadOptions> {
   /**
-   * A CRC32C to resume from when resuming a previous upload.
-   * This is **required** when validating a final portion of a resumed upload.
+   * A CRC32C to resume from when continuing a previous upload. It is
+   * recommended to capture the `crc32c` event from previous upload sessions
+   * and to provide it in subsequent requests so the upload is tracked
+   * accurately. This is **required** when validating the final portion of
+   * the uploaded object.
    *
    * @see {@link CRC32C.from} for possible values.
    */
@@ -470,7 +472,7 @@ export enum FileExceptionMessages {
   UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
     As a precaution, the file has been deleted. To be sure the content is the
    same, you should try uploading the file again.`,
-  MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a resumed resumable upload as MD5 cannot be extended from an existing value',
+  MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a continued resumable upload as MD5 cannot be extended from an existing value',
   MISSING_RESUME_CRC32C_FINAL_UPLOAD = 'The CRC32C is missing for the final portion of a resumed upload, which is required for validation. Please provide `resumeCRC32C` if validation is required, or disable `validation`.',
 }

From 7dbac4ff304e3fa93b7b218c2442b27a5a03f973 Mon Sep 17 00:00:00 2001
From: Daniel Bankhead
Date: Tue, 24 Oct 2023 12:49:49 -0700
Subject: [PATCH 20/21] chore: clean-up

---
 src/file.ts | 32 ++++++++++++++------------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/src/file.ts b/src/file.ts
index a0fded8ae..89128b04e 100644
--- a/src/file.ts
+++ b/src/file.ts
@@ -1912,7 +1912,7 @@ class File extends ServiceObject {
  * ```
  *
  * //-
- * // Resuming a Resumable Upload

+ * // Continuing a Resumable Upload
* // * // One can capture a `uri` from a resumable upload to reuse later. * // Additionally, for validation, one can also capture and pass `crc32c`. @@ -3949,7 +3949,18 @@ class File extends ServiceObject { this.bucket.setUserProject.call(this, userProject); } - #createResumableUpload(options: CreateResumableUploadOptions = {}) { + /** + * This creates a resumable-upload upload stream. + * + * @param {Duplexify} stream - Duplexify stream of data to pipe to the file. + * @param {object=} options - Configuration object. + * + * @private + */ + startResumableUpload_( + dup: Duplexify, + options: CreateResumableUploadOptions = {} + ): void { options.metadata ??= {}; const retryOptions = this.storage.retryOptions; @@ -3961,7 +3972,7 @@ class File extends ServiceObject { retryOptions.autoRetry = false; } - return resumableUpload.upload({ + const uploadStream = resumableUpload.upload({ authClient: this.storage.authClient, apiEndpoint: this.storage.apiEndpoint, bucket: this.bucket.name, @@ -3987,21 +3998,6 @@ class File extends ServiceObject { highWaterMark: options?.highWaterMark, [GCCL_GCS_CMD_KEY]: options[GCCL_GCS_CMD_KEY], }); - } - - /** - * This creates a resumable-upload upload stream. - * - * @param {Duplexify} stream - Duplexify stream of data to pipe to the file. - * @param {object=} options - Configuration object. - * - * @private - */ - startResumableUpload_( - dup: Duplexify, - options: CreateResumableUploadOptions = {} - ): void { - const uploadStream = this.#createResumableUpload(options); uploadStream .on('response', resp => { From 125d0d78e6da19deef77a2978760c364967e8085 Mon Sep 17 00:00:00 2001 From: Daniel Bankhead Date: Wed, 25 Oct 2023 11:00:00 -0700 Subject: [PATCH 21/21] fix: merge conflict fixes --- src/resumable-upload.ts | 2 ++ system-test/kitchen.ts | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/resumable-upload.ts b/src/resumable-upload.ts index 797090fd2..4184c264b 100644 --- a/src/resumable-upload.ts +++ b/src/resumable-upload.ts @@ -33,6 +33,8 @@ import { } from './util.js'; import {GCCL_GCS_CMD_KEY} from './nodejs-common/util.js'; import {FileMetadata} from './file.js'; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore import {getPackageJSON} from './package-json-helper.cjs'; const NOT_FOUND_STATUS_CODE = 404; diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts index a195b1bd4..fbdd139d1 100644 --- a/system-test/kitchen.ts +++ b/system-test/kitchen.ts @@ -178,7 +178,7 @@ describe('resumable-upload', () => { {interrupt: false}, (err: Error, metadata: {size: number}) => { assert.ifError(err); - assert(uri); + assert.ok(uri); assert.strictEqual(metadata.size, size); assert.strictEqual(typeof metadata.size, 'number'); done(); @@ -205,7 +205,7 @@ describe('resumable-upload', () => { uri, }); - assert(!resp.data); + assert.ok(!resp.data); assert.equal(resp.headers['content-length'], '0'); }); @@ -235,7 +235,7 @@ describe('resumable-upload', () => { const chunkSize = Math.floor(FILE_SIZE / 4); // ensure we're testing a valid size - assert(chunkSize > KNOWN_MULTI_CHUNK_MULTIPLE_BYTES); + assert.ok(chunkSize > KNOWN_MULTI_CHUNK_MULTIPLE_BYTES); await new Promise((resolve, reject) => pipeline( @@ -296,7 +296,7 @@ describe('resumable-upload', () => { const [results] = await file.getMetadata(); - assert(uri); + assert.ok(uri); assert.equal(uriGenerated, 1, 'uri should be generated once'); assert.equal( crc32cGenerated,