From 950bfad48bb86fca766ab301276db0a0e5884131 Mon Sep 17 00:00:00 2001 From: Roger Qiu Date: Sun, 25 Jun 2023 16:39:31 +1000 Subject: [PATCH] feat: replaced custom transaction with `Monitor` --- src/DB.ts | 10 +++ src/DBTransaction.ts | 122 ++++++++---------------------------- src/errors.ts | 5 ++ src/types.ts | 15 ----- tests/DBTransaction.test.ts | 26 ++++++++ 5 files changed, 68 insertions(+), 110 deletions(-) diff --git a/src/DB.ts b/src/DB.ts index cb9fadd9..9d307b6e 100644 --- a/src/DB.ts +++ b/src/DB.ts @@ -37,6 +37,7 @@ class DB { public static async createDB({ dbPath, crypto, + deadlock = false, fs = require('fs'), logger = new Logger(this.name), fresh = false, @@ -47,6 +48,7 @@ class DB { key: Buffer; ops: Crypto; }; + deadlock?: boolean; fs?: FileSystem; logger?: Logger; fresh?: boolean; @@ -54,6 +56,7 @@ class DB { logger.info(`Creating ${this.name}`); const db = new this({ dbPath, + deadlock, fs, logger, }); @@ -75,6 +78,7 @@ class DB { protected logger: Logger; protected workerManager?: DBWorkerManagerInterface; protected _lockBox: LockBox = new LockBox(); + protected _locksPending?: Map; protected _db: RocksDBDatabase; /** * References to iterators @@ -109,15 +113,20 @@ class DB { constructor({ dbPath, + deadlock, fs, logger, }: { dbPath: string; + deadlock: boolean; fs: FileSystem; logger: Logger; }) { this.logger = logger; this.dbPath = dbPath; + if (deadlock) { + this._locksPending = new Map(); + } this.fs = fs; } @@ -213,6 +222,7 @@ class DB { const tran = new DBTransaction({ db: this, lockBox: this._lockBox, + locksPending: this._locksPending, logger: this.logger, }); return [ diff --git a/src/DBTransaction.ts b/src/DBTransaction.ts index c7b3e309..26797c30 100644 --- a/src/DBTransaction.ts +++ b/src/DBTransaction.ts @@ -1,17 +1,11 @@ -import type { ResourceRelease } from '@matrixai/resources'; -import type { - LockBox, - MultiLockRequest as AsyncLocksMultiLockRequest, -} from '@matrixai/async-locks'; +import type { LockBox } from '@matrixai/async-locks'; import type DB from './DB'; import type { - ToString, KeyPath, LevelPath, DBIteratorOptions, DBClearOptions, DBCountOptions, - MultiLockRequest, } from './types'; import type { RocksDBTransaction, @@ -20,7 +14,12 @@ import type { } from './native/types'; import Logger from '@matrixai/logger'; import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; -import { Lock, RWLockWriter } from '@matrixai/async-locks'; +import { + Monitor, + Lock, + RWLockWriter, + errors as asyncLocksErrors, +} from '@matrixai/async-locks'; import DBIterator from './DBIterator'; import { rocksdbP } from './native'; import * as utils from './utils'; @@ -33,15 +32,7 @@ class DBTransaction { protected _db: DB; protected logger: Logger; - protected lockBox: LockBox; - protected _locks: Map< - string, - { - lock: RWLockWriter; - type: 'read' | 'write'; - release: ResourceRelease; - } - > = new Map(); + protected monitor: Monitor; protected _options: RocksDBTransactionOptions; protected _transaction: RocksDBTransaction; protected _snapshot: RocksDBTransactionSnapshot; @@ -58,18 +49,20 @@ class DBTransaction { public constructor({ db, lockBox, + locksPending, logger, ...options }: { db: DB; lockBox: LockBox; + locksPending?: Map; logger?: Logger; } & RocksDBTransactionOptions) { logger = logger ?? new Logger(this.constructor.name); logger.debug(`Constructing ${this.constructor.name}`); this.logger = logger; this._db = db; - this.lockBox = lockBox; + this.monitor = new Monitor(lockBox, RWLockWriter, locksPending); const options_ = { ...options, // Transactions should be synchronous @@ -96,9 +89,7 @@ class DBTransaction { // this then allows the destruction to proceed await this.commitOrRollbackLock.waitForUnlock(); this._db.transactionRefs.delete(this); - // Unlock all locked keys in reverse - const lockedKeys = [...this._locks.keys()].reverse(); - await this.unlock(...lockedKeys); + await this.monitor.unlockAll(); this.logger.debug(`Destroyed ${this.constructor.name} ${this.id}`); } @@ -150,15 +141,8 @@ class DBTransaction { return this._rollbacked; } - get locks(): ReadonlyMap< - string, - { - lock: RWLockWriter; - type: 'read' | 'write'; - release: ResourceRelease; - } - > { - return this._locks; + get locks(): Monitor['locks'] { + return this.monitor.locks; } /** @@ -168,78 +152,26 @@ class DBTransaction { return this._iteratorRefs; } - /** - * Lock a sequence of lock requests - * If the lock request doesn't specify, it - * defaults to using `RWLockWriter` with `write` type - * Keys are locked in string sorted order - * Even though keys can be arbitrary strings, by convention, you should use - * keys that correspond to keys in the database - * Locking with the same key is idempotent therefore lock re-entrancy is enabled - * Keys are automatically unlocked in reverse sorted order - * when the transaction is destroyed - * There is no support for lock upgrading or downgrading - * There is no deadlock detection - */ public async lock( - ...requests: Array + ...params: Parameters['lock']> ): Promise { - const requests_: Array> = []; - for (const request of requests) { - if (Array.isArray(request)) { - const [key, ...lockingParams] = request; - const key_ = key.toString(); - const lock = this._locks.get(key_); - // Default the lock type to `write` - const lockType = (lockingParams[0] = lockingParams[0] ?? 'write'); - if (lock == null) { - requests_.push([key_, RWLockWriter, ...lockingParams]); - } else if (lock.type !== lockType) { - throw new errors.ErrorDBTransactionLockType(); - } - } else { - const key_ = request.toString(); - const lock = this._locks.get(key_); - if (lock == null) { - // Default to using `RWLockWriter` write lock for just string keys - requests_.push([key_, RWLockWriter, 'write']); - } else if (lock.type !== 'write') { - throw new errors.ErrorDBTransactionLockType(); - } + try { + await this.monitor.lock(...params)(); + } catch (e) { + if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorLockType) { + throw new errors.ErrorDBTransactionLockType(undefined, { cause: e }); } - } - if (requests_.length > 0) { - // Duplicates are eliminated, and the returned acquisitions are sorted - const lockAcquires = this.lockBox.lockMulti(...requests_); - for (const [key, lockAcquire, ...lockingParams] of lockAcquires) { - const [lockRelease, lock] = await lockAcquire(); - // The `Map` will maintain insertion order - // these must be unlocked in reverse order - // when the transaction is destroyed - this._locks.set(key as string, { - lock: lock!, - type: lockingParams[0]!, // The `type` is defaulted to `write` - release: lockRelease, - }); + if (e instanceof asyncLocksErrors.ErrorAsyncLocksMonitorDeadlock) { + throw new errors.ErrorDBTransactionDeadlock(undefined, { cause: e }); } + throw e; } } - /** - * Unlock a sequence of lock keys - * Unlocking will be done in the order of the keys - * A transaction instance is only allowed to unlock keys that it previously - * locked, all keys that are not part of the `this._locks` is ignored - * Unlocking the same keys is idempotent - */ - public async unlock(...keys: Array): Promise { - for (const key of keys) { - const key_ = key.toString(); - const lock = this._locks.get(key_); - if (lock == null) continue; - this._locks.delete(key_); - await lock.release(); - } + public async unlock( + ...params: Parameters['unlock']> + ): Promise { + await this.monitor.unlock(...params); } public async get( diff --git a/src/errors.ts b/src/errors.ts index 52f3a8aa..e676e332 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -87,6 +87,10 @@ class ErrorDBTransactionLockType extends ErrorDBTransaction { 'DBTransaction does not support upgrading or downgrading the lock type'; } +class ErrorDBTransactionDeadlock extends ErrorDBTransaction { + static description = 'DBTransaction encountered a pessimistic deadlock'; +} + export { ErrorDB, ErrorDBRunning, @@ -109,4 +113,5 @@ export { ErrorDBTransactionNotCommittedNorRollbacked, ErrorDBTransactionConflict, ErrorDBTransactionLockType, + ErrorDBTransactionDeadlock, }; diff --git a/src/types.ts b/src/types.ts index ad700ec6..fc5839fc 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,5 +1,4 @@ import type fs from 'fs'; -import type { RWLockWriter } from '@matrixai/async-locks'; import type { WorkerManagerInterface } from '@matrixai/workers'; import type { RocksDBDatabaseOptions, @@ -17,13 +16,6 @@ import type { */ type POJO = { [key: string]: any }; -/** - * Any type that can be turned into a string - */ -interface ToString { - toString(): string; -} - /** * Opaque types are wrappers of existing types * that require smart constructors @@ -159,14 +151,8 @@ type DBOp = type DBOps = Array; -type MultiLockRequest = [ - key: ToString, - ...lockingParams: Parameters, -]; - export type { POJO, - ToString, Opaque, Callback, Merge, @@ -182,5 +168,4 @@ export type { DBBatch, DBOp, DBOps, - MultiLockRequest, }; diff --git a/tests/DBTransaction.test.ts b/tests/DBTransaction.test.ts index fb702d77..c2d84107 100644 --- a/tests/DBTransaction.test.ts +++ b/tests/DBTransaction.test.ts @@ -1139,4 +1139,30 @@ describe(DBTransaction.name, () => { }); }); }); + test('deadlock detection', async () => { + const dbPath = `${dataDir}/db2`; + const db = await DB.createDB({ dbPath, crypto, deadlock: true, logger }); + const barrier = await Barrier.createBarrier(2); + const results = await Promise.allSettled([ + db.withTransactionF(async (tran1) => { + await tran1.lock('foo'); + await barrier.wait(); + await tran1.lock('bar'); + }), + db.withTransactionF(async (tran2) => { + await tran2.lock('bar'); + await barrier.wait(); + await tran2.lock('foo'); + }), + ]); + expect( + results.some( + (r) => + r.status === 'rejected' && + r.reason instanceof errors.ErrorDBTransactionDeadlock, + ), + ).toBe(true); + expect(results.some((r) => r.status === 'fulfilled')).toBe(true); + await db.stop(); + }); });