Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub and connection handling #39

Open
RamotionRussell opened this issue Dec 8, 2015 · 4 comments
Open

Pubsub and connection handling #39

RamotionRussell opened this issue Dec 8, 2015 · 4 comments

Comments

@RamotionRussell
Copy link

I'm having issues with pubsub -- I can subscribe to channels and receive data just fine for short-term intervals, but after a while, stuff seems to just... stop arriving. Are subscriptions properly handled in the connection pool? Because it seems like the subscribe command is just executed on a connection in the pool and forgotten, and if that connection is lost and replaced with another in the pool, the subscription is also lost.

@k-bx
Copy link
Collaborator

k-bx commented Dec 8, 2015

@RamotionRussell would be helpful if you'd share reproducible piece of code which we can discuss

@levinotik
Copy link

I'm looking into this issue as well. Still digging around as the issues may very well be in our application and not hedis, but I do have situations where subs seem to stop working. Here's the relevant code:

lookupOrCreateChannel :: Connection -> Server -> ChannelSlug -> IO Channel
lookupOrCreateChannel conn server@Server{..} name = do
      channel <- atomically $ lookupChannel server name
      let pubSubChan = chanId2Bs name
      case channel of
        Nothing -> do
          newServerChan <- atomically $ do
              chan <- newChannel name
              modifyTVar serverChannels . M.insert name $ chan
              return chan
          void $ forkIO $ runRedis conn (pubSub (subscribe [pubSubChan]) (messageCallback server))
          return newServerChan
        Just chan -> return chan

messageCallback :: Server -> Redis.Message -> IO PubSub
messageCallback server@Server{..} msg = 
  atomically $ do
    channel <- lookupChannel server (ChannelSlug $ decodeUtf8 $ msgChannel msg)
    case channel of
      Just chan -> do
        let broadcastMsg = Aeson.eitherDecode (LC8.fromStrict $ msgMessage msg) :: Either String RtmEvent
        case broadcastMsg of
          Right bMsg -> writeTChan (channelBroadcastChan chan) bMsg
          Left err   -> return ()
      Nothing -> return ()
    return mempty

@qrilka
Copy link
Contributor

qrilka commented Feb 15, 2016

@RamotionRussell @levinotik could you give probably some more comments on this issue?
I was trying to find possible problems with it and the only one which looks to be a possible reason is half-open sockets. And to prevent those we need either use PING from time to time or SO_KEEPALIVE socket option.

@qnikst
Copy link
Collaborator

qnikst commented Jun 11, 2023

Hello, were there any updates for this situation?

One possible problem that we have faced was when we used a proxy to support sentinel protocol for us. We used a haproxy and it has it's own connection timeout, and as it does not know that connection is in pubSub mode (Redis knows that and to not break connection on timeout in that case) it happily disconnects a client connection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants