
WIP Subscribe events - PubSub Extension #5218

Closed
fjetter wants to merge 2 commits

Conversation

fjetter (Member) commented on Aug 16, 2021

This is an alternative implementation to #5217 using the pubsub extension.

See also #5184.

The scheduler is treated as a special case: it is subscribed via the log_queue toggle passed when registering a publisher. This toggle signals that the events should also be logged in the internal Scheduler.events log.

I am reusing the same mechanism as for client subscribers: as soon as a client subscribes to an event, the publisher is forced to loop in the scheduler, and the scheduler communicates with the client.
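To make the routing concrete, here is a toy sketch of that flow. This is my own simplification for illustration (the class and method names `ToyScheduler`, `ToyPublisher`, and the in-memory "inbox" lists are mine, not distributed's actual code): while any client is subscribed to a topic, the publisher routes every message through the scheduler, which fans it out to the subscribed clients; the log_queue toggle additionally logs the event scheduler-side.

```python
class ToyScheduler:
    """Toy stand-in for the scheduler's pubsub bookkeeping."""

    def __init__(self):
        self.client_subscribers = {}  # topic -> list of client inboxes
        self.events = {}              # topic -> list of logged events

    def subscribe(self, topic, inbox):
        self.client_subscribers.setdefault(topic, []).append(inbox)

    def handle_message(self, name, msg, log_queue=False):
        if log_queue:
            self.events.setdefault(name, []).append(msg)
        for inbox in self.client_subscribers.get(name, []):
            inbox.append(msg)


class ToyPublisher:
    """Toy stand-in for a worker-side publisher."""

    def __init__(self, scheduler, topic, log_queue=False):
        self.scheduler = scheduler
        self.topic = topic
        self.log_queue = log_queue

    def put(self, msg):
        # Loop in the scheduler whenever clients are subscribed, or when
        # scheduler-side logging was requested at registration time.
        if self.log_queue or self.scheduler.client_subscribers.get(self.topic):
            self.scheduler.handle_message(self.topic, msg, self.log_queue)


scheduler = ToyScheduler()
inbox = []
scheduler.subscribe("alerts", inbox)
pub = ToyPublisher(scheduler, "alerts", log_queue=True)
pub.put({"level": "warning"})
print(inbox, scheduler.events["alerts"])
```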

Internal log_event calls take a shortcut which is currently a bit ugly. I didn't want to overcommit to this before we're sure this is a valid approach:

```python
name = [name]
for n in name:
    self.events[n]  # init defaultdict
    self.extensions["pubsub"].handle_message(name=n, msg=msg)
```
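For reference, here is a minimal model of what that extension-side handle_message amounts to, per the comment below. This is a hedged sketch under my own assumptions (the class `PubSubModel` and its attribute names are mine, loosely mirroring Scheduler.events, not the actual extension code): each event is appended to a bounded per-topic deque and a per-topic counter is incremented.

```python
import time
from collections import defaultdict, deque


class PubSubModel:
    """Toy model of scheduler-side event logging: append to a per-topic
    deque and increment a per-topic counter."""

    def __init__(self, maxlen=10000):
        # defaultdict mirrors the "self.events[n]  # init defaultdict" trick
        self.events = defaultdict(lambda: deque(maxlen=maxlen))
        self.event_counts = defaultdict(int)

    def handle_message(self, name, msg):
        # Store the event with a timestamp and bump the counter.
        self.events[name].append((time.time(), msg))
        self.event_counts[name] += 1


model = PubSubModel()
model.handle_message(name="my-topic", msg={"status": "ok"})
print(model.event_counts["my-topic"])
```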
fjetter (Member, Author) commented:

This is currently ugly, but the extension's handle_message now appends the event to the deque and increments the counter.

Comment on lines +356 to +357
```python
# FIXME If I use to_serialize here, this breaks msgs of type dict!
data = {"op": "pubsub-msg", "name": self.name, "msg": msg}
```
fjetter (Member, Author) commented:

This broke for simple dict-type messages by raising a KeyError on the remote side. It is probably a bug, but how to deal with serialized objects in general is one of my questions.
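The failure mode can be illustrated abstractly. This is a hypothetical sketch, not distributed's actual wire protocol or dispatch code (the names `make_envelope`, `HANDLERS`, and `dispatch` are mine): the receiving side dispatches on the top-level "op" key, so the envelope must arrive as a plain dict with that key intact; if the message arrives wrapped in a serialization container instead, the key lookup fails with a KeyError.

```python
def make_envelope(name, msg):
    # Plain-dict envelope, as in the snippet above.
    return {"op": "pubsub-msg", "name": name, "msg": msg}


HANDLERS = {"pubsub-msg": lambda name, msg: (name, msg)}


def dispatch(envelope):
    # Raises KeyError if "op" is missing, i.e. if the envelope did not
    # arrive as a plain dict with its keys at the top level.
    handler = HANDLERS[envelope["op"]]
    return handler(envelope["name"], envelope["msg"])


print(dispatch(make_envelope("alerts", {"level": "warning"})))
```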

Comment on lines +3626 to +3653
```python
if topic not in self.event_subscriptions:
    sub = self.event_subscriptions[topic] = Sub(topic, client=self)
else:
    sub = self.event_subscriptions[topic]

if topic in self.event_handlers:
    fut = self.event_handlers[topic]
    fut.cancel()

self.event_handlers[topic] = fut = asyncio.ensure_future(
    self._handle_events(sub=sub, handler=handler)
)

def unsubscribe_topic(self, topic):
    """Unsubscribe from a topic and remove event handler

    See Also
    --------
    dask.distributed.Client.subscribe_topic
    dask.distributed.Client.get_events
    dask.distributed.Client.log_event
    """
    if topic in self.event_handlers:
        fut = self.event_handlers.pop(topic)
        fut.cancel()
        sub = self.event_subscriptions.pop(topic)
        sub.stop()
    else:
```
fjetter (Member, Author) commented:

This is mostly convenience code to execute an event handler on the client side. The actual subscription is handled via the extension.
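The handler bookkeeping in the diff above can be sketched in isolation. This is a minimal simplification under my own assumptions (the `HandlerRegistry` class and `coro_factory` parameter are mine, and the real code additionally manages the pubsub `Sub` objects): subscribing a handler for a topic cancels any previously registered handler task, so at most one handler task runs per topic.

```python
import asyncio


class HandlerRegistry:
    """Toy version of the topic -> handler-task bookkeeping."""

    def __init__(self):
        self.event_handlers = {}

    def subscribe_topic(self, topic, coro_factory):
        # Cancel any previous handler task for this topic before
        # registering the new one.
        if topic in self.event_handlers:
            self.event_handlers[topic].cancel()
        self.event_handlers[topic] = asyncio.ensure_future(coro_factory())

    def unsubscribe_topic(self, topic):
        if topic in self.event_handlers:
            self.event_handlers.pop(topic).cancel()


async def main():
    registry = HandlerRegistry()

    async def handler():
        await asyncio.sleep(3600)  # stand-in for a long-running event loop

    registry.subscribe_topic("alerts", handler)
    first = registry.event_handlers["alerts"]
    registry.subscribe_topic("alerts", handler)  # replaces the first task
    try:
        await first  # wait until the cancellation is processed
    except asyncio.CancelledError:
        pass
    registry.unsubscribe_topic("alerts")
    return first.cancelled()


print(asyncio.run(main()))
```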

fjetter (Member, Author) commented on Aug 18, 2021

Closing in favour of #5217.

@fjetter fjetter closed this Aug 18, 2021
@fjetter fjetter deleted the events_using_pubsub branch August 18, 2021 10:54