Skip to content

Commit

Permalink
Fix connection getting timeout while idle (#1167)
Browse files Browse the repository at this point in the history
When the connection goes idle, an observer is appended. And this causes the receive timeout to start to count. This behaviour causes connections timing out while being idle in the pool.

For fixing this, the connection received methods to handle its status changing to idle and back from idle. When the connection goes idle, it should stop the timeouts and do not start any timeout until it gets busy again. 

This change also impacts the `hasOngoingRequests` method, since idle connections don't have ongoing requests.
Co-authored-by: Robsdedude <[email protected]>
  • Loading branch information
bigmontz authored Dec 27, 2023
1 parent fda596c commit ce8bc52
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}

static _installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
conn._setIdle(observer)
}

static _removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
conn._unsetIdle()
}

_handleSecurityError (error, address, connection) {
Expand Down
27 changes: 25 additions & 2 deletions packages/bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)
this._authToken = null
this._idle = false
this._reseting = false
this._resetObservers = []
this._id = idGenerator++
Expand Down Expand Up @@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
}

/**
* This method still here because it's used by the {@link PooledConnectionProvider}
* This method is used by the {@link PooledConnectionProvider}
*
* @param {any} observer
*/
_setIdle (observer) {
this._idle = true
this._ch.stopReceiveTimeout()
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
}

/**
* This method is used by the {@link PooledConnectionProvider}
*/
_unsetIdle () {
this._idle = false
this._updateCurrentObserver()
}

/**
* This method still here because of the connection-channel.tests.js
*
* @param {any} observer
*/
Expand All @@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
}

hasOngoingObservableRequests () {
return this._protocol.hasOngoingObservableRequests()
return !this._idle && this._protocol.hasOngoingObservableRequests()
}

/**
Expand Down Expand Up @@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange (requestsNumber) {
if (this._idle) {
return
}
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
Expand Down
143 changes: 139 additions & 4 deletions packages/bolt-connection/test/connection/connection-channel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ describe('ChannelConnection', () => {
})

describe('.__handleOngoingRequestsNumberChange()', () => {
it('should call channel.stopReceiveTimeout when requets number equals to 0', () => {
it('should call channel.stopReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -571,7 +571,7 @@ describe('ChannelConnection', () => {
expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should not call channel.startReceiveTimeout when requets number equals to 0', () => {
it('should not call channel.startReceiveTimeout when requests number equals to 0', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -585,7 +585,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requets number equals to %d', (requests) => {
])('should call channel.startReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -599,7 +599,7 @@ describe('ChannelConnection', () => {

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout when requets number equals to %d', (requests) => {
])('should not call channel.stopReceiveTimeout when requests number equals to %d', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
Expand All @@ -610,6 +610,68 @@ describe('ChannelConnection', () => {

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[0], [1], [2], [3], [5], [8], [13], [3000]
])('should not call channel.stopReceiveTimeout or startReceiveTimeout when requests number equals to %d and connection is idle', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})

it.each([
[1], [2], [3], [5], [8], [13], [3000]
])('should call channel.startReceiveTimeout when requests number equals to %d and connection is not idle anymore', (requests) => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(requests)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(0)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(1)
})

it('should call channel.stopReceiveTimeout when requests number equals to 0 and connection is not idle anymore', () => {
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout'),
startReceiveTimeout: jest.fn().mockName('startReceiveTimeout')
}
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const connection = spyOnConnectionChannel({ channel, protocolSupplier: () => protocol })
connection._setIdle({})
connection._unsetIdle()
channel.stopReceiveTimeout.mockClear()

connection._handleOngoingRequestsNumberChange(0)

expect(channel.stopReceiveTimeout).toHaveBeenCalledTimes(1)
expect(channel.startReceiveTimeout).toHaveBeenCalledTimes(0)
})
})

describe('.resetAndFlush()', () => {
Expand Down Expand Up @@ -1181,6 +1243,44 @@ describe('ChannelConnection', () => {
})

describe('.hasOngoingObservableRequests()', () => {
it('should return false if connection is idle', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(false)
expect(protocol.hasOngoingObservableRequests).not.toBeCalledWith()
})

it('should redirect request to the protocol when connection is not idle anymore', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true),
queueObserverIfProtocolIsNotBroken: jest.fn(() => {}),
updateCurrentObserver: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })
connection._setIdle({})
connection._unsetIdle()

const result = connection.hasOngoingObservableRequests()

expect(result).toBe(true)
expect(protocol.hasOngoingObservableRequests).toBeCalledWith()
})

it('should call redirect request to the protocol', () => {
const protocol = {
hasOngoingObservableRequests: jest.fn(() => true)
Expand All @@ -1195,6 +1295,41 @@ describe('ChannelConnection', () => {
})
})

describe('._setIdle()', () => {
it('should stop receive timeout and enqueue observer', () => {
const protocol = {
queueObserverIfProtocolIsNotBroken: jest.fn(() => {})
}
const channel = {
stopReceiveTimeout: jest.fn().mockName('stopReceiveTimeout')
}
const observer = {
onComplete: () => {}
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol, channel })

connection._setIdle(observer)

expect(channel.stopReceiveTimeout).toBeCalledTimes(1)
expect(protocol.queueObserverIfProtocolIsNotBroken).toBeCalledWith(observer)
})
})

describe('._unsetIdle()', () => {
it('should update current observer', () => {
const protocol = {
updateCurrentObserver: jest.fn(() => {})
}

const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol })

connection._unsetIdle()

expect(protocol.updateCurrentObserver).toBeCalledTimes(1)
})
})

function spyOnConnectionChannel ({
channel,
errorHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ export default class PooledConnectionProvider extends ConnectionProvider {
}

static _installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
conn._setIdle(observer)
}

static _removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
conn._unsetIdle()
}

_handleSecurityError (error, address, connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export default class ChannelConnection extends Connection {
) {
super(errorHandler)
this._authToken = null
this._idle = false
this._reseting = false
this._resetObservers = []
this._id = idGenerator++
Expand Down Expand Up @@ -393,7 +394,26 @@ export default class ChannelConnection extends Connection {
}

/**
* This method still here because it's used by the {@link PooledConnectionProvider}
* This method is used by the {@link PooledConnectionProvider}
*
* @param {any} observer
*/
_setIdle (observer) {
this._idle = true
this._ch.stopReceiveTimeout()
this._protocol.queueObserverIfProtocolIsNotBroken(observer)
}

