From a44fb6b433b099b1545f0bf4de3fd1705fb8b6e4 Mon Sep 17 00:00:00 2001
From: nicor88 <6278547+nicor88@users.noreply.github.com>
Date: Fri, 17 Dec 2021 21:10:07 +0100
Subject: [PATCH] Take with timestamp (#242)

* add full_take

* remove processor remove

* try to add kafka_timestamp

* tweaks

* add

* add extra param

* add offset

* remove offset in this pr

* fix formatting

* add functional tests

* add another unit test and edge case handling

* change buffer type...

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
---
 faust/streams.py                 | 109 +++++++++++++++++++++++++++++
 tests/functional/test_streams.py | 117 +++++++++++++++++++++++++++++++
 2 files changed, 226 insertions(+)

diff --git a/faust/streams.py b/faust/streams.py
index dfbc2a3a9..99855104c 100644
--- a/faust/streams.py
+++ b/faust/streams.py
@@ -392,6 +392,115 @@ async def add_to_buffer(value: T) -> T:
             self.enable_acks = stream_enable_acks
             self._processors.remove(add_to_buffer)
 
+    async def take_with_timestamp(
+        self, max_: int, within: Seconds, timestamp_field_name: Optional[str]
+    ) -> AsyncIterable[Sequence[T_co]]:
+        """Buffer up to ``max_`` values and yield them with Kafka timestamps.
+
+        Every buffered ``dict`` value is mutated in place: the timestamp of
+        the message it arrived in (``event.message.timestamp``) is stored
+        under ``timestamp_field_name``.  Values that are not dicts are
+        buffered unchanged.
+
+        Arguments:
+            max_: Max number of messages to receive. When more than this
+                number of messages are received within the specified number of
+                seconds then we flush the buffer immediately.
+            within: Timeout for when we give up waiting for another value,
+                and process the values we have.
+                Warning: If there's no timeout (i.e. `within=None`),
+                the agent is likely to stall and block buffered events for an
+                unreasonable length of time(!).
+            timestamp_field_name: Name of the key under which the Kafka
+                timestamp is stored in each dict value.  If empty or None,
+                no timestamp is added.
+
+        Yields:
+            A list holding the values buffered since the previous batch.
+            The events behind a batch are acked once the caller resumes
+            or closes the generator.
+        """
+        buffer: List[T_co] = []
+        events: List[EventT] = []
+        buffer_add = buffer.append
+        event_add = events.append
+        buffer_size = buffer.__len__
+        # The ``loop`` argument of asyncio.Event was removed in Python 3.10;
+        # the events pick up the running loop on their own.
+        buffer_full = asyncio.Event()
+        buffer_consumed = asyncio.Event()
+        timeout = want_seconds(within) if within else None
+        stream_enable_acks: bool = self.enable_acks
+
+        buffer_consuming: Optional[asyncio.Future] = None
+
+        channel_it = aiter(self.channel)
+
+        # We add this processor to populate the buffer, and the stream
+        # is passively consumed in the background (enable_passive below).
+        async def add_to_buffer(value: T) -> T:
+            try:
+                # buffer_consuming is set when consuming buffer after timeout.
+                nonlocal buffer_consuming
+                if buffer_consuming is not None:
+                    try:
+                        await buffer_consuming
+                    finally:
+                        buffer_consuming = None
+                event = self.current_event
+                # Validate the event before touching it or the buffer, so a
+                # missing event raises the intended RuntimeError and never
+                # leaves a buffered value without a matching event to ack.
+                if event is None:
+                    raise RuntimeError("Take buffer found current_event is None")
+                if timestamp_field_name and isinstance(value, dict):
+                    value[timestamp_field_name] = event.message.timestamp
+                buffer_add(value)
+                event_add(event)
+                if buffer_size() >= max_:
+                    # signal that the buffer is full and should be emptied.
+                    buffer_full.set()
+                    # strict wait for buffer to be consumed after buffer full.
+                    # If max is 1000, we are not allowed to return 1001 values.
+                    buffer_consumed.clear()
+                    await self.wait(buffer_consumed)
+            except CancelledError:  # pragma: no cover
+                raise
+            except Exception as exc:
+                self.log.exception("Error adding to take buffer: %r", exc)
+                await self.crash(exc)
+            return value
+
+        # Disable acks to ensure this method acks manually
+        # events only after they are consumed by the user
+        self.enable_acks = False
+
+        self.add_processor(add_to_buffer)
+        self._enable_passive(cast(ChannelT, channel_it))
+        try:
+            while not self.should_stop:
+                # wait until buffer full, or timeout
+                await self.wait_for_stopped(buffer_full, timeout=timeout)
+                if buffer:
+                    # make sure the background processor does not add new
+                    # items to the buffer while we read.
+                    buffer_consuming = self.loop.create_future()
+                    try:
+                        yield list(buffer)
+                    finally:
+                        buffer.clear()
+                        for event in events:
+                            await self.ack(event)
+                        events.clear()
+                        # allow writing to buffer again
+                        notify(buffer_consuming)
+                        buffer_full.clear()
+                        buffer_consumed.set()
+        finally:
+            # Restore last behaviour of "enable_acks"
+            self.enable_acks = stream_enable_acks
+            self._processors.remove(add_to_buffer)
+
     def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
         """Enumerate values received on this stream.
 
diff --git a/tests/functional/test_streams.py b/tests/functional/test_streams.py
index cea34e777..84ae325e8 100644
--- a/tests/functional/test_streams.py
+++ b/tests/functional/test_streams.py
@@ -777,3 +777,120 @@ def current_event(self, event):
         assert isinstance(s._crash_reason, RuntimeError)
         print("RETURNING")
         assert s.enable_acks is True
+
+
+@pytest.mark.asyncio
+async def test_take_wit_timestamp(app):
+    """Dict values are yielded with a float timestamp under the given key."""
+    async with new_stream(app) as s:
+        assert s.enable_acks is True
+        await s.channel.send(value={"id": 1})
+        event = None
+        async for batch in s.take_with_timestamp(
+            1, within=1, timestamp_field_name="test_timestamp"
+        ):
+            assert "test_timestamp" in batch[0]
+            assert isinstance(batch[0]["test_timestamp"], float)
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            break
+
+        assert event
+        # sleeps needed: one on Python 3.6.0-3.6.6 + 3.7.0, two on 3.6.7 + 3.7.1 :-/
+        for _ in range(2):
+            await asyncio.sleep(0)
+
+        if not event.ack.called:
+            assert event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is True
+
+
+@pytest.mark.asyncio
+async def test_take_wit_timestamp_wit_simple_value(app):
+    """Non-dict values are yielded unchanged even if a field name is given."""
+    async with new_stream(app) as s:
+        assert s.enable_acks is True
+        await s.channel.send(value=1)
+        event = None
+        async for batch in s.take_with_timestamp(
+            1, within=1, timestamp_field_name="test_timestamp"
+        ):
+            assert batch == [1]
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            break
+
+        assert event
+        # sleeps needed: one on Python 3.6.0-3.6.6 + 3.7.0, two on 3.6.7 + 3.7.1 :-/
+        for _ in range(2):
+            await asyncio.sleep(0)
+
+        if not event.ack.called:
+            assert event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is True
+
+
+@pytest.mark.asyncio
+async def test_take_wit_timestamp_without_timestamp_field(app):
+    """Passing ``None`` as the field name leaves values untouched."""
+    async with new_stream(app) as s:
+        assert s.enable_acks is True
+        await s.channel.send(value=1)
+        event = None
+        async for batch in s.take_with_timestamp(
+            1, within=1, timestamp_field_name=None
+        ):
+            assert batch == [1]
+            assert s.enable_acks is False
+            event = mock_stream_event_ack(s)
+            break
+
+        assert event
+        # sleeps needed: one on Python 3.6.0-3.6.6 + 3.7.0, two on 3.6.7 + 3.7.1 :-/
+        for _ in range(2):
+            await asyncio.sleep(0)
+
+        if not event.ack.called:
+            assert event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is True
+
+
+@pytest.mark.asyncio
+async def test_take_wit_timestamp__5(app, loop):
+    """A full buffer of five dict values is yielded as one stamped batch."""
+    s = new_stream(app)
+    async with s:
+        assert s.enable_acks is True
+        for n in range(5):
+            await s.channel.send(value={"id": n})
+
+        event = None
+        batches = s.take_with_timestamp(
+            5, within=10.0, timestamp_field_name="test_timestamp"
+        )
+        async for chunk in batches:
+            assert len(chunk) == 5
+            assert all("test_timestamp" in item for item in chunk)
+            assert s.enable_acks is False
+
+            event = mock_stream_event_ack(s)
+            break
+
+        try:
+            await batches.athrow(asyncio.CancelledError())
+        except asyncio.CancelledError:
+            pass
+
+        assert event
+        # need one sleep on Python 3.6.0-3.6.6 + 3.7.0
+        # need two sleeps on Python 3.6.7 + 3.7.1 :-/
+        for _ in range(3):  # needed for some reason
+            await asyncio.sleep(0)
+
+        if not event.ack.called:
+            assert event.message.acked
+            assert not event.message.refcount
+        assert s.enable_acks is True