forked from andrewray/iocaml
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sockets.ml
60 lines (51 loc) · 1.63 KB
/
sockets.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
(*
* iocaml - an OCaml kernel for IPython
*
* (c) 2014 MicroJamJar Ltd
*
* Author(s): [email protected]
* Description: handle zmq sockets
*
*)
open Iocaml_zmq
(* ZMQ context used by every socket created in this file. *)
let context = ZMQ.Context.create ()

(* Arrange for the context to be terminated when the program exits. *)
let () =
  let shutdown () = ZMQ.Context.terminate context in
  at_exit shutdown
(* Hard-coded version numbers (presumably [major; minor] -- TODO confirm).
   XXX get from ZMQ *)
let version = [ 4; 0 ]
(* [addr conn port] builds the endpoint string "<transport>://<ip>:<port>"
   from the [transport] and [ip] fields of [conn] and the integer [port]. *)
let addr conn port =
  Printf.sprintf "%s://%s:%d" conn.Ipython_json_j.transport
    conn.Ipython_json_j.ip port
(* [open_socket typ conn port] creates a socket of kind [typ] in the shared
   [context], binds it to the address built from [conn] and [port], logs the
   bound address and returns the socket.

   If binding raises, the freshly created socket is closed before the
   exception is re-raised, so a failed bind does not leave an open socket
   behind. *)
let open_socket typ conn port =
  let socket = ZMQ.Socket.(create context typ) in
  let addr = addr conn port in
  (* Close on failure: nobody else holds a reference to [socket] yet. *)
  (try ZMQ.Socket.bind socket addr
   with e ->
     ZMQ.Socket.close socket;
     raise e);
  Log.log ("open and bind socket " ^ addr ^ "\n");
  socket
(* [heartbeat conn] opens a REP socket on the [hb_port] of [conn] and then
   loops forever: each received message is logged as "Heartbeat" and sent
   straight back on the same socket.

   The loop has no exit condition, so the function only ever leaves by
   raising.  In that case the socket is closed before the exception is
   re-raised; the previous close after the loop could never be reached. *)
let heartbeat conn =
  let socket = open_socket ZMQ.Socket.rep conn conn.Ipython_json_j.hb_port in
  try
    while true do
      let data = ZMQ.Socket.recv socket in
      Log.log "Heartbeat\n";
      ZMQ.Socket.send socket data
    done
  with e ->
    (* Release the socket on the only way out of the loop. *)
    ZMQ.Socket.close socket;
    raise e
(* The four sockets built by [open_sockets]: three Router sockets and one
   Pub socket. *)
type sockets =
{
(* Router socket; [open_sockets] binds it on the connection's [shell_port]. *)
shell : [`Router] ZMQ.Socket.t;
(* Router socket; [open_sockets] binds it on the connection's [control_port]. *)
control : [`Router] ZMQ.Socket.t;
(* Router socket; [open_sockets] binds it on the connection's [stdin_port]. *)
stdin : [`Router] ZMQ.Socket.t;
(* Pub socket; [open_sockets] binds it on the connection's [iopub_port]. *)
iopub : [`Pub] ZMQ.Socket.t;
}
(* [open_sockets conn] opens and binds the shell, control, stdin and iopub
   sockets on the ports listed in [conn] and returns them as a [sockets]
   record. *)
let open_sockets conn =
  (* Every socket is bound against the same connection description. *)
  let bind kind port = open_socket kind conn port in
  {
    shell = bind ZMQ.Socket.router conn.Ipython_json_j.shell_port;
    control = bind ZMQ.Socket.router conn.Ipython_json_j.control_port;
    stdin = bind ZMQ.Socket.router conn.Ipython_json_j.stdin_port;
    iopub = bind ZMQ.Socket.pub conn.Ipython_json_j.iopub_port;
  }
(* [dump name socket] loops forever, receiving a message from [socket] with
   [Message.recv] and passing it to [Message.log].  [name] is accepted but
   not used. *)
let dump name socket =
  while true do
    Message.log (Message.recv socket)
  done