Skip to content

Commit

Permalink
fix(client): Fix duplicate rollback issue in adapter (#1208)
Browse files Browse the repository at this point in the history
Fixes:
1) Duplicate rollbacks in interactive transactions, as a failing
statement within a transaction would first trigger a rollback from
within the `Transaction` object and then trigger a second one from the
adapter's `transaction` API (see [discord
chat](https://discord.com/channels/933657521581858818/1224163932630159461))
2) Interleaving of statements before a rollback occurs, as a failing
statement within an interactive transaction would release the adapter's
lock _before_ running `ROLLBACK` and thus lead to interleaving of
statements run.

I've added failing tests for both cases and subsequently a fix for both.

I've kept multiple lock `release` calls to maintain the performance
optimization of calling `release` _immediately_ after a commit is
successful to avoid another event loop cycle blocking other adapter
tasks, but if we believe that is a minimal improvement we could just
keep the `release` call in the `finally` clause for clarity.
  • Loading branch information
msfstef authored Apr 30, 2024
1 parent abebbaa commit 3794e2b
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 36 deletions.
5 changes: 5 additions & 0 deletions .changeset/strange-rabbits-compete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"electric-sql": patch
---

Fix duplicate `ROLLBACK`s when using interactive transactions through the adapter's `transaction` API.
48 changes: 13 additions & 35 deletions clients/typescript/src/drivers/generic/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,29 @@ abstract class DatabaseAdapter
}

return new Promise<T>((resolve, reject) => {
const releaseAndReject = (err?: any) => {
// if the tx is rolled back, release the lock and reject the promise
release()
reject(err)
}

const tx = new Transaction(this, releaseAndReject)
const tx = new Transaction(this, reject)

f(tx, (res) => {
// Commit the transaction when the user sets the result.
// This assumes that the user does not execute any more queries after setting the result.
this._run({ sql: 'COMMIT' })
.then(() => {
// Release early if commit succeeded
release()
resolve(res)
})
.catch((err) => {
// Failed to commit
reject(err)
})
// Failed to commit
.catch(reject)
})
}).catch((err) => {
// something went wrong
// let's roll back
return this._run({ sql: 'ROLLBACK' })
.then(() => {
})
.catch((err) => {
// something went wrong
// let's roll back and rethrow
return this._run({ sql: 'ROLLBACK' }).then(() => {
throw err
}) // rethrow the error
.finally(() => {
release()
})
})
})
.finally(release)
}

run(stmt: Statement): Promise<RunResult> {
Expand Down Expand Up @@ -173,20 +164,6 @@ class Transaction implements Tx {
private signalFailure: (reason?: any) => void
) {}

private rollback(err: any, errorCallback?: (error: any) => void) {
const invokeErrorCallbackAndSignalFailure = () => {
if (typeof errorCallback !== 'undefined') errorCallback(err)
this.signalFailure(err)
}

this.adapter
._run({ sql: 'ROLLBACK' })
.then(() => {
invokeErrorCallbackAndSignalFailure()
})
.catch(() => invokeErrorCallbackAndSignalFailure())
}

private invokeCallback<T>(
prom: Promise<T>,
successCallback?: (tx: Transaction, result: T) => void,
Expand All @@ -197,7 +174,8 @@ class Transaction implements Tx {
if (typeof successCallback !== 'undefined') successCallback(this, res)
})
.catch((err) => {
this.rollback(err, errorCallback)
if (typeof errorCallback !== 'undefined') errorCallback(err)
this.signalFailure(err)
})
}

Expand Down
90 changes: 89 additions & 1 deletion clients/typescript/test/drivers/generic-adapters.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ test('interactive transactions work', async (t) => {
t.false(adapter.isLocked)
})

test('interactive transactions roll back if an error is thrown', async (t) => {
test('interactive transactions roll back if an error in between statements is thrown', async (t) => {
const adapter = new MockDatabaseAdapter()

const sql = 'INSERT INTO items VALUES (1);'
Expand Down Expand Up @@ -139,6 +139,94 @@ test('interactive transactions roll back if an error is thrown', async (t) => {
t.false(adapter.isLocked)
})

test('interactive transactions roll back once if an error in transaction is thrown', async (t) => {
const adapter = new MockDatabaseAdapter()

const sql = 'INSERT INTO items VALUES (1);'
const insert = { sql }

t.plan(5)

adapter.mockRun(async (stmt) => {
// First statement is `BEGIN`
t.deepEqual(stmt, { sql: 'BEGIN' })
// Next statement should be our insert
adapter.mockRun(async (stmt) => {
t.deepEqual(stmt, insert)

// Next statement should be `ROLLBACK`, only once
adapter.mockRun(async (stmt) => {
t.deepEqual(stmt, { sql: 'ROLLBACK' })
return { rowsAffected: 0 }
})

throw new Error('mocked failure')
})
return { rowsAffected: 1 }
})

await t.throwsAsync(
adapter.transaction((tx, res) => {
tx.run(insert, (_, r) => res(r))
}),
{
message: 'mocked failure',
}
)

t.false(adapter.isLocked)
})

test('interactive transactions hold lock until end of rollback', async (t) => {
const adapter = new MockDatabaseAdapter()

const sql = 'INSERT INTO items VALUES (1);'
const insert = { sql }
const nextStatement = { sql: 'NEXT STATEMENT' }

t.plan(6)

adapter.mockRun(async (stmt) => {
// First statement is `BEGIN`
t.deepEqual(stmt, { sql: 'BEGIN' })
// Next statement should be our insert
adapter.mockRun(async (stmt) => {
t.deepEqual(stmt, insert)

// Next statement should be `ROLLBACK`, only once
adapter.mockRun(async (stmt) => {
t.deepEqual(stmt, { sql: 'ROLLBACK' })

// _After_ rollback is done, then next statemnet is run
adapter.mockRun(async (stmt) => {
t.deepEqual(stmt, nextStatement)
return { rowsAffected: 0 }
})

return { rowsAffected: 0 }
})

throw new Error('mocked failure')
})
return { rowsAffected: 1 }
})

await t.throwsAsync(
async () => {
const prom1 = adapter.transaction((tx, res) => {
tx.run(insert, (_, r) => res(r))
})
const prom2 = adapter.run(nextStatement)
return Promise.all([prom1, prom2])
},
{
message: 'mocked failure',
}
)

t.false(adapter.isLocked)
})

test('interactive transactions roll back if commit fails', async (t) => {
const adapter = new MockDatabaseAdapter()

Expand Down

0 comments on commit 3794e2b

Please sign in to comment.