Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for RPC across ZeroMQ #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions msgpack-idl/Language/MessagePack/IDL/CodeGen/Cpp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,19 @@ genTypeDecl _ MPType { .. } =
typedef #{genType tyType} #{tyName};
|]

genTypeDecl _ MPEnum { ..} =
[lt|
enum #{enumName} {
#{genEnum enumMem}
};|]


genTypeDecl _ _ = ""

genEnum :: [(Int, T.Text)] -> LT.Text
genEnum entries = LT.intercalate ",\n " $ map enumEntry entries
where enumEntry (val, name) = [lt|#{name} = #{show val}|]

genMsg name flds isExc =
let fields = map f flds
fs = map (maybe undefined fldName) $ sortField flds
Expand Down
48 changes: 45 additions & 3 deletions msgpack-rpc/Network/MessagePackRpc/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module Network.MessagePackRpc.Server (
-- * RPC method types
RpcMethod,
RpcMethodType(..),
Endpoint(..),
-- * Create RPC method
fun,
-- * Start RPC server
Expand All @@ -39,14 +40,17 @@ import Control.DeepSeq
import Control.Exception as E
import Control.Monad
import Control.Monad.Trans
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Attoparsec as CA
import qualified Data.Attoparsec as A
import Data.Maybe
import Data.MessagePack
import Network
import System.IO
import System.ZMQ

import Prelude hiding (catch)

Expand All @@ -71,19 +75,24 @@ fromObject' o =
fun :: RpcMethodType f => f -> RpcMethod
fun = toRpcMethod

data Endpoint = TCP Int
| ZeroMQ [String]
deriving Show

-- | Start RPC server with a set of RPC methods.
serve :: Int -- ^ Port number
serve :: Endpoint -- ^ listen on this endpoint
-> [(String, RpcMethod)] -- ^ list of (method name, RPC method)
-> IO ()
serve port methods = withSocketsDo $ do

serve (TCP port) methods = withSocketsDo $ do
sock <- listenOn (PortNumber $ fromIntegral port)
forever $ do
(h, host, hostport) <- accept sock
forkIO $
(processRequests h `finally` hClose h) `catches`
[ Handler $ \e ->
case e of
CA.ParseError ["demandInput"] _ -> return ()
CA.ParseError ["demandInput"] _ _ -> return ()
_ -> hPutStrLn stderr $ host ++ ":" ++ show hostport ++ ": " ++ show e
, Handler $ \e ->
hPutStrLn stderr $ host ++ ":" ++ show hostport ++ ": " ++ show (e :: SomeException)]
Expand Down Expand Up @@ -116,3 +125,36 @@ serve port methods = withSocketsDo $ do
fail $ "method '" ++ methodName ++ "' not found"
Just method ->
method args

serve (ZeroMQ endpoints) methods =
withContext 1 $ \ctx ->
withSocket ctx Rep $ \s -> do
mapM_ (bind s) endpoints
forever $ do
req <- receive s []
resp <- processRequest req
send s ((B.concat . BL.toChunks) resp) []
where
processRequest req =
case A.parseOnly get req of
Left _ -> fail "Parsing failed."
Right (rtype, msgid, method, args) -> do
resp <- try $ getResponse rtype method args
case resp of
Left err ->
return $ pack (1 :: Int, msgid :: Int, show (err :: SomeException), ())
Right ret ->
return $ pack (1 :: Int, msgid :: Int, (), ret)

getResponse rtype method args = do
when (rtype /= (0 :: Int)) $
fail "request type is not 0"
r <- callMethod (method :: String) (args :: [Object])
r `deepseq` return r

callMethod methodName args =
case lookup methodName methods of
Nothing ->
fail $ "method '" ++ methodName ++ "' not found"
Just method ->
method args
4 changes: 3 additions & 1 deletion msgpack-rpc/msgpack-rpc.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ Library
, network >= 2.2 && < 2.4
, random == 1.0.*
, mtl == 2.0.*
, conduit >= 0.2 && < 0.5
, conduit >= 0.5
, attoparsec
, attoparsec-conduit
, deepseq >= 1.1 && < 1.4
, msgpack == 0.7.*
, zeromq-haskell >= 0.8.4

Ghc-options: -Wall

Expand Down