/**
* This method is used by the {@link PooledConnectionProvider}
*/
_unsetIdle () {
this._idle = false
this._updateCurrentObserver()
}

/**
* This method still here because of the connection-channel.tests.js
*
* @param {any} observer
*/
Expand All @@ -402,7 +422,7 @@ export default class ChannelConnection extends Connection {
}

hasOngoingObservableRequests () {
return this._protocol.hasOngoingObservableRequests()
return !this._idle && this._protocol.hasOngoingObservableRequests()
}

/**
Expand Down Expand Up @@ -500,6 +520,9 @@ export default class ChannelConnection extends Connection {
* @param {number} requestsNumber Ongoing requests number
*/
_handleOngoingRequestsNumberChange (requestsNumber) {
if (this._idle) {
return
}
if (requestsNumber === 0) {
this._ch.stopReceiveTimeout()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,29 @@ describe('#unit PooledConnectionProvider', () => {
clock.uninstall()
}
})

it('_installIdleObserverOnConnection should set connection as idle', () => {
const connection = new FakeConnection()
const observer = { onCompleted: () => {} }

PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)

expect(connection._idle).toBe(true)
expect(connection._idleObserver).toBe(observer)
})

it('_removeIdleObserverOnConnection should unset connection as idle', () => {
const connection = new FakeConnection()
const observer = { onCompleted: () => {} }

PooledConnectionProvider._installIdleObserverOnConnection(connection, observer)

expect(connection._idle).toBe(true)
expect(connection._idleObserver).toBe(observer)

PooledConnectionProvider._removeIdleObserverOnConnection(connection)

expect(connection._idle).toBe(false)
expect(connection._idleObserver).toBe(null)
})
})
13 changes: 12 additions & 1 deletion packages/neo4j-driver/test/internal/fake-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export default class FakeConnection extends Connection {
this._databaseId = null
this._requestRoutingInformationMock = null
this._creationTimestamp = Date.now()

this._idle = false
this._idleObserver = null
this.resetInvoked = 0
this.releaseInvoked = 0
this.seenQueries = []
Expand Down Expand Up @@ -101,6 +102,16 @@ export default class FakeConnection extends Connection {
return this._idleTimestamp
}

_setIdle (observer) {
this._idle = true
this._idleObserver = observer
}

_unsetIdle () {
this._idle = false
this._idleObserver = null
}

protocol () {
// return fake protocol object that simply records seen queries and parameters
return {
Expand Down
Loading

0 comments on commit ce8bc52

Please sign in to comment.