Skip to content

Commit

Permalink
files - write queues and atomic read support (#139059)
Browse files Browse the repository at this point in the history
* files - write queues and atomic read support

* rename `barrier` => `lock`

* cleanup
  • Loading branch information
bpasero authored Dec 15, 2021
1 parent 725225f commit 9df559f
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 66 deletions.
2 changes: 0 additions & 2 deletions src/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,6 @@ function getCodeCachePath() {
* @returns {Promise<string>}
*/
function mkdirp(dir) {
const fs = require('fs');

return new Promise((resolve, reject) => {
fs.mkdir(dir, { recursive: true }, err => (err && err.code !== 'EEXIST') ? reject(err) : resolve(dir));
});
Expand Down
2 changes: 1 addition & 1 deletion src/vs/base/node/pfs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ export namespace SymlinkSupport {

//#region Write File

// According to node.js docs (https://nodejs.org/docs/v6.5.0/api/fs.html#fs_fs_writefile_file_data_options_callback)
// According to node.js docs (https://nodejs.org/docs/v14.16.0/api/fs.html#fs_fs_writefile_file_data_options_callback)
// it is not safe to call writeFile() on the same path multiple times without waiting for the callback to return.
// Therefor we use a Queue on the path that is given to us to sequentialize calls to the same path properly.
const writeQueues = new ResourceQueue();
Expand Down
12 changes: 7 additions & 5 deletions src/vs/platform/files/common/diskFileSystemProviderClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { newWriteableStream, ReadableStreamEventPayload, ReadableStreamEvents }
import { URI, UriComponents } from 'vs/base/common/uri';
import { generateUuid } from 'vs/base/common/uuid';
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
import { createFileSystemProviderError, FileChangeType, FileDeleteOptions, FileOpenOptions, FileOverwriteOptions, FileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, FileWriteOptions, IFileChange, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IStat, IWatchOptions } from 'vs/platform/files/common/files';
import { createFileSystemProviderError, FileAtomicReadOptions, FileChangeType, FileDeleteOptions, FileOpenOptions, FileOverwriteOptions, FileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, FileWriteOptions, IFileChange, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IStat, IWatchOptions } from 'vs/platform/files/common/files';

/**
* An implementation of a local disk file system provider
Expand All @@ -24,7 +24,8 @@ export class DiskFileSystemProviderClient extends Disposable implements
IFileSystemProviderWithFileReadWriteCapability,
IFileSystemProviderWithOpenReadWriteCloseCapability,
IFileSystemProviderWithFileReadStreamCapability,
IFileSystemProviderWithFileFolderCopyCapability {
IFileSystemProviderWithFileFolderCopyCapability,
IFileSystemProviderWithFileAtomicReadCapability {

constructor(
private readonly channel: IChannel,
Expand All @@ -47,7 +48,8 @@ export class DiskFileSystemProviderClient extends Disposable implements
FileSystemProviderCapabilities.FileOpenReadWriteClose |
FileSystemProviderCapabilities.FileReadStream |
FileSystemProviderCapabilities.FileFolderCopy |
FileSystemProviderCapabilities.FileWriteUnlock;
FileSystemProviderCapabilities.FileWriteUnlock |
FileSystemProviderCapabilities.FileAtomicRead;

if (this.extraCapabilities.pathCaseSensitive) {
this._capabilities |= FileSystemProviderCapabilities.PathCaseSensitive;
Expand Down Expand Up @@ -77,8 +79,8 @@ export class DiskFileSystemProviderClient extends Disposable implements

//#region File Reading/Writing

async readFile(resource: URI): Promise<Uint8Array> {
const { buffer } = await this.channel.call('readFile', [resource]) as VSBuffer;
async readFile(resource: URI, opts?: FileAtomicReadOptions): Promise<Uint8Array> {
const { buffer } = await this.channel.call('readFile', [resource, opts]) as VSBuffer;

return buffer;
}
Expand Down
21 changes: 15 additions & 6 deletions src/vs/platform/files/common/fileService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { extUri, extUriIgnorePathCase, IExtUri, isAbsolutePath } from 'vs/base/c
import { consumeStream, isReadableBufferedStream, isReadableStream, listenStream, newWriteableStream, peekReadable, peekStream, transform } from 'vs/base/common/stream';
import { URI } from 'vs/base/common/uri';
import { localize } from 'vs/nls';
import { ensureFileSystemProviderError, etag, ETAG_DISABLED, FileChangesEvent, FileDeleteOptions, FileOperation, FileOperationError, FileOperationEvent, FileOperationResult, FilePermission, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, hasFileFolderCopyCapability, hasFileReadStreamCapability, hasOpenReadWriteCloseCapability, hasReadWriteCapability, ICreateFileOptions, IFileChange, IFileContent, IFileService, IFileStat, IFileStatWithMetadata, IFileStreamContent, IFileSystemProvider, IFileSystemProviderActivationEvent, IFileSystemProviderCapabilitiesChangeEvent, IFileSystemProviderRegistrationEvent, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IRawFileChangesEvent, IReadFileOptions, IReadFileStreamOptions, IResolveFileOptions, IResolveFileResult, IResolveFileResultWithMetadata, IResolveMetadataFileOptions, IStat, IWatchOptions, IWriteFileOptions, NotModifiedSinceFileOperationError, toFileOperationResult, toFileSystemProviderErrorCode } from 'vs/platform/files/common/files';
import { ensureFileSystemProviderError, etag, ETAG_DISABLED, FileChangesEvent, FileDeleteOptions, FileOperation, FileOperationError, FileOperationEvent, FileOperationResult, FilePermission, FileSystemProviderCapabilities, FileSystemProviderErrorCode, FileType, hasFileAtomicReadCapability, hasFileFolderCopyCapability, hasFileReadStreamCapability, hasOpenReadWriteCloseCapability, hasReadWriteCapability, ICreateFileOptions, IFileChange, IFileContent, IFileService, IFileStat, IFileStatWithMetadata, IFileStreamContent, IFileSystemProvider, IFileSystemProviderActivationEvent, IFileSystemProviderCapabilitiesChangeEvent, IFileSystemProviderRegistrationEvent, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IRawFileChangesEvent, IReadFileOptions, IReadFileStreamOptions, IResolveFileOptions, IResolveFileResult, IResolveFileResultWithMetadata, IResolveMetadataFileOptions, IStat, IWatchOptions, IWriteFileOptions, NotModifiedSinceFileOperationError, toFileOperationResult, toFileSystemProviderErrorCode } from 'vs/platform/files/common/files';
import { readFileIntoStream } from 'vs/platform/files/common/io';
import { ILogService } from 'vs/platform/log/common/log';

Expand Down Expand Up @@ -486,7 +486,7 @@ export class FileService extends Disposable implements IFileService {
return this.doReadFileStream(provider, resource, options, token);
}

private async doReadFileStream(provider: IFileSystemProviderWithFileReadWriteCapability | IFileSystemProviderWithOpenReadWriteCloseCapability | IFileSystemProviderWithFileReadStreamCapability, resource: URI, options?: IReadFileStreamOptions & { preferUnbuffered?: boolean; }, token?: CancellationToken): Promise<IFileStreamContent> {
private async doReadFileStream(provider: IFileSystemProviderWithFileReadWriteCapability | IFileSystemProviderWithOpenReadWriteCloseCapability | IFileSystemProviderWithFileReadStreamCapability, resource: URI, options?: IReadFileOptions & IReadFileStreamOptions & { preferUnbuffered?: boolean; }, token?: CancellationToken): Promise<IFileStreamContent> {

// install a cancellation token that gets cancelled
// when any error occurs. this allows us to resolve
Expand Down Expand Up @@ -516,8 +516,12 @@ export class FileService extends Disposable implements IFileService {
await statPromise;
}

// read unbuffered (only if either preferred, or the provider has no buffered read capability)
if (!(hasOpenReadWriteCloseCapability(provider) || hasFileReadStreamCapability(provider)) || (hasReadWriteCapability(provider) && options?.preferUnbuffered)) {
// read unbuffered
if (
(options?.atomic && hasFileAtomicReadCapability(provider)) || // atomic reads are always unbuffered
!(hasOpenReadWriteCloseCapability(provider) || hasFileReadStreamCapability(provider)) || // provider has no buffered capability
(hasReadWriteCapability(provider) && options?.preferUnbuffered) // unbuffered read is preferred
) {
fileStream = this.readFileUnbuffered(provider, resource, options);
}

Expand Down Expand Up @@ -578,14 +582,19 @@ export class FileService extends Disposable implements IFileService {
return stream;
}

private readFileUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability, resource: URI, options?: IReadFileStreamOptions): VSBufferReadableStream {
private readFileUnbuffered(provider: IFileSystemProviderWithFileReadWriteCapability | IFileSystemProviderWithFileAtomicReadCapability, resource: URI, options?: IReadFileOptions & IReadFileStreamOptions): VSBufferReadableStream {
const stream = newWriteableStream<VSBuffer>(data => VSBuffer.concat(data));

// Read the file into the stream async but do not wait for
// this to complete because streams work via events
(async () => {
try {
let buffer = await provider.readFile(resource);
let buffer: Uint8Array;
if (options?.atomic && hasFileAtomicReadCapability(provider)) {
buffer = await provider.readFile(resource, { atomic: true });
} else {
buffer = await provider.readFile(resource);
}

// respect position option
if (typeof options?.position === 'number') {
Expand Down
40 changes: 39 additions & 1 deletion src/vs/platform/files/common/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,26 @@ export interface FileUnlockOptions {
readonly unlock: boolean;
}

export interface FileAtomicReadOptions {

/**
* The optional `atomic` flag can be used to make sure
* the `readFile` method is not running in parallel with
* any `write` operations in the same process.
*
* Typically you should not need to use this flag but if
* for example you are quickly reading a file right after
* a file event occurred and the file changes a lot, there
* is a chance that a read returns an empty or partial file
* because a pending write has not finished yet.
*
* Note: this does not prevent the file from being written
* to from a different process. If you need such atomic
* operations, you better use a real database as storage.
*/
readonly atomic: true;
}

export interface FileReadStreamOptions {

/**
Expand Down Expand Up @@ -427,7 +447,13 @@ export const enum FileSystemProviderCapabilities {
/**
* Provider support to unlock files for writing.
*/
FileWriteUnlock = 1 << 13
FileWriteUnlock = 1 << 13,

/**
* Provider support to read files atomically. This implies the
* provider provides the `FileReadWrite` capability too.
*/
FileAtomicRead = 1 << 14
}

export interface IFileSystemProvider {
Expand Down Expand Up @@ -494,6 +520,18 @@ export function hasFileReadStreamCapability(provider: IFileSystemProvider): prov
return !!(provider.capabilities & FileSystemProviderCapabilities.FileReadStream);
}

export interface IFileSystemProviderWithFileAtomicReadCapability extends IFileSystemProvider {
readFile(resource: URI, opts?: FileAtomicReadOptions): Promise<Uint8Array>;
}

export function hasFileAtomicReadCapability(provider: IFileSystemProvider): provider is IFileSystemProviderWithFileAtomicReadCapability {
if (!hasReadWriteCapability(provider)) {
return false; // we require the `FileReadWrite` capability too
}

return !!(provider.capabilities & FileSystemProviderCapabilities.FileAtomicRead);
}

export enum FileSystemProviderErrorCode {
FileExists = 'EntryExists',
FileNotFound = 'EntryNotFound',
Expand Down
111 changes: 91 additions & 20 deletions src/vs/platform/files/node/diskFileSystemProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@

import * as fs from 'fs';
import { gracefulify } from 'graceful-fs';
import { retry } from 'vs/base/common/async';
import { Barrier, retry } from 'vs/base/common/async';
import { ResourceMap } from 'vs/base/common/map';
import { VSBuffer } from 'vs/base/common/buffer';
import { CancellationToken } from 'vs/base/common/cancellation';
import { Event } from 'vs/base/common/event';
import { isEqual } from 'vs/base/common/extpath';
import { IDisposable } from 'vs/base/common/lifecycle';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { basename, dirname } from 'vs/base/common/path';
import { isLinux, isWindows } from 'vs/base/common/platform';
import { joinPath } from 'vs/base/common/resources';
import { extUriBiasedIgnorePathCase, joinPath } from 'vs/base/common/resources';
import { newWriteableStream, ReadableStreamEvents } from 'vs/base/common/stream';
import { URI } from 'vs/base/common/uri';
import { IDirent, Promises, RimRafMode, SymlinkSupport } from 'vs/base/node/pfs';
import { localize } from 'vs/nls';
import { createFileSystemProviderError, FileDeleteOptions, FileOpenOptions, FileOverwriteOptions, FileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderError, FileSystemProviderErrorCode, FileType, FileWriteOptions, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, isFileOpenForWriteOptions, IStat } from 'vs/platform/files/common/files';
import { createFileSystemProviderError, FileAtomicReadOptions, FileDeleteOptions, FileOpenOptions, FileOverwriteOptions, FileReadStreamOptions, FileSystemProviderCapabilities, FileSystemProviderError, FileSystemProviderErrorCode, FileType, FileWriteOptions, IFileSystemProviderWithFileAtomicReadCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, isFileOpenForWriteOptions, IStat } from 'vs/platform/files/common/files';
import { readFileIntoStream } from 'vs/platform/files/common/io';
import { NodeJSFileWatcher } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcher';
import { ParcelWatcherClient } from 'vs/platform/files/node/watcher/parcel/parcelWatcherClient';
Expand Down Expand Up @@ -68,7 +69,8 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
IFileSystemProviderWithFileReadWriteCapability,
IFileSystemProviderWithOpenReadWriteCloseCapability,
IFileSystemProviderWithFileReadStreamCapability,
IFileSystemProviderWithFileFolderCopyCapability {
IFileSystemProviderWithFileFolderCopyCapability,
IFileSystemProviderWithFileAtomicReadCapability {

constructor(
logService: ILogService,
Expand All @@ -89,7 +91,8 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
FileSystemProviderCapabilities.FileOpenReadWriteClose |
FileSystemProviderCapabilities.FileReadStream |
FileSystemProviderCapabilities.FileFolderCopy |
FileSystemProviderCapabilities.FileWriteUnlock;
FileSystemProviderCapabilities.FileWriteUnlock |
FileSystemProviderCapabilities.FileAtomicRead;

if (isLinux) {
this._capabilities |= FileSystemProviderCapabilities.PathCaseSensitive;
Expand Down Expand Up @@ -172,13 +175,48 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple

//#region File Reading/Writing

async readFile(resource: URI): Promise<Uint8Array> {
private readonly resourceLocks = new ResourceMap<Barrier>(resource => extUriBiasedIgnorePathCase.getComparisonKey(resource));

private async createResourceLock(resource: URI): Promise<IDisposable> {

// Await pending locks for resource
// It is possible for a new lock being
// added right after opening, so we have
// to loop over locks until no lock remains
let existingLock: Barrier | undefined = undefined;
while (existingLock = this.resourceLocks.get(resource)) {
await existingLock.wait();
}

// Store new
const newLock = new Barrier();
this.resourceLocks.set(resource, newLock);

return toDisposable(() => {

// Delete and open lock
this.resourceLocks.delete(resource);
newLock.open();
});
}

async readFile(resource: URI, options?: FileAtomicReadOptions): Promise<Uint8Array> {
let lock: IDisposable | undefined = undefined;
try {
if (options?.atomic) {
// When the read should be atomic, make sure
// to await any pending locks for the resource
// and lock for the duration of the read.
lock = await this.createResourceLock(resource);
}

const filePath = this.toFilePath(resource);

return await Promises.readFile(filePath);
} catch (error) {
throw this.toFileSystemProviderError(error);
} finally {
lock?.dispose();
}
}

Expand Down Expand Up @@ -227,11 +265,22 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
}

private readonly mapHandleToPos = new Map<number, number>();
private readonly mapHandleToLock = new Map<number, IDisposable>();

private readonly writeHandles = new Map<number, URI>();
private canFlush: boolean = true;

async open(resource: URI, opts: FileOpenOptions): Promise<number> {

// Writes: guard multiple writes to the same resource
// behind a single lock to prevent races when writing
// from multiple places at the same time to the same file
let lock: IDisposable | undefined = undefined;
if (isFileOpenForWriteOptions(opts)) {
lock = await this.createResourceLock(resource);
}

let handle: number | undefined = undefined;
try {
const filePath = this.toFilePath(resource);

Expand Down Expand Up @@ -280,28 +329,41 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
flags = 'r';
}

const handle = await Promises.open(filePath, flags);
// Finally open handle to file path
handle = await Promises.open(filePath, flags);

// remember this handle to track file position of the handle
// we init the position to 0 since the file descriptor was
// just created and the position was not moved so far (see
// also http://man7.org/linux/man-pages/man2/open.2.html -
// "The file offset is set to the beginning of the file.")
this.mapHandleToPos.set(handle, 0);
} catch (error) {

// remember that this handle was used for writing
if (isFileOpenForWriteOptions(opts)) {
this.writeHandles.set(handle, resource);
}
// Release lock because we have no valid handle
// if we did open a lock during this operation
lock?.dispose();

return handle;
} catch (error) {
// Rethrow as file system provider error
if (isFileOpenForWriteOptions(opts)) {
throw await this.toFileSystemProviderWriteError(resource, error);
} else {
throw this.toFileSystemProviderError(error);
}
}

// remember this handle to track file position of the handle
// we init the position to 0 since the file descriptor was
// just created and the position was not moved so far (see
// also http://man7.org/linux/man-pages/man2/open.2.html -
// "The file offset is set to the beginning of the file.")
this.mapHandleToPos.set(handle, 0);

// remember that this handle was used for writing
if (isFileOpenForWriteOptions(opts)) {
this.writeHandles.set(handle, resource);
}

// remember that this handle has an associated lock
if (lock) {
this.mapHandleToLock.set(handle, lock);
}

return handle;
}

async close(fd: number): Promise<void> {
Expand All @@ -326,6 +388,15 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
return await Promises.close(fd);
} catch (error) {
throw this.toFileSystemProviderError(error);
} finally {

// Release any known lock for the file handle
// since the handle has been closed and is invalid
const lockForHandle = this.mapHandleToLock.get(fd);
if (lockForHandle) {
lockForHandle.dispose();
this.mapHandleToLock.delete(fd);
}
}
}

Expand Down
Loading

0 comments on commit 9df559f

Please sign in to comment.