Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support iterables in file#save (#2202) #2203

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 71 additions & 25 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ import * as extend from 'extend';
import * as fs from 'fs';
import * as mime from 'mime';
import * as resumableUpload from './resumable-upload';
import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
import {
Writable,
Readable,
pipeline,
Transform,
PassThrough,
PipelineSource,
} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -93,6 +100,8 @@ export interface PolicyDocument {
signature: string;
}

export type SaveData = string | Buffer | PipelineSource<string | Buffer>;

export type GenerateSignedPostPolicyV2Response = [PolicyDocument];

export interface GenerateSignedPostPolicyV2Callback {
Expand Down Expand Up @@ -419,6 +428,7 @@ export enum FileExceptionMessages {
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
As a precaution, the file has been deleted.
To be sure the content is the same, you should try uploading the file again.`,
STREAM_NOT_READABLE = 'Stream must be readable.',
}

/**
Expand Down Expand Up @@ -3527,13 +3537,9 @@ class File extends ServiceObject<File> {
this.copy(newFile, copyOptions, callback!);
}

save(data: string | Buffer, options?: SaveOptions): Promise<void>;
save(data: string | Buffer, callback: SaveCallback): void;
save(
data: string | Buffer,
options: SaveOptions,
callback: SaveCallback
): void;
save(data: SaveData, options?: SaveOptions): Promise<void>;
save(data: SaveData, callback: SaveCallback): void;
save(data: SaveData, options: SaveOptions, callback: SaveCallback): void;
/**
* @typedef {object} SaveOptions
* @extends CreateWriteStreamOptions
Expand All @@ -3560,7 +3566,7 @@ class File extends ServiceObject<File> {
* resumable feature is disabled.
* </p>
*
* @param {string | Buffer} data The data to write to a file.
* @param {SaveData} data The data to write to a file.
* @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options`
* parameter.
* @param {SaveCallback} [callback] Callback function.
Expand Down Expand Up @@ -3588,7 +3594,7 @@ class File extends ServiceObject<File> {
* ```
*/
save(
data: string | Buffer,
data: SaveData,
optionsOrCallback?: SaveOptions | SaveCallback,
callback?: SaveCallback
): Promise<void> | void {
Expand All @@ -3608,28 +3614,68 @@ class File extends ServiceObject<File> {
}
const returnValue = retry(
async (bail: (err: Error) => void) => {
await new Promise<void>((resolve, reject) => {
if (data instanceof Readable) {
// Make sure any pending async readable operations are finished before
// attempting to check if the stream is readable.
await new Promise(resolve => setImmediate(resolve));

if (!data.readable || data.destroyed) {
// Calling pipeline() with a non-readable stream will result in the
// callback being called without an error, and no piping taking
// place. In that case, file.save() would appear to succeed, but
// nothing would be uploaded.
return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE));
}
}

return new Promise<void>((resolve, reject) => {
if (maxRetries === 0) {
this.storage.retryOptions.autoRetry = false;
}
const writable = this.createWriteStream(options)
.on('error', err => {
if (
this.storage.retryOptions.autoRetry &&
this.storage.retryOptions.retryableErrorFn!(err)
) {
return reject(err);
const writable = this.createWriteStream(options);

if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}

const handleError = (err: Error) => {
if (
!this.storage.retryOptions.autoRetry ||
!this.storage.retryOptions.retryableErrorFn!(err)
) {
bail(err);
}

reject(err);
};

if (typeof data === 'string' || Buffer.isBuffer(data)) {
writable
.on('error', handleError)
.on('finish', () => resolve())
.end(data);
} else {
pipeline(data, writable, err => {
if (err) {
// If data is not a valid PipelineSource, then pipeline will
// fail without destroying the writable stream. If data is a
// PipelineSource that yields invalid chunks (e.g. a stream in
// object mode or an iterable that does not yield Buffers or
// strings), then pipeline will destroy the writable stream.
if (!writable.destroyed) writable.destroy();

if (typeof data !== 'function') {
// Only PipelineSourceFunction can be retried. Async-iterables
// and Readable streams can only be consumed once.
bail(err);
}

handleError(err);
} else {
return bail(err);
resolve();
}
})
.on('finish', () => {
return resolve();
});
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
writable.end(data);
});
},
{
Expand Down
191 changes: 191 additions & 0 deletions test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4314,6 +4314,197 @@ describe('File', () => {
await file.save(DATA, options, assert.ifError);
});

it('should save a Readable with no errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.once('finish', done);
return writeStream;
};

const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

void file.save(readable, options);
});

it('should propagate Readable errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.on('error', err => {
errorCalled = true;
assert.strictEqual(err.message, 'Error!');
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const readable = new Readable({
read() {
setTimeout(() => {
this.push(DATA);
this.destroy(new Error('Error!'));
}, 50);
},
});

file.save(readable, options, (err: Error) => {
assert.strictEqual(err.message, 'Error!');
done();
});
});

it('Readable upload should not retry', async () => {
const options = {resumable: false};

let retryCount = 0;

file.createWriteStream = () => {
retryCount++;
return new Transform({
transform(
chunk: string | Buffer,
_encoding: string,
done: Function
) {
this.push(chunk);
setTimeout(() => {
done(new HTTPError('retryable error', 408));
}, 5);
},
});
};
try {
const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

await file.save(readable, options);
throw Error('unreachable');
} catch (e) {
assert.strictEqual((e as Error).message, 'retryable error');
assert.ok(retryCount === 1);
}
});

it('Destroyed Readable upload should throw', async () => {
const options = {resumable: false};

file.createWriteStream = () => {
throw new Error('unreachable');
};
try {
const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

readable.destroy();

await file.save(readable, options);
} catch (e) {
assert.strictEqual(
(e as Error).message,
FileExceptionMessages.STREAM_NOT_READABLE
);
}
});

it('should save a generator with no error', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
done();
});
return writeStream;
};

const generator = async function* (arg?: {signal?: AbortSignal}) {
await new Promise(resolve => setTimeout(resolve, 5));
if (arg?.signal?.aborted) return;
yield DATA;
};

void file.save(generator, options);
});

it('should propagate async iterable errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.on('error', err => {
errorCalled = true;
assert.strictEqual(err.message, 'Error!');
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const generator = async function* () {
yield DATA;
throw new Error('Error!');
};

file.save(generator(), options, (err: Error) => {
assert.strictEqual(err.message, 'Error!');
done();
});
});

it('should error on invalid async iterator data', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('error', () => {
errorCalled = true;
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const generator = async function* () {
yield {thisIsNot: 'a buffer or a string'};
};

file.save(generator(), options, (err: Error) => {
assert.strictEqual(
err.message,
'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object'
);
done();
});
});

it('buffer upload should retry on first failure', async () => {
const options = {
resumable: false,
Expand Down