Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Resume Resumable Uploads #2333

Merged
merged 28 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
14e4b61
chore: WIP
d-goog Apr 7, 2023
8d1e00f
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Apr 8, 2023
4d96da3
chore: typo
d-goog Apr 21, 2023
66daed5
Merge branches 'resume-resumable-upload' and 'main' of github.com:goo…
d-goog Sep 28, 2023
b3fed2b
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Oct 3, 2023
d795869
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Oct 13, 2023
a12aede
feat: WIP
d-goog Oct 17, 2023
bf88a05
docs: `returns` clean up
d-goog Oct 18, 2023
8c20671
fix: missing `gzip`
d-goog Oct 18, 2023
444d721
test: validate `createURI`
d-goog Oct 19, 2023
f54b25f
feat: check for invalid offset when `resumeCRC32C` has not been provided
d-goog Oct 19, 2023
64a0bfe
fix: type
d-goog Oct 19, 2023
cf6b21d
chore: lint
d-goog Oct 19, 2023
90dea2d
test: lint + cleanup
d-goog Oct 19, 2023
621b966
chore: lint
d-goog Oct 19, 2023
4c33f38
feat: emit `uri` when created
d-goog Oct 19, 2023
ae2402d
docs: clarify
d-goog Oct 19, 2023
7342f96
test: Make the interrupt test actually test something
d-goog Oct 19, 2023
842f2f3
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Oct 19, 2023
60abe35
feat: expose the `crc32c` if available
d-goog Oct 20, 2023
64cfcdb
refactor: clean-up
d-goog Oct 20, 2023
8d7bc4c
test: Add _resumable_ resumable upload tests
d-goog Oct 20, 2023
8deab9b
docs: Clean up and clarify
d-goog Oct 20, 2023
391a3c0
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Oct 20, 2023
e4126ba
docs: docs
d-goog Oct 23, 2023
7dbac4f
chore: clean-up
d-goog Oct 24, 2023
d1d76b6
Merge branch 'main' of github.com:googleapis/nodejs-storage into resu…
d-goog Oct 25, 2023
125d0d7
fix: merge conflict fixes
d-goog Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 92 additions & 21 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ import * as crypto from 'crypto';
import * as fs from 'fs';
import mime from 'mime';
import * as resumableUpload from './resumable-upload.js';
import {
Writable,
Readable,
pipeline,
Transform,
PassThrough,
PipelineSource,
} from 'stream';
import {Writable, Readable, pipeline, Transform, PipelineSource} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -70,7 +63,7 @@ import {
formatAsUTCISO,
PassThroughShim,
} from './util.js';
import {CRC32CValidatorGenerator} from './crc32c.js';
import {CRC32C, CRC32CValidatorGenerator} from './crc32c.js';
import {HashStreamValidator} from './hash-stream-validator.js';
import {URL} from 'url';

Expand Down Expand Up @@ -208,6 +201,7 @@ export type PredefinedAcl =
type PublicResumableUploadOptions =
| 'chunkSize'
| 'highWaterMark'
| 'isPartialUpload'
| 'metadata'
| 'origin'
| 'offset'
Expand All @@ -219,6 +213,15 @@ type PublicResumableUploadOptions =

