Skip to content

Commit

Permalink
feat: replaced custom transaction with Monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Jun 25, 2023
1 parent 7717f9f commit 950bfad
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 110 deletions.
10 changes: 10 additions & 0 deletions src/DB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class DB {
public static async createDB({
dbPath,
crypto,
deadlock = false,
fs = require('fs'),
logger = new Logger(this.name),
fresh = false,
Expand All @@ -47,13 +48,15 @@ class DB {
key: Buffer;
ops: Crypto;
};
deadlock?: boolean;
fs?: FileSystem;
logger?: Logger;
fresh?: boolean;
} & DBOptions): Promise<DB> {
logger.info(`Creating ${this.name}`);
const db = new this({
dbPath,
deadlock,
fs,
logger,
});
Expand All @@ -75,6 +78,7 @@ class DB {
protected logger: Logger;
protected workerManager?: DBWorkerManagerInterface;
protected _lockBox: LockBox<RWLockWriter> = new LockBox();
protected _locksPending?: Map<string, { count: number }>;
protected _db: RocksDBDatabase;
/**
* References to iterators
Expand Down Expand Up @@ -109,15 +113,20 @@ class DB {

constructor({
dbPath,
deadlock,
fs,
logger,
}: {
dbPath: string;
deadlock: boolean;
fs: FileSystem;
logger: Logger;
}) {
this.logger = logger;
this.dbPath = dbPath;
if (deadlock) {
this._locksPending = new Map();
}
this.fs = fs;
}

Expand Down Expand Up @@ -213,6 +222,7 @@ class DB {
const tran = new DBTransaction({
db: this,
lockBox: this._lockBox,
locksPending: this._locksPending,
logger: this.logger,
});
return [
Expand Down
122 changes: 27 additions & 95 deletions src/DBTransaction.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import type { ResourceRelease } from '@matrixai/resources';
import type {
LockBox,
MultiLockRequest as AsyncLocksMultiLockRequest,
} from '@matrixai/async-locks';
import type { LockBox } from '@matrixai/async-locks';
import type DB from './DB';
import type {
ToString,
KeyPath,
LevelPath,
DBIteratorOptions,
DBClearOptions,
DBCountOptions,
MultiLockRequest,
} from './types';
import type {
RocksDBTransaction,
Expand All @@ -20,7 +14,12 @@ import type {
} from './native/types';
import Logger from '@matrixai/logger';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import { Lock, RWLockWriter } from '@matrixai/async-locks';
import {
Monitor,
Lock,
RWLockWriter,
errors as asyncLocksErrors,
} from '@matrixai/async-locks';
import DBIterator from './DBIterator';
import { rocksdbP } from './native';
import * as utils from './utils';
Expand All @@ -33,15 +32,7 @@ class DBTransaction {

protected _db: DB;
protected logger: Logger;
protected lockBox: LockBox<RWLockWriter>;
protected _locks: Map<
string,
{
lock: RWLockWriter;
type: 'read' | 'write';
release: ResourceRelease;
}
> = new Map();
protected monitor: Monitor<RWLockWriter>;
protected _options: RocksDBTransactionOptions;
protected _transaction: RocksDBTransaction;
protected _snapshot: RocksDBTransactionSnapshot;
Expand All @@ -58,18 +49,20 @@ class DBTransaction {
public constructor({
db,
lockBox,
locksPending,
logger,
...options
}: {
db: DB;
lockBox: LockBox<RWLockWriter>;
locksPending?: Map<string, { count: number }>;
logger?: Logger;
} & RocksDBTransactionOptions) {
logger = logger ?? new Logger(this.constructor.name);
logger.debug(`Constructing ${this.constructor.name}`);
this.logger = logger;
this._db = db;
this.lockBox = lockBox;
this.monitor = new Monitor(lockBox, RWLockWriter, locksPending);
const options_ = {
...options,
// Transactions should be synchronous
Expand All @@ -96,9 +89,7 @@ class DBTransaction {
// this then allows the destruction to proceed
await this.commitOrRollbackLock.waitForUnlock();
this._db.transactionRefs.delete(this);
// Unlock all locked keys in reverse
const lockedKeys = [...this._locks.keys()].reverse();
await this.unlock(...lockedKeys);
await this.monitor.unlockAll();
this.logger.debug(`Destroyed ${this.constructor.name} ${this.id}`);
}

Expand Down Expand Up @@ -150,15 +141,8 @@ class DBTransaction {
return this._rollbacked;
}

get locks(): ReadonlyMap<
string,
{
lock: RWLockWriter;
type: 'read' | 'write';
release: ResourceRelease;
}
> {
return this._locks;
get locks(): Monitor<RWLockWriter>['locks'] {
return this.monitor.locks;
}

/**
Expand All @@ -168,78 +152,26 @@ class DBTransaction {
return this._iteratorRefs;
}

/**
* Lock a sequence of lock requests
* If the lock request doesn't specify, it
* defaults to using `RWLockWriter` with `write` type
* Keys are locked in string sorted order
* Even though keys can be arbitrary strings, by convention, you should use
* keys that correspond to keys in the database
* Locking with the same key is idempotent therefore lock re-entrancy is enabled
* Keys are automatically unlocked in reverse sorted order
* when the transaction is destroyed
* There is no support for lock upgrading or downgrading
* There is no deadlock detection
*/
public async lock(
...requests: Array<MultiLockRequest | ToString>
...params: Parameters<Monitor<RWLockWriter>['lock']>
): Promise<void> {
const requests_: Array<AsyncLocksMultiLockRequest<RWLockWriter>> = [];
for (const request of requests) {
if (Array.isArray(request)) {
const [key, ...lockingParams] = request;
const key_ = key.toString();
const lock = this._locks.get(key_);
// Default the lock type to `write`
const lockType = (lockingParams[0] = lockingParams[0] ?? 'write');
if (lock == null) {
requests_.push([key_, RWLockWriter, ...lockingParams]);
} else if (lock.type !== lockType) {
throw new errors.ErrorDBTransactionLockType();
}
} else {
const key_ = request.toString();
const lock = this._locks.get(key_);
if (lock == null) {
// Default to using `RWLockWriter` write lock for just string keys
requests_.push([key_, RWLockWriter, 'write']);
} else if (lock.type !== 'write') {
throw new errors.ErrorDBTransactionLockType();
}
try {
await this.monitor.lock(...params)();
} catch (e) {
if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorLockType) {
throw new errors.ErrorDBTransactionLockType(undefined, { cause: e });
}
}
if (requests_.length > 0) {
// Duplicates are eliminated, and the returned acquisitions are sorted
const lockAcquires = this.lockBox.lockMulti(...requests_);
for (const [key, lockAcquire, ...lockingParams] of lockAcquires) {
const [lockRelease, lock] = await lockAcquire();
// The `Map` will maintain insertion order
// these must be unlocked in reverse order
// when the transaction is destroyed
this._locks.set(key as string, {
lock: lock!,
type: lockingParams[0]!, // The `type` is defaulted to `write`
release: lockRelease,
});
if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorDeadlock) {
throw new errors.ErrorDBTransactionDeadlock(undefined, { cause: e });
}
throw e;
}
}

/**
* Unlock a sequence of lock keys
* Unlocking will be done in the order of the keys
* A transaction instance is only allowed to unlock keys that it previously
* locked, all keys that are not part of the `this._locks` is ignored
* Unlocking the same keys is idempotent
*/
public async unlock(...keys: Array<ToString>): Promise<void> {
for (const key of keys) {
const key_ = key.toString();
const lock = this._locks.get(key_);
if (lock == null) continue;
this._locks.delete(key_);
await lock.release();
}
public async unlock(
...params: Parameters<Monitor<RWLockWriter>['unlock']>
): Promise<void> {
await this.monitor.unlock(...params);
}

public async get<T>(
Expand Down
5 changes: 5 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class ErrorDBTransactionLockType<T> extends ErrorDBTransaction<T> {
'DBTransaction does not support upgrading or downgrading the lock type';
}

class ErrorDBTransactionDeadlock<T> extends ErrorDBTransaction<T> {
static description = 'DBTransaction encountered a pessimistic deadlock';
}

export {
ErrorDB,
ErrorDBRunning,
Expand All @@ -109,4 +113,5 @@ export {
ErrorDBTransactionNotCommittedNorRollbacked,
ErrorDBTransactionConflict,
ErrorDBTransactionLockType,
ErrorDBTransactionDeadlock,
};
15 changes: 0 additions & 15 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type fs from 'fs';
import type { RWLockWriter } from '@matrixai/async-locks';
import type { WorkerManagerInterface } from '@matrixai/workers';
import type {
RocksDBDatabaseOptions,
Expand All @@ -17,13 +16,6 @@ import type {
*/
type POJO = { [key: string]: any };

/**
* Any type that can be turned into a string
*/
interface ToString {
toString(): string;
}

/**
* Opaque types are wrappers of existing types
* that require smart constructors
Expand Down Expand Up @@ -159,14 +151,8 @@ type DBOp =

type DBOps = Array<DBOp>;

type MultiLockRequest = [
key: ToString,
...lockingParams: Parameters<RWLockWriter['lock']>,
];

export type {
POJO,
ToString,
Opaque,
Callback,
Merge,
Expand All @@ -182,5 +168,4 @@ export type {
DBBatch,
DBOp,
DBOps,
MultiLockRequest,
};
26 changes: 26 additions & 0 deletions tests/DBTransaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1139,4 +1139,30 @@ describe(DBTransaction.name, () => {
});
});
});
test('deadlock detection', async () => {
const dbPath = `${dataDir}/db2`;
const db = await DB.createDB({ dbPath, crypto, deadlock: true, logger });
const barrier = await Barrier.createBarrier(2);
const results = await Promise.allSettled([
db.withTransactionF(async (tran1) => {
await tran1.lock('foo');
await barrier.wait();
await tran1.lock('bar');
}),
db.withTransactionF(async (tran2) => {
await tran2.lock('bar');
await barrier.wait();
await tran2.lock('foo');
}),
]);
expect(
results.some(
(r) =>
r.status === 'rejected' &&
r.reason instanceof errors.ErrorDBTransactionDeadlock,
),
).toBe(true);
expect(results.some((r) => r.status === 'fulfilled')).toBe(true);
await db.stop();
});
});

0 comments on commit 950bfad

Please sign in to comment.