Skip to content

Commit

Permalink
Avoid Results try to pull more records after transaction fail (#1145)
Browse files Browse the repository at this point in the history
When two or more queries are running concurrent in the same transaction, a failure happenning in any of the queries implies in a broken transaction and any new message to the server should result in a error since the server state will be in failure.

Notifying all open results in the transaction about any error avoids any new message to be send by these objects.
  • Loading branch information
bigmontz authored Oct 10, 2023
1 parent 3602321 commit e26619c
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 20 deletions.
14 changes: 12 additions & 2 deletions packages/core/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Transaction {
private readonly _onError: (error: Error) => Promise<Connection | null>
private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void
private readonly _fetchSize: number
private readonly _results: any[]
private readonly _results: Result[]
private readonly _impersonatedUser?: string
private readonly _lowRecordWatermak: number
private readonly _highRecordWatermark: number
Expand Down Expand Up @@ -291,12 +291,22 @@ class Transaction {
}
}

_onErrorCallback (): Promise<Connection | null> {
_onErrorCallback (error: Error): Promise<Connection | null> {
// error will be "acknowledged" by sending a RESET message
// database will then forget about this transaction and cleanup all corresponding resources
// it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
this._state = _states.FAILED
this._onClose()
this._results.forEach(result => {
if (result.isOpen()) {
// @ts-expect-error
result._streamObserverPromise
.then(resultStreamObserver => resultStreamObserver.onError(error))
// Nothing to do since we don't have a observer to notify the error
// the result will be already broke in other ways.
.catch((_: Error) => {})
}
})

// release connection back to the pool
return this._connectionHolder.releaseConnection()
Expand Down
45 changes: 44 additions & 1 deletion packages/core/test/transaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/

import { ConnectionProvider, newError, NotificationFilter, Transaction, TransactionPromise } from '../src'
import { BeginTransactionConfig } from '../src/connection'
import { BeginTransactionConfig, RunQueryConfig } from '../src/connection'
import { Bookmarks } from '../src/internal/bookmarks'
import { ConnectionHolder } from '../src/internal/connection-holder'
import { Logger } from '../src/internal/logger'
import { TxConfig } from '../src/internal/tx-config'
import FakeConnection from './utils/connection.fake'
import { validNotificationFilters } from './utils/notification-filters.fixtures'
import ResultStreamObserverMock from './utils/result-stream-observer.mock'

testTx('Transaction', newRegularTransaction)

Expand Down Expand Up @@ -392,6 +393,48 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
})
)
})

it('should cascade errors in a result to other open results', async () => {
const connection = newFakeConnection()
const expectedError = newError('Something right is not wrong, wut?')
const tx = newTransaction({
connection,
fetchSize: 1000,
highRecordWatermark: 700,
lowRecordWatermark: 300
})

const observers: ResultStreamObserverMock[] = []

jest.spyOn(connection, 'run')
.mockImplementation((query: string, parameters: any, config: RunQueryConfig) => {
const steamObserver = new ResultStreamObserverMock({
beforeError: config.beforeError,
afterComplete: config.afterComplete
})
if (query === 'should error') {
steamObserver.onError(expectedError)
} else if (query === 'finished result') {
steamObserver.onCompleted({})
}

observers.push(steamObserver)

return steamObserver
})

tx._begin(async () => Bookmarks.empty(), TxConfig.empty())

const nonConsumedResult = tx.run('RETURN 1')
await tx.run('finished result')
const brokenResult = tx.run('should error')

await expect(brokenResult).rejects.toThrowError(expectedError)
await expect(nonConsumedResult).rejects.toThrowError(expectedError)
expect(observers[0].error).toEqual(expectedError)
expect(observers[1].error).not.toBeDefined()
expect(observers[2].error).toEqual(expectedError)
})
})

