Skip to content

Commit

Permalink
feat(file-storage): emit 'done' event once file is fully written
Browse files Browse the repository at this point in the history
  • Loading branch information
getlarge committed Oct 11, 2023
1 parent 6eed261 commit f8b1a02
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
10 changes: 7 additions & 3 deletions packages/file-storage/src/file-storage-fs.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from 'fs';
import { readdir, readFile, rm, writeFile } from 'node:fs/promises';
import { resolve as resolvePath } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { Readable } from 'node:stream';

import { MethodTypes } from './constants';
import {
Expand All @@ -23,6 +23,7 @@ import {
FileStorageConfigFactory,
FileStorageDirBaseArgs,
} from './file-storage.class';
import { FileStorageWritable } from './types';

export type StreamOptions = {
flags?: string;
Expand Down Expand Up @@ -115,10 +116,13 @@ export class FileStorageLocal implements FileStorage {
return writeFile(fileName, content, options);
}

async uploadStream(args: FileStorageLocalUploadStream): Promise<Writable> {
async uploadStream(args: FileStorageLocalUploadStream): Promise<FileStorageWritable> {
const { filePath, options, request } = args;
const fileName = await this.transformFilePath(filePath, MethodTypes.WRITE, request, options);
return createWriteStream(fileName, options);
const writeStream = createWriteStream(fileName, options);
writeStream.prependOnceListener('finish', () => writeStream.emit('done'));
writeStream.prependOnceListener('error', (err) => writeStream.emit('done', err));
return writeStream;
}

downloadFile(args: {
Expand Down
10 changes: 7 additions & 3 deletions packages/file-storage/src/file-storage-s3.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import type { Request } from 'express';
import { PassThrough, Readable, Writable } from 'node:stream';
import { PassThrough, Readable } from 'node:stream';

import { MethodTypes } from './constants';
import {
Expand All @@ -19,6 +19,7 @@ import {
FileStorageConfigFactory,
FileStorageDirBaseArgs,
} from './file-storage.class';
import { FileStorageWritable } from './types';

/**
* Either region or endpoint must be provided
Expand Down Expand Up @@ -140,7 +141,7 @@ export class FileStorageS3 implements FileStorage {
}).done();
}

async uploadStream(args: FileStorageS3UploadStream): Promise<Writable> {
async uploadStream(args: FileStorageS3UploadStream): Promise<FileStorageWritable> {
const { filePath, options = {}, request } = args;
const Key = await this.transformFilePath(filePath, MethodTypes.WRITE, request, options);
const { s3, bucket: Bucket } = this.config;
Expand All @@ -155,8 +156,11 @@ export class FileStorageS3 implements FileStorage {
},
})
.done()
.then(() => {
writeStream.emit('done');
})
.catch((err) => {
writeStream.destroy(err);
writeStream.emit('done', err);
});
return writeStream;
}
Expand Down
7 changes: 4 additions & 3 deletions packages/file-storage/test/file-storage.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import * as dotenv from 'dotenv';
import { mkdir, rm } from 'node:fs/promises';
import { resolve } from 'node:path';
import { Readable } from 'node:stream';
import { once, Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import { FileStorage, FileStorageModule, FileStorageModuleOptions, StorageType } from '../src';
Expand Down Expand Up @@ -111,8 +111,9 @@ testMap.forEach((testSuite) => {
const upload = await fileStorage.uploadStream({ filePath: testFileName });
const entry = Readable.from(Buffer.from(testFileContent));
await pipeline(entry, upload);
// add delay, otherwise test is flaky
await new Promise<void>((resolve) => setTimeout(resolve, 100));
const ac = new AbortController();
const t = setTimeout(() => ac.abort(), 200);
await once(upload, 'done', { signal: ac.signal }).finally(() => clearTimeout(t));
const result = await fileStorage.readDir({ dirPath });
expect(result.length).toBe(1);
});
Expand Down

0 comments on commit f8b1a02

Please sign in to comment.