Skip to content

Commit

Permalink
fix: BLOCKING behaviour to not return immediately
Browse files Browse the repository at this point in the history
It needs to wait until the timeElapsed is greater or eq the defined blocking time.
The test was enhanced to spot the issue and verify the fix worked. This
uncovered another issue with the way found events are evaluated to always return
even without any event received. This problem is also fixed by this commit.
  • Loading branch information
ssterb committed Sep 15, 2023
1 parent 2c4b39f commit b5983f6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
7 changes: 5 additions & 2 deletions src/commands/xread.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ export function xread(option, ...args) {
let timeElapsed = 0
const f = () =>
setTimeout(() => {
if (opVal > 0 && timeElapsed < opVal) return resolve(null)
if (opVal > 0 && timeElapsed >= opVal) return resolve(null)
const events = pollEvents(toPoll, 1)
if (events.length > 0) return resolve(events)
// If any stream has a value return
if (events.find(event => event[1] && event[1].length > 0)) {
return resolve(events)
}
timeElapsed += 100
return f()
}, 100)
Expand Down
61 changes: 57 additions & 4 deletions test/integration/commands/xread.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,64 @@ describe('xread', () => {
})
})

it('should block reads with a time out', () => {
it('should block reads on a stream with a time out', () => {
const redis = new Redis()
return redis.xread('BLOCK', '500', 'STREAMS', 'stream', '$').then(row => {
expect(row).toBe(null)
})
const before = performance.now()
return redis
.xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$')
.then(row => {
const after = performance.now()
expect(after - before >= 500).toBe(true)
expect(row).toBe(null)
})
})

it('should block reads on multiple streams with a time out', () => {
const redis = new Redis()
const before = performance.now()
return redis
.xread(
'BLOCK',
'500',
'STREAMS',
'empty-stream',
'empty-stream-2',
'$',
'$'
)
.then(row => {
const after = performance.now()
expect(after - before >= 500).toBe(true)
expect(row).toBe(null)
})
})

it('should block until data is provided and return', () => {
const redis = new Redis()
const before = performance.now()

setTimeout(() => {
return redis.xadd('empty-stream-2', '*', 'key', 'val')
}, 100)

return redis
.xread(
'BLOCK',
'500',
'STREAMS',
'empty-stream',
'empty-stream-2',
'$',
'$'
)
.then(row => {
const after = performance.now()
expect(after - before >= 100).toBe(true)
expect(row).toEqual([
['empty-stream-2', [['1-0', ['key', 'val']]]],
['empty-stream', []],
])
})
})

it('should poll all events since ID if no COUNT is given', () => {
Expand Down

0 comments on commit b5983f6

Please sign in to comment.