describe('.close()', () => {
Expand Down
17 changes: 16 additions & 1 deletion packages/core/test/utils/result-stream-observer.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,18 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb
private readonly _observers: ResultObserver[]
private _error?: Error
private _meta?: any
private readonly _beforeError?: (error: Error) => void
private readonly _afterComplete?: (metadata: any) => void

constructor () {
constructor (observers?: { beforeError?: (error: Error) => void, afterComplete?: (metadata: any) => void }) {
this._queuedRecords = []
this._observers = []
this._beforeError = observers?.beforeError
this._afterComplete = observers?.afterComplete
}

get error (): Error | undefined {
return this._error
}

cancel (): void {}
Expand Down Expand Up @@ -88,6 +96,9 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb

onError (error: Error): void {
this._error = error
if (this._beforeError != null) {
this._beforeError(error)
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._observers.filter(o => o.onError).forEach(o => o.onError!(error))
}
Expand All @@ -98,6 +109,10 @@ export default class ResultStreamObserverMock implements observer.ResultStreamOb
.filter(o => o.onCompleted)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
.forEach(o => o.onCompleted!(meta))

if (this._afterComplete != null) {
this._afterComplete(meta)
}
}

pause (): void {
Expand Down
14 changes: 12 additions & 2 deletions packages/neo4j-driver-deno/lib/core/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class Transaction {
private readonly _onError: (error: Error) => Promise<Connection | null>
private readonly _onComplete: (metadata: any, previousBookmarks?: Bookmarks) => void
private readonly _fetchSize: number
private readonly _results: any[]
private readonly _results: Result[]
private readonly _impersonatedUser?: string
private readonly _lowRecordWatermak: number
private readonly _highRecordWatermark: number
Expand Down Expand Up @@ -291,12 +291,22 @@ class Transaction {
}
}

_onErrorCallback (): Promise<Connection | null> {
_onErrorCallback (error: Error): Promise<Connection | null> {
// error will be "acknowledged" by sending a RESET message
// database will then forget about this transaction and cleanup all corresponding resources
// it is thus safe to move this transaction to a FAILED state and disallow any further interactions with it
this._state = _states.FAILED
this._onClose()
this._results.forEach(result => {
if (result.isOpen()) {
// @ts-expect-error
result._streamObserverPromise
.then(resultStreamObserver => resultStreamObserver.onError(error))
// Nothing to do since we don't have a observer to notify the error
// the result will be already broke in other ways.
.catch((_: Error) => {})
}
})

// release connection back to the pool
return this._connectionHolder.releaseConnection()
Expand Down
1 change: 1 addition & 0 deletions packages/testkit-backend/deno/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ function newWire(context: Context, reply: Reply): Wire {
name: "DriverError",
data: {
id,
errorType: e.name,
msg: e.message,
// @ts-ignore Code Neo4jError does have code
code: e.code,
Expand Down
1 change: 1 addition & 0 deletions packages/testkit-backend/src/controller/local.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export default class LocalController extends Controller {
const id = this._contexts.get(contextId).addError(e)
this._writeResponse(contextId, newResponse('DriverError', {
id,
errorType: e.name,
msg: e.message,
code: e.code,
retryable: e.retriable
Expand Down
17 changes: 3 additions & 14 deletions packages/testkit-backend/src/skipped-tests/common.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
import skip, { ifEquals, ifEndsWith, endsWith, ifStartsWith, startsWith, not, or } from './skip.js'

const skippedTests = [
skip(
"Fixme: transactions don't prevent further actions after failure.",
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_discard_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_pull'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_pull_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_run'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_run_after_tx_termination_on_pull'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_rollback_message_after_tx_termination'),
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_should_prevent_commit_after_tx_termination'),
ifEquals('stub.configuration_hints.test_connection_recv_timeout_seconds.TestDirectConnectionRecvTimeout.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout')
),
skip(
'Driver does not return offset for old DateTime implementations',
ifStartsWith('stub.types.test_temporal_types.TestTemporalTypes')
Expand All @@ -28,7 +17,7 @@ const skippedTests = [
ifEquals('neo4j.datatypes.test_temporal_types.TestDataTypes.test_duration_components')
),
skip(
'Testkit implemenation is deprecated',
'Testkit implementation is deprecated',
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_node_only_element_id'),
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_path_element_ids_with_only_string'),
ifEquals('stub.basic_query.test_basic_query.TestBasicQuery.test_5x0_populates_rel_only_element_id')
Expand All @@ -38,7 +27,7 @@ const skippedTests = [
ifEndsWith('neo4j.test_summary.TestSummary.test_protocol_version_information')
),
skip(
'Handle qid omission optmization can cause issues in nested queries',
'Handle qid omission optimization can cause issues in nested queries',
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments'),
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query'),
ifEquals('stub.optimizations.test_optimizations.TestOptimizations.test_uses_implicit_default_arguments_multi_query_nested')
Expand Down Expand Up @@ -81,7 +70,7 @@ const skippedTests = [
ifEquals('stub.iteration.test_iteration_session_run.TestIterationSessionRun.test_nested')
),
skip(
'Nested calls does not garauntee order in the records pulling',
'Nested calls does not guarantee order in the records pulling',
ifEquals('stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested')
),
skip(
Expand Down

0 comments on commit e26619c

Please sign in to comment.