/**
* Options accepted when creating or continuing a resumable upload.
*
* This is the subset of `resumableUpload.UploadConfig` named by
* `PublicResumableUploadOptions`, plus the fields declared below.
*/
export interface CreateResumableUploadOptions
extends Pick<resumableUpload.UploadConfig, PublicResumableUploadOptions> {
/**
* A CRC32C to resume from when continuing a previous upload. It is recommended
* to capture the value from the `crc32c` event of the previous upload session
* and provide it in subsequent requests so the checksum keeps tracking the
* whole object rather than only the newly written portion.
*
* This is **required** when validating the final portion of a resumed upload:
* `createWriteStream` throws a `RangeError` when `offset` is set, CRC32C
* validation is enabled, `isPartialUpload` is not set, and this value is missing.
*
* The accepted types are those of the first parameter of `CRC32C.from`.
*
* @see {@link CRC32C.from} for possible values.
*/
resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0];
// NOTE(review): semantics are defined by `PreconditionOptions`, which is
// declared outside this view — confirm before documenting further.
preconditionOpts?: PreconditionOptions;
// Mirrors the identically keyed entry of `resumableUpload.UploadConfig`.
[GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY];
}
Expand Down Expand Up @@ -467,6 +470,8 @@ export enum FileExceptionMessages {
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
As a precaution, the file has been deleted.
To be sure the content is the same, you should try uploading the file again.`,
MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a continued resumable upload as MD5 cannot be extended from an existing value',
MISSING_RESUME_CRC32C_FINAL_UPLOAD = 'The CRC32C is missing for the final portion of a resumed upload, which is required for validation. Please provide `resumeCRC32C` if validation is required, or disable `validation`.',
}

/**
Expand Down Expand Up @@ -1837,8 +1842,8 @@ class File extends ServiceObject<File, FileMetadata> {
* NOTE: Writable streams will emit the `finish` event when the file is fully
* uploaded.
*
* See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload| Upload Options (Simple or Resumable)}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert| Objects: insert API Documentation}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload Upload Options (Simple or Resumable)}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert Objects: insert API Documentation}
*
* @param {CreateWriteStreamOptions} [options] Configuration options.
* @returns {WritableStream}
Expand Down Expand Up @@ -1903,6 +1908,22 @@ class File extends ServiceObject<File, FileMetadata> {
* // The file upload is complete.
* });
* ```
*
* //-
* // <h4>Continuing a Resumable Upload</h4>
* //
* // One can capture a `uri` from a resumable upload to reuse later.
* // Additionally, for validation, one can also capture and pass `crc32c`.
* //-
* let uri: string | undefined = undefined;
* let resumeCRC32C: string | undefined = undefined;
*
* file.createWriteStream()
* .on('uri', link => {uri = link})
* .on('crc32c', crc32c => {resumeCRC32C = crc32c});
*
* // later...
* file.createWriteStream({uri, resumeCRC32C});
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
createWriteStream(options: CreateWriteStreamOptions = {}): Writable {
Expand Down Expand Up @@ -1941,6 +1962,19 @@ class File extends ServiceObject<File, FileMetadata> {
md5 = options.validation === 'md5';
} else if (options.validation === false) {
crc32c = false;
md5 = false;
}

if (options.offset) {
if (md5) {
throw new RangeError(FileExceptionMessages.MD5_RESUMED_UPLOAD);
}

if (crc32c && !options.isPartialUpload && !options.resumeCRC32C) {
throw new RangeError(
FileExceptionMessages.MISSING_RESUME_CRC32C_FINAL_UPLOAD
);
}
}

/**
Expand Down Expand Up @@ -1968,27 +2002,46 @@ class File extends ServiceObject<File, FileMetadata> {
},
});

const transformStreams: Transform[] = [];

if (gzip) {
transformStreams.push(zlib.createGzip());
}

const emitStream = new PassThroughShim();
const hashCalculatingStream = new HashStreamValidator({
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
updateHashesOnly: true,
});

let hashCalculatingStream: HashStreamValidator | null = null;

if (crc32c || md5) {
const crc32cInstance = options.resumeCRC32C
? CRC32C.from(options.resumeCRC32C)
: undefined;

hashCalculatingStream = new HashStreamValidator({
crc32c,
crc32cInstance,
md5,
crc32cGenerator: this.crc32cGenerator,
updateHashesOnly: true,
});

transformStreams.push(hashCalculatingStream);
}

const fileWriteStream = duplexify();
let fileWriteStreamMetadataReceived = false;

// Handing off emitted events to users
emitStream.on('reading', () => writeStream.emit('reading'));
emitStream.on('writing', () => writeStream.emit('writing'));
fileWriteStream.on('uri', evt => writeStream.emit('uri', evt));
fileWriteStream.on('progress', evt => writeStream.emit('progress', evt));
fileWriteStream.on('response', resp => writeStream.emit('response', resp));
fileWriteStream.once('metadata', () => {
fileWriteStreamMetadataReceived = true;
});

writeStream.on('writing', () => {
writeStream.once('writing', () => {
if (options.resumable === false) {
this.startSimpleUpload_(fileWriteStream, options);
} else {
Expand All @@ -1997,8 +2050,7 @@ class File extends ServiceObject<File, FileMetadata> {

pipeline(
emitStream,
gzip ? zlib.createGzip() : new PassThrough(),
hashCalculatingStream,
...(transformStreams as [Transform]),
fileWriteStream,
async e => {
if (e) {
Expand All @@ -2019,8 +2071,23 @@ class File extends ServiceObject<File, FileMetadata> {
}
}

// Emit the local CRC32C value for future validation, if validation is enabled.
if (hashCalculatingStream?.crc32c) {
writeStream.emit('crc32c', hashCalculatingStream.crc32c);
}

try {
await this.#validateIntegrity(hashCalculatingStream, {crc32c, md5});
// Metadata may not be ready if the upload is a partial upload;
// in that case there is nothing to validate yet.
const metadataNotReady = options.isPartialUpload && !this.metadata;

if (hashCalculatingStream && !metadataNotReady) {
await this.#validateIntegrity(hashCalculatingStream, {
crc32c,
md5,
});
}

pipelineCallback();
} catch (e) {
pipelineCallback(e as Error);
Expand Down Expand Up @@ -3913,6 +3980,7 @@ class File extends ServiceObject<File, FileMetadata> {
),
file: this.name,
generation: this.generation,
isPartialUpload: options.isPartialUpload,
key: this.encryptionKey,
kmsKeyName: this.kmsKeyName,
metadata: options.metadata,
Expand All @@ -3933,6 +4001,9 @@ class File extends ServiceObject<File, FileMetadata> {
.on('response', resp => {
dup.emit('response', resp);
})
.on('uri', uri => {
dup.emit('uri', uri);
})
.on('metadata', metadata => {
this.metadata = metadata;
dup.emit('metadata');
Expand Down
23 changes: 18 additions & 5 deletions src/hash-stream-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ interface HashStreamValidatorOptions {
crc32c: boolean;
/** Enables MD5 calculation. To validate a provided value use `md5Expected`. */
md5: boolean;
/** Set a custom CRC32C generator */
/** A CRC32C instance for validation. To validate a provided value use `crc32cExpected`. */
crc32cInstance: CRC32CValidator;
/** Set a custom CRC32C generator. Used if `crc32cInstance` has not been provided. */
crc32cGenerator: CRC32CValidatorGenerator;
/** Sets the expected CRC32C value to verify once all data has been consumed. Also sets the `crc32c` option to `true` */
crc32cExpected?: string;
Expand Down Expand Up @@ -57,17 +59,28 @@ class HashStreamValidator extends Transform {
this.md5Expected = options.md5Expected;

if (this.crc32cEnabled) {
const crc32cGenerator =
options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR;

this.#crc32cHash = crc32cGenerator();
if (options.crc32cInstance) {
this.#crc32cHash = options.crc32cInstance;
} else {
const crc32cGenerator =
options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR;

this.#crc32cHash = crc32cGenerator();
}
}

if (this.md5Enabled) {
this.#md5Hash = createHash('md5');
}
}

/**
 * The CRC32C accumulated so far, rendered as a string.
 *
 * @returns the string form of the running CRC32C hash, or `undefined` when
 * this instance holds no CRC32C hash.
 */
get crc32c() {
  const runningHash = this.#crc32cHash;

  return runningHash == null ? undefined : runningHash.toString();
}

_flush(callback: (error?: Error | null | undefined) => void) {
if (this.#md5Hash) {
this.#md5Digest = this.#md5Hash.digest('base64');
Expand Down
Loading
Loading