-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add method to MovingWindow that waits for a number of samples #387
Add method to MovingWindow that waits for a number of samples #387
Conversation
c136a73
to
e1e58b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also feel something smells here, I don't like having some class variable to only track a counter that is used by one method. My instinct says that there should be a way to split this functionality to a different class that uses, can be plugged to or proxies to the MovingWindow
, but I don't have anything in particular to propose right. At least nothing that's not a lot of extra work (and probably worsen the usability of this too), so I'm fine to leave it as is, but just mentioning it in case anyone suddenly gets a brilliant idea 💡
self.count_samples = 0 | ||
"""The number of samples that have been received.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this one should probably be a read-only public attribute. Also for attributes it's better to use nouns instead of verbs (and it's missing the type), for example:
self.count_samples = 0 | |
"""The number of samples that have been received.""" | |
self._received_samples_count: int = 0 | |
"""The number of samples that have been received.""" |
Plus
@property
def received_samples_count(self) -> int
return self._received_samples_count
self.wait_for_num_samples = 0 | ||
"""The number of samples to wait for before | ||
the wait_for_samples method triggers.""" | ||
self.wait_for_samples_event = asyncio.Event() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two really look like they should be private. Same about names (and missing types):
self.wait_for_num_samples = 0 | |
"""The number of samples to wait for before | |
the wait_for_samples method triggers.""" | |
self.wait_for_samples_event = asyncio.Event() | |
self._expected_samples_count: int = 0 | |
"""The number of samples to wait for before `wait_for_samples()` triggers.""" | |
self._wait_for_samples_event: asyncio.Event = asyncio.Event() | |
"""The event to signal `wait_for_samples()` that the wait is over.""" |
Oh, I also noticed in your last PRs that it seems you have your editor configured to a maximum line length of maybe 50 chars? Ideally it should be 88 to match |
return | ||
|
||
self.wait_for_num_samples = num_samples | ||
await self.wait_for_samples_event.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you never call self.wait_for_samples_event.clear()
it will always fire immediately in .wait()
e1e58b6
to
2566927
Compare
Up to now there was no clean way to wait until the MovingWindow got updated with a certain number of samples. In this commit we introduce a `wait_for_samples` method that finishes once the number of samples arrived. Signed-off-by: Matthias Wende <[email protected]>
2566927
to
f80b565
Compare
I've reworked the implementation. Now the user can get a channel that sends a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides a few comments on improving consistency, the API and the tests, the commit message needs to be updated because it mentions a wait_for_samples
method.
self._wait_for_num_samples: int = 0 | ||
"""The number of samples to wait for before the wait_for_num_samples channels | ||
sends out an event.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend using nouns for non-boolean attributes:
self._wait_for_num_samples: int = 0 | |
"""The number of samples to wait for before the wait_for_num_samples channels | |
sends out an event.""" | |
self._num_samples_to_wait_for: int = 0 | |
"""The number of samples to wait for before triggering an event through the channel.""" |
self._wait_for_samples_channel = Broadcast[None]( | ||
"Wait for number of samples channel." | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the text here is for debugging purposes only, right @shsms?
I'd suggest using a shorter string for that and using the long string as documentation for the variable:
self._wait_for_samples_channel = Broadcast[None]( | |
"Wait for number of samples channel." | |
) | |
self._wait_for_samples_channel = Broadcast[None]("wait-for-samples") | |
"""Channel to send events to when wait for number of samples is triggered.""" | |
@@ -169,6 +176,9 @@ async def _run_impl(self) -> None: | |||
Raises: | |||
asyncio.CancelledError: if the MovingWindow task is cancelled. | |||
""" | |||
received_samples_count = 0 | |||
wait_for_samples_sender = self._wait_for_samples_channel.new_sender() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess once a MW was stopped there is no way to start it again, right? Otherwise the sender should probably be created in the constructor instead to avoid leaking sender objects.
received_samples_count = 0 | ||
await wait_for_samples_sender.send(None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it's free and even when the user should know it beforehand anyway, we could also send the number of samples received instead of None
, maybe it could become a handy shortcut.
received_samples_count = 0 | |
await wait_for_samples_sender.send(None) | |
await wait_for_samples_sender.send(received_samples_count) | |
received_samples_count = 0 |
def set_sample_counter(self, num_samples: int) -> None: | ||
"""Set the number of samples to wait for until the sample counter triggers. | ||
|
||
Args: | ||
num_samples: The number of samples to wait for. | ||
|
||
Raises: | ||
ValueError: if the number of samples is less than or equal to zero. | ||
""" | ||
if num_samples <= 0: | ||
raise ValueError( | ||
"The number of samples to wait for should be greater than zero." | ||
) | ||
self._wait_for_num_samples = num_samples |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, this indirectly enables sending the events if num_samples > 0
, right? If so I would rename this method to something that makes it easier to realize that happens. What about:
@property
def is_wait_for_samples_event_enabled(self) -> bool:
# Returns `self._wait_for_num_samples != 0`
def enable_wait_for_samples_event(self, num_samples: int) -> bool:
# Same as above but check for `num_samples > 0`, return whether it was enabled before
def disable_wait_for_samples_event(self) -> bool:
# Set `num_samples = 0`, return whether it was enabled before
) | ||
self._wait_for_num_samples = num_samples | ||
|
||
def new_sample_count_receiver(self) -> Receiver[None]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the above suggestion is applied, I would rename this for consistency:
def new_sample_count_receiver(self) -> Receiver[None]: | |
def new_wait_for_samples_event_receiver(self) -> Receiver[None]: |
We could go with something different than wait_for_samples_event
as it is a bit long, but the 4 methods should use the same term IMHO.
|
||
window.set_sample_counter(samples_to_wait_for) | ||
|
||
# asyncio.create_task(push_data_delayed()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# asyncio.create_task(push_data_delayed()) |
?
for i in range(0, samples_to_wait_for): | ||
await sender.send( | ||
Sample(datetime.now(tz=timezone.utc) + timedelta(seconds=i), 1.0) | ||
) | ||
await sample_count_recv.receive() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some failures that this will not detect or detect wrongly. For example if you trigger before the samples_to_wait_for
this will succeed too. If the event doesn't trigger, this test will hang forever, which might be quite annoying.
Maybe could check for the internal counter to verify it is incremented? If you also add my suggestion to return the number of samples received you could add the check here too.
Maybe you can also move the await sample_count_recv.receive()
to a different task so you can check it didn't fire before you sent enough samples and that it was fired after you sent all the expected samples but without blocking the main thread if it fails?
Maybe you could also push different sample values and then check that the state of the moving window is what it would be expected after receiving all that samples, so if the even triggered before that check should also fail.
Will this not be part of v1 milestone? |
Let's see. This change is not breaking so it can be v1.1.x too. But let's try to get it in. |
This comment was marked as off-topic.
This comment was marked as off-topic.
This comment was marked as off-topic.
This comment was marked as off-topic.
This comment was marked as off-topic.
This comment was marked as off-topic.
This is quite old. It we need it we can reopen the PR or start from scratch. |
Is there another way to know if the moving window was updated? |
No there isn't. |
It turns out this is required in the Forecast actor. Only yesterday evening, Christoph showed me the place where he's using a while loop that could be replaced with this, just a little bit after we were talking about closing this issue. But I think not top priority, so we can pick this up again soon. |
Okay, thanks for the notice. I'll put it back on my todo list. |
Cool, is there an issue to track? |
|
Up to now there was no clean way to wait until the MovingWindow got updated with a certain number of samples.
In this PR we introduce a
wait_for_samples
method that finishes once the number of samples arrived.