Skip to content

Commit

Permalink
feat: support iterables in file#save (#2202)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Jul 11, 2023
1 parent 8c6f9c6 commit 36f1547
Show file tree
Hide file tree
Showing 2 changed files with 254 additions and 25 deletions.
88 changes: 63 additions & 25 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ import * as extend from 'extend';
import * as fs from 'fs';
import * as mime from 'mime';
import * as resumableUpload from './resumable-upload';
import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
import {
Writable,
Readable,
pipeline,
Transform,
PassThrough,
PipelineSource,
} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -93,6 +100,8 @@ export interface PolicyDocument {
signature: string;
}

export type SaveData = string | Buffer | PipelineSource<string | Buffer>;

export type GenerateSignedPostPolicyV2Response = [PolicyDocument];

export interface GenerateSignedPostPolicyV2Callback {
Expand Down Expand Up @@ -419,6 +428,7 @@ export enum FileExceptionMessages {
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
As a precaution, the file has been deleted.
To be sure the content is the same, you should try uploading the file again.`,
STREAM_NOT_READABLE = 'Stream must be readable.',
}

/**
Expand Down Expand Up @@ -3527,13 +3537,9 @@ class File extends ServiceObject<File> {
this.copy(newFile, copyOptions, callback!);
}

save(data: string | Buffer, options?: SaveOptions): Promise<void>;
save(data: string | Buffer, callback: SaveCallback): void;
save(
data: string | Buffer,
options: SaveOptions,
callback: SaveCallback
): void;
save(data: SaveData, options?: SaveOptions): Promise<void>;
save(data: SaveData, callback: SaveCallback): void;
save(data: SaveData, options: SaveOptions, callback: SaveCallback): void;
/**
* @typedef {object} SaveOptions
* @extends CreateWriteStreamOptions
Expand All @@ -3560,7 +3566,7 @@ class File extends ServiceObject<File> {
* resumable feature is disabled.
* </p>
*
* @param {string | Buffer} data The data to write to a file.
* @param {SaveData} data The data to write to a file.
* @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options`
* parameter.
* @param {SaveCallback} [callback] Callback function.
Expand Down Expand Up @@ -3588,7 +3594,7 @@ class File extends ServiceObject<File> {
* ```
*/
save(
data: string | Buffer,
data: SaveData,
optionsOrCallback?: SaveOptions | SaveCallback,
callback?: SaveCallback
): Promise<void> | void {
Expand All @@ -3608,28 +3614,60 @@ class File extends ServiceObject<File> {
}
const returnValue = retry(
async (bail: (err: Error) => void) => {
await new Promise<void>((resolve, reject) => {
if (data instanceof Readable && !data.readable) {
// Readable streams can only be consumed once and would cause a dead
// lock when used with pipeline().
return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE));
}

return new Promise<void>((resolve, reject) => {
if (maxRetries === 0) {
this.storage.retryOptions.autoRetry = false;
}
const writable = this.createWriteStream(options)
.on('error', err => {
if (
this.storage.retryOptions.autoRetry &&
this.storage.retryOptions.retryableErrorFn!(err)
) {
return reject(err);
const writable = this.createWriteStream(options);

if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}

const handleError = (err: Error) => {
if (
!this.storage.retryOptions.autoRetry ||
!this.storage.retryOptions.retryableErrorFn!(err)
) {
bail(err);
}

reject(err);
};

if (typeof data === 'string' || Buffer.isBuffer(data)) {
writable
.on('error', handleError)
.on('finish', () => resolve())
.end(data);
} else {
pipeline(data, writable, err => {
if (err) {
// If data is not a valid PipelineSource, then pipeline will
// fail without destroying the writable stream. If data is a
// PipelineSource that yields invalid chunks (e.g. a stream in
// object mode or an iterable that does not yield Buffers or
// strings), then pipeline will destroy the writable stream.
if (!writable.destroyed) writable.destroy();

if (typeof data !== 'function') {
// Only PipelineSourceFunction can be retried. Async-iterables
// and Readable streams can only be consumed once.
bail(err);
}

handleError(err);
} else {
return bail(err);
resolve();
}
})
.on('finish', () => {
return resolve();
});
if (options.onUploadProgress) {
writable.on('progress', options.onUploadProgress);
}
writable.end(data);
});
},
{
Expand Down
191 changes: 191 additions & 0 deletions test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4314,6 +4314,197 @@ describe('File', () => {
await file.save(DATA, options, assert.ifError);
});

it('should save a Readable with no errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.once('finish', done);
return writeStream;
};

const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

void file.save(readable, options);
});

it('should propagate Readable errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.on('error', err => {
errorCalled = true;
assert.strictEqual(err.message, 'Error!');
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const readable = new Readable({
read() {
setTimeout(() => {
this.push(DATA);
this.destroy(new Error('Error!'));
}, 50);
},
});

file.save(readable, options, (err: Error) => {
assert.strictEqual(err.message, 'Error!');
done();
});
});

it('Readable upload should not retry', async () => {
const options = {resumable: false};

let retryCount = 0;

file.createWriteStream = () => {
retryCount++;
return new Transform({
transform(
chunk: string | Buffer,
_encoding: string,
done: Function
) {
this.push(chunk);
setTimeout(() => {
done(new HTTPError('retryable error', 408));
}, 5);
},
});
};
try {
const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

await file.save(readable, options);
throw Error('unreachable');
} catch (e) {
assert.strictEqual((e as Error).message, 'retryable error');
assert.ok(retryCount === 1);
}
});

it('Destroyed Readable upload should throw', async () => {
const options = {resumable: false};

file.createWriteStream = () => {
throw new Error('unreachable');
};
try {
const readable = new Readable({
read() {
this.push(DATA);
this.push(null);
},
});

readable.destroy();

await file.save(readable, options);
} catch (e) {
assert.strictEqual(
(e as Error).message,
FileExceptionMessages.STREAM_NOT_READABLE
);
}
});

it('should save a generator with no error', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
done();
});
return writeStream;
};

const generator = async function* (arg?: {signal?: AbortSignal}) {
await new Promise(resolve => setTimeout(resolve, 5));
if (arg?.signal?.aborted) return;
yield DATA;
};

void file.save(generator, options);
});

it('should propagate async iterable errors', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('data', data => {
assert.strictEqual(data.toString(), DATA);
});
writeStream.on('error', err => {
errorCalled = true;
assert.strictEqual(err.message, 'Error!');
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const generator = async function* () {
yield DATA;
throw new Error('Error!');
};

file.save(generator(), options, (err: Error) => {
assert.strictEqual(err.message, 'Error!');
done();
});
});

it('should error on invalid async iterator data', done => {
const options = {resumable: false};
file.createWriteStream = () => {
const writeStream = new PassThrough();
let errorCalled = false;
writeStream.on('error', () => {
errorCalled = true;
});
writeStream.on('finish', () => {
assert.ok(errorCalled);
});
return writeStream;
};

const generator = async function* () {
yield {thisIsNot: 'a buffer or a string'};
};

file.save(generator(), options, (err: Error) => {
assert.strictEqual(
err.message,
'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object'
);
done();
});
});

it('buffer upload should retry on first failure', async () => {
const options = {
resumable: false,
Expand Down

0 comments on commit 36f1547

Please sign in to comment.