Skip to content

Commit

Permalink
Fix the done method of the WorkQueue
Browse files Browse the repository at this point in the history
Previously, keys were always removed from the active set when the done method
was called. When a key was present in the dirty dictionary, the key was added
to the queue, but not to the active set.

When a certain sequence of actions were happening, this lead to the key being
present in the queue, but not in the dirty dictionary, subsequently raising a
KeyError:

# 'dirty': {}
# 'active': set(),
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0>

await queue.put("key", 1)
# 'dirty': {'key': 1}
# 'active': {'key'},
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 _queue=['key'] tasks=1>

await queue.get()
# 'dirty': {}
# 'active': {'key'},
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 tasks=1>

await queue.put("key", 2)
# 'dirty': {'key': 2}
# 'active': {'key'},
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 tasks=1>

await queue.done("key")
# 'dirty': {'key': 2}
# 'active': set(),
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 _queue=['key'] tasks=2>

# => the WorkQueue is in an invalid state with a key in queue which is not
# present in active

await queue.put("key", 3)
# 'dirty': {'key': 3}
# 'active': {'key'},
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 _queue=['key', 'key'] tasks=3>

# => the WorkQueue is in an invalid state with the key present two times in the
# queue

await queue.get()
# 'dirty': {}
# 'active': {'key'},
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 _queue=['key'] tasks=3>

await queue.done("key")
# 'dirty': {}
# 'active': set(),
# 'queue': <Queue at 0x7f8c75c50208 maxsize=0 _queue=['key'] tasks=3>

await queue.get()
# Raises KeyError: 'key'

This commit fixes this error by only removing the key from the active set if it
isn't added to the queue.
  • Loading branch information
Matthias Goerens committed Aug 11, 2020
1 parent 239a252 commit 8117028
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
54 changes: 48 additions & 6 deletions krake/krake/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ class WorkQueue(object):
loop (asyncio.AbstractEventLoop, optional): Event loop that should be
used
:attr:`dirty` holds the last known value of a key i.e. the next value which will be
given by the :meth:`get` method.
:attr:`timers` holds the current debounce coroutine for a key. Either this coroutine
is canceled (if a new value for a key is given to the WorkQueue through the
meth:`put`) or the value is added to the :attr:`dirty` dictionary.
:attr:`active` ensures that a key isn't added twice to the :attr:`queue`. Keys are
added to this set when they are first added to the :attr:`dirty` dictionary, and are
removed from the set when the Worker calls the :meth:`done` method.
Todo:
* Implement rate limiting and delays
"""
Expand All @@ -59,6 +70,16 @@ def __init__(self, maxsize=0, debounce=0, loop=None):
self.loop = loop
self.queue = asyncio.Queue(maxsize=maxsize, loop=loop)

async def _add_key_to_queue(self, key):
"""Puts the key in active and in the queue
Args:
key: Key that used to identity the value
"""
self.active.add(key)
await self.queue.put(key)

async def put(self, key, value, delay=None):
"""Put a new key-value pair into the queue.
Expand All @@ -74,19 +95,27 @@ async def put(self, key, value, delay=None):
delay = self.debounce

async def debounce():
"""Coroutine which waits for :attr:`delay` seconds before adding the value
to the :attr:`dirty` dictionary.
"""

await asyncio.sleep(delay)
await put_key()

async def put_key():
"""Actually adds the value to the :attr:`dirty` dictionary, and adds the key
to the :attr:`queue` if it's not currently being worked on by the Worker.
"""
self.dirty[key] = value

if key not in self.active:
self.active.add(key)
await self.queue.put(key)
await self._add_key_to_queue(key)

def remove_timer(_):
# Remove timer from dictionary and resolve the waiter for the
# removal.
"""Remove timer from dictionary and resolve the waiter for the removal"""

_, removed = self.timers.pop(key)
removed.set_result(None)

Expand Down Expand Up @@ -134,14 +163,27 @@ async def get(self):
return key, value

async def done(self, key):
"""Called by the Worker to notify that the work on the given key is done. This
method first removes the key from the :attr:`active` set, and then adds this key
to the set if a new value has arrived.
Args:
key: Key that used to identity the value
"""
"""

self.active.discard(key)

if key in self.dirty:
await self.queue.put(key)
await self._add_key_to_queue(key)

async def cancel(self, key):
"""Cancel the corresponding debounce coroutine for the given key
Args:
key: Key that identifies the value
"""
if key in self.timers:
timer, removed = self.timers[key]
timer.cancel()
Expand Down
18 changes: 18 additions & 0 deletions krake/tests/controller/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,21 @@ async def test_multiple_puts(loop):

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(queue.get(), loop=loop, timeout=0)


async def test_active(loop):
queue = WorkQueue(loop=loop, debounce=0)

await queue.put("key", 1)
await queue.get()

await queue.put("key", 2)
await queue.done("key")

# Check that the key isn't removed from the active set when the done method is
# called while a new value is present in the dirty dictionary.
assert "key" in queue.active

# Consequently, check that the key is added only once to the queue.
await queue.put("key", 3)
assert queue.queue.qsize() == 1

0 comments on commit 8117028

Please sign in to comment.