diff --git a/krake/krake/controller/__init__.py b/krake/krake/controller/__init__.py index 880d70d9..45d62066 100644 --- a/krake/krake/controller/__init__.py +++ b/krake/krake/controller/__init__.py @@ -44,6 +44,17 @@ class WorkQueue(object): loop (asyncio.AbstractEventLoop, optional): Event loop that should be used + :attr:`dirty` holds the last known value of a key i.e. the next value which will be + given by the :meth:`get` method. + + :attr:`timers` holds the current debounce coroutine for a key. Either this coroutine + is canceled (if a new value for a key is given to the WorkQueue through the + meth:`put`) or the value is added to the :attr:`dirty` dictionary. + + :attr:`active` ensures that a key isn't added twice to the :attr:`queue`. Keys are + added to this set when they are first added to the :attr:`dirty` dictionary, and are + removed from the set when the Worker calls the :meth:`done` method. + Todo: * Implement rate limiting and delays """ @@ -59,6 +70,16 @@ def __init__(self, maxsize=0, debounce=0, loop=None): self.loop = loop self.queue = asyncio.Queue(maxsize=maxsize, loop=loop) + async def _add_key_to_queue(self, key): + """Puts the key in active and in the queue + + Args: + key: Key that used to identity the value + + """ + self.active.add(key) + await self.queue.put(key) + async def put(self, key, value, delay=None): """Put a new key-value pair into the queue. @@ -74,19 +95,27 @@ async def put(self, key, value, delay=None): delay = self.debounce async def debounce(): + """Coroutine which waits for :attr:`delay` seconds before adding the value + to the :attr:`dirty` dictionary. + + """ + await asyncio.sleep(delay) await put_key() async def put_key(): + """Actually adds the value to the :attr:`dirty` dictionary, and adds the key + to the :attr:`queue` if it's not currently being worked on by the Worker. + + """ self.dirty[key] = value if key not in self.active: - self.active.add(key) - await self.queue.put(key) + await self._add_key_to_queue(key) def remove_timer(_): - # Remove timer from dictionary and resolve the waiter for the - # removal. + """Remove timer from dictionary and resolve the waiter for the removal""" + _, removed = self.timers.pop(key) removed.set_result(None) @@ -134,14 +163,27 @@ async def get(self): return key, value async def done(self, key): + """Called by the Worker to notify that the work on the given key is done. This + method first removes the key from the :attr:`active` set, and then adds this key + to the set if a new value has arrived. + + Args: + key: Key that used to identity the value + """ - """ + self.active.discard(key) if key in self.dirty: - await self.queue.put(key) + await self._add_key_to_queue(key) async def cancel(self, key): + """Cancel the corresponding debounce coroutine for the given key + + Args: + key: Key that identifies the value + + """ if key in self.timers: timer, removed = self.timers[key] timer.cancel() diff --git a/krake/tests/controller/test_controller.py b/krake/tests/controller/test_controller.py index b82579ee..5f69e46a 100644 --- a/krake/tests/controller/test_controller.py +++ b/krake/tests/controller/test_controller.py @@ -518,3 +518,21 @@ async def test_multiple_puts(loop): with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(queue.get(), loop=loop, timeout=0) + + +async def test_active(loop): + queue = WorkQueue(loop=loop, debounce=0) + + await queue.put("key", 1) + await queue.get() + + await queue.put("key", 2) + await queue.done("key") + + # Check that the key isn't removed from the active set when the done method is + # called while a new value is present in the dirty dictionary. + assert "key" in queue.active + + # Consequently, check that the key is added only once to the queue. + await queue.put("key", 3) + assert queue.queue.qsize() == 1