Skip to content

Commit

Permalink
Merge branch 'fix_workqueue' into 'master'
Browse files Browse the repository at this point in the history
Fix the done method of the WorkQueue

See merge request rak-n-rok/krake!349
  • Loading branch information
jchorin committed Aug 13, 2020
2 parents 239a252 + 8117028 commit eaafaab
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
54 changes: 48 additions & 6 deletions krake/krake/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ class WorkQueue(object):
loop (asyncio.AbstractEventLoop, optional): Event loop that should be
used
:attr:`dirty` holds the last known value of a key i.e. the next value which will be
given by the :meth:`get` method.
:attr:`timers` holds the current debounce coroutine for a key. Either this coroutine
is canceled (if a new value for a key is given to the WorkQueue through the
meth:`put`) or the value is added to the :attr:`dirty` dictionary.
:attr:`active` ensures that a key isn't added twice to the :attr:`queue`. Keys are
added to this set when they are first added to the :attr:`dirty` dictionary, and are
removed from the set when the Worker calls the :meth:`done` method.
Todo:
* Implement rate limiting and delays
"""
Expand All @@ -59,6 +70,16 @@ def __init__(self, maxsize=0, debounce=0, loop=None):
self.loop = loop
self.queue = asyncio.Queue(maxsize=maxsize, loop=loop)

async def _add_key_to_queue(self, key):
"""Puts the key in active and in the queue
Args:
key: Key that used to identity the value
"""
self.active.add(key)
await self.queue.put(key)

async def put(self, key, value, delay=None):
"""Put a new key-value pair into the queue.
Expand All @@ -74,19 +95,27 @@ async def put(self, key, value, delay=None):
delay = self.debounce

async def debounce():
"""Coroutine which waits for :attr:`delay` seconds before adding the value
to the :attr:`dirty` dictionary.
"""

await asyncio.sleep(delay)
await put_key()

async def put_key():
"""Actually adds the value to the :attr:`dirty` dictionary, and adds the key
to the :attr:`queue` if it's not currently being worked on by the Worker.
"""
self.dirty[key] = value

if key not in self.active:
self.active.add(key)
await self.queue.put(key)
await self._add_key_to_queue(key)

def remove_timer(_):
# Remove timer from dictionary and resolve the waiter for the
# removal.
"""Remove timer from dictionary and resolve the waiter for the removal"""

_, removed = self.timers.pop(key)
removed.set_result(None)

Expand Down Expand Up @@ -134,14 +163,27 @@ async def get(self):
return key, value

async def done(self, key):
"""Called by the Worker to notify that the work on the given key is done. This
method first removes the key from the :attr:`active` set, and then adds this key
to the set if a new value has arrived.
Args:
key: Key that used to identity the value
"""
"""

self.active.discard(key)

if key in self.dirty:
await self.queue.put(key)
await self._add_key_to_queue(key)

async def cancel(self, key):
"""Cancel the corresponding debounce coroutine for the given key
Args:
key: Key that identifies the value
"""
if key in self.timers:
timer, removed = self.timers[key]
timer.cancel()
Expand Down
18 changes: 18 additions & 0 deletions krake/tests/controller/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,21 @@ async def test_multiple_puts(loop):

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(queue.get(), loop=loop, timeout=0)


async def test_active(loop):
queue = WorkQueue(loop=loop, debounce=0)

await queue.put("key", 1)
await queue.get()

await queue.put("key", 2)
await queue.done("key")

# Check that the key isn't removed from the active set when the done method is
# called while a new value is present in the dirty dictionary.
assert "key" in queue.active

# Consequently, check that the key is added only once to the queue.
await queue.put("key", 3)
assert queue.queue.qsize() == 1

0 comments on commit eaafaab

Please sign in to comment.