Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multithreaded Pub/Sub message processing #77

Merged
merged 3 commits into from
Aug 7, 2016

Conversation

wuzzeb
Copy link
Contributor

@wuzzeb wuzzeb commented Jul 30, 2016

To implement issue #49, add a new multithreaded Pub/Sub message processing feature.
I added extensive haddocks to describe how to use the new feature.
The main benefit over the existing Pub/Sub code (which is left unchanged) is:

  • you can make subscription changes at any time
  • you can safely recover from networking errors such as the redis servier dying
  • you can detect when Redis has actually processed a subscription request and
    handlers will now start receiving messages.

This likely also solves #39 since you can recover from connection failures, and while not directly implementing it, will allow the timeout in #7 since you can manually implement some timeout.


The design turned out to be relatively simple:

  • There is a single thread which only reads messages from redis
  • There is a single thread which only sends subscription changes to redis
  • there is a 'PubSubController' which allows any thread to interact with these threads,
    mainly by notifying the sending thread that a subscription change should occur.
  • By using two threads and always only reading from one thread and always only sending on another thread.

To implement issue informatikr#49, add a new multithreaded Pub/Sub message processing feature.
The main benefit over the existing Pub/Sub code (which is left unchanged) is:

- you can make subscription changes at any time
- you can safely recover from networking errors such as the redis servier dying
- you can detect when Redis has actually processed a subscription request and
  handlers will now start receiving messages.
@@ -3,21 +3,39 @@

module Database.Redis.PubSub (
publish,

-- ** Subscribing to channels
-- $pubsubexpl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some leftover?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I added an explanation of why there are two different pub/sub implementations. That is a haddock reference to the documentation block at the bottom of the file.

@k-bx
Copy link
Collaborator

k-bx commented Jul 31, 2016

This looks good overall, thank you very much! One last minor nitpick – would be nice to add at least few happy-path tests into the test suite which runs on CI. Otherwise, I'll wait for this and other minor edits and then would be happy to merge!

@wuzzeb
Copy link
Contributor Author

wuzzeb commented Aug 2, 2016

Ok, I'll work on adding some tests to the automated test suite plus the small changes tomorrow.

Also, the past few days I have been starting to use this in my own project and found one minor place I would like the API to improve. Right now you can subscribe to the same channel multiple times and the controller keeps a list of handlers, but you can't unsubscribe an individual handler, you can just unsubscribe in bulk all handlers. I plan on adding some method to unsubscribe from an individual handler and only one the handler count reaches zero is the channel actually unsubscribed from redis.

@k-bx
Copy link
Collaborator

k-bx commented Aug 2, 2016

@wuzzeb all right, sounds reasonable

- Minor fixes suggested by reviewers on github
- Added ability to unsubscribe from just the handlers
  you register, instead of unregistering all handlers
  for a given channel or pattern channel name
This tests almost everything.  The only thing missing from the
test is proper reconnection after the redis server network
connection dies, but that can't be automated.
@wuzzeb
Copy link
Contributor Author

wuzzeb commented Aug 5, 2016

  • I made the above suggested changes, except I left RedisChannel and RedisPChannel as types instead of newtypes. I don't think there is much type safety, and it just makes it harder to work with the channels when you are registering them.
  • I added a method to unregister only the handlers you registered. This took me a while to find a nice API and implementation, I think the current one is OK. It (internally) allocates a unique integer each time you add handlers, and uses that to filter out the handlers on unregister.
  • I added some tests to the automatic test suite. Actually, once I started writing them, I was able to test essentially everything except reconnecting after the redis-server dies.

remPChans = filter (\n -> HM.member n pm) pchans

-- helper functions to filter out handlers that match
let filterHandle :: Maybe [(UnregisterHandle,a)] -> Maybe [(UnregisterHandle,a)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about naming this funciton removeHandle? It appears to do a portion of removeHandles and doesn't work completely as filter so the current name is a bit confusing IMO

@k-bx k-bx merged commit 7bce891 into informatikr:master Aug 7, 2016
@k-bx
Copy link
Collaborator

k-bx commented Aug 7, 2016

All right, thanks a lot! Will make a release soon.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants