Skip to content

Commit

Permalink
chore: revert switch to httpun-ws
Browse files Browse the repository at this point in the history
  • Loading branch information
eWert-Online committed Sep 13, 2024
1 parent a02c89f commit 24d9bee
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 82 deletions.
4 changes: 3 additions & 1 deletion bin/Main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ let default_cmd =
in
handle_response_eio
@@ Eio_main.run
@@ fun env -> Eio.Switch.run @@ fun sw -> run ~sw ~env
@@ fun env ->
Lwt_eio.with_event_loop ~clock:(Eio.Stdenv.clock env)
@@ fun _ -> Eio.Switch.run @@ fun sw -> run ~sw ~env
in
( (let open Term in
const exec $ noCreate $ noOnly $ noSkip $ parallelism $ config)
Expand Down
2 changes: 1 addition & 1 deletion bin/dune
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
(public_name osnap)
(package osnap)
(ocamlopt_flags -O3)
(libraries OSnap cmdliner eio eio.core eio_main fmt))
(libraries OSnap cmdliner lwt_eio eio eio.core eio_main fmt))
7 changes: 4 additions & 3 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@
ocaml
dune
eio_main
lwt_eio
httpun-eio
base64
cdp
cmdliner
piaf
httpun-eio
httpun-ws-eio
decompress
fileutils
fmt
libspng
odiff-core
re
uri
websocket
websocket-lwt-unix
yaml
yojson
(ocaml-lsp-server :with-dev-setup)
Expand Down
141 changes: 81 additions & 60 deletions lib/OSnap_Websocket/OSnap_Websocket.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
open OSnap_Utils
open Lwt.Syntax

let id = ref 0

Expand All @@ -19,73 +20,99 @@ let call_event_handlers key message =
Hashtbl.remove events key))
;;

let connect ~sw ~env url =
let uri = Uri.of_string url in
let resource = Uri.path uri in
let*? client =
Piaf.Client.create env ~sw (Uri.with_scheme uri (Some "http"))
|> Result.map_error (fun _ -> `OSnap_CDP_Connection_Failed)
in
let*? wsd =
Piaf.Client.ws_upgrade client resource
|> Result.map_error (fun _ -> `OSnap_CDP_Connection_Failed)
in
let websocket_handler recv send =
let close () = Websocket.Frame.close 1002 |> send in
let send_payload payload = Websocket.Frame.create ~content:payload () |> send in
let rec input_loop () =
let () = Eio.Fiber.yield () in
let* () = Lwt.pause () in
if not (Queue.is_empty pending_requests)
then (
let key, message, resolver = Queue.take pending_requests in
Piaf.Ws.Descriptor.send_string wsd message;
let* () = send_payload message in
Hashtbl.add sent_requests key resolver;
input_loop ())
else input_loop ()
in
let react (frame : Websocket.Frame.t) =
match frame.opcode with
| Close | Continuation | Ctrl _ | Nonctrl _ -> close ()
| Ping -> Websocket.Frame.create ~opcode:Pong () |> send
| Pong -> Lwt.return ()
| Text | Binary ->
let response = frame.Websocket.Frame.content in
let id =
response
|> Yojson.Safe.from_string
|> Yojson.Safe.Util.member "id"
|> Yojson.Safe.Util.to_int_option
in
let method_ =
response
|> Yojson.Safe.from_string
|> Yojson.Safe.Util.member "method"
|> Yojson.Safe.Util.to_string_option
in
let sessionId =
response
|> Yojson.Safe.from_string
|> Yojson.Safe.Util.member "sessionId"
|> Yojson.Safe.Util.to_string_option
in
(match method_, sessionId with
| None, None -> ()
| None, _ -> ()
| Some method_, None ->
let key = method_ in
Hashtbl.add events key response;
Hashtbl.find_opt listeners key |> Option.iter (call_event_handlers key response)
| Some method_, Some sessionId ->
let key = method_ ^ sessionId in
Hashtbl.add events key response;
Hashtbl.find_opt listeners key |> Option.iter (call_event_handlers key response));
(match id with
| None -> Lwt.return ()
| Some key ->
Hashtbl.find_opt sent_requests key
|> Option.iter (fun resolver -> Lwt.wakeup_later resolver response);
Hashtbl.remove sent_requests key;
Lwt.return ())
in
let rec react_forever () =
let* frame = recv () in
let* () = react frame in
react_forever ()
in
Lwt.pick [ input_loop (); react_forever () ]
;;

let connect ~sw ~env:_ url =
Result.ok
@@ Eio.Fiber.fork_daemon ~sw
@@ fun () ->
Eio.Fiber.both
(fun () -> input_loop ())
(fun () ->
wsd
|> Piaf.Ws.Descriptor.messages
|> Piaf.Stream.iter ~f:(fun (_opcode, { Piaf.IOVec.buffer; off; len }) ->
let response = Bigstringaf.substring ~off ~len buffer in
let module Json = Yojson.Basic in
let json = Json.from_string response in
Eio.Fiber.both
(fun () ->
match
( json |> Json.Util.member "method" |> Json.Util.to_string_option
, json |> Json.Util.member "sessionId" |> Json.Util.to_string_option )
with
| None, None -> ()
| None, _ -> ()
| Some meth, None ->
let key = meth in
Hashtbl.add events key response;
Hashtbl.find_opt listeners key
|> Option.iter (call_event_handlers key response)
| Some meth, Some sessionId ->
let key = meth ^ sessionId in
Hashtbl.add events key response;
Hashtbl.find_opt listeners key
|> Option.iter (call_event_handlers key response))
(fun () ->
match json |> Json.Util.member "id" |> Json.Util.to_int_option with
| None -> ()
| Some key ->
Hashtbl.find_opt sent_requests key
|> Option.iter (fun resolver -> Eio.Promise.resolve resolver response);
Hashtbl.remove sent_requests key)));
let () =
Lwt_eio.run_lwt
@@ fun () ->
let orig_uri = Uri.of_string url in
let uri = Uri.with_scheme orig_uri (Some "http") in
let* endpoint = Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system in
let default_context = Lazy.force Conduit_lwt_unix.default_ctx in
let* client = endpoint |> Conduit_lwt_unix.endp_to_client ~ctx:default_context in
let* conn = Websocket_lwt_unix.connect ~ctx:default_context client uri in
let recv () = Websocket_lwt_unix.read conn in
let send = Websocket_lwt_unix.write conn in
websocket_handler recv send
in
`Stop_daemon
;;

let send message =
Lwt_eio.run_lwt
@@ fun () ->
let key = id () in
let message = message key in
let p, resolver = Eio.Promise.create () in
let p, resolver = Lwt.wait () in
pending_requests |> Queue.add (key, message, resolver);
Eio.Promise.await p
p
;;

let listen ?(look_behind = true) ~event ~sessionId handler =
Expand All @@ -96,15 +123,9 @@ let listen ?(look_behind = true) ~event ~sessionId handler =
| Some stored -> Hashtbl.replace listeners key (handler :: stored));
if look_behind
then
Eio.Switch.run
@@ fun sw ->
events
|> Hashtbl.iter (fun k event ->
Eio.Fiber.fork ~sw
@@ fun () ->
if key = k
then
handler event (fun () ->
Hashtbl.remove listeners key;
Hashtbl.remove events key))
Hashtbl.find_all events key
|> List.iter (fun event ->
handler event (fun () ->
Hashtbl.remove listeners key;
Hashtbl.remove events key))
;;
17 changes: 7 additions & 10 deletions lib/OSnap_Websocket/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@
(name OSnap_Websocket)
(libraries
OSnap_Utils
bigstringaf
httpun
httpun-ws
httpun-ws-eio
piaf
piaf.stream
conduit-lwt
conduit-lwt-unix
uri
eio
eio.core
eio.unix
lwt
lwt.unix
lwt_eio
yojson
base64))
websocket
websocket-lwt-unix))
9 changes: 4 additions & 5 deletions osnap.opam
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ depends: [
"ocaml"
"dune" {>= "3.15"}
"eio_main"
"lwt_eio"
"httpun-eio"
"base64"
"cdp"
"cmdliner"
"piaf"
"httpun-eio"
"httpun-ws-eio"
"decompress"
"fileutils"
"fmt"
"libspng"
"odiff-core"
"re"
"uri"
"websocket"
"websocket-lwt-unix"
"yaml"
"yojson"
"ocaml-lsp-server" {with-dev-setup}
Expand All @@ -48,7 +49,5 @@ dev-repo: "git+https://github.com/eWert-Online/OSnap.git"
pin-depends: [
[ "cdp.dev" "git+https://github.com/eWert-Online/reason-cdp.git#a0579cb4789b8ccf1cc4342b9885a46cf1549012"]
[ "libspng.dev" "git+https://github.com/eWert-Online/esy-libspng.git#opam"]
[ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git"]
[ "odiff-core.dev" "git+https://github.com/dmtrKovalenko/odiff.git#v3.1.1"]
[ "piaf.dev" "git+https://github.com/anmonteiro/piaf.git"]
]
2 changes: 0 additions & 2 deletions osnap.opam.template
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pin-depends: [
[ "cdp.dev" "git+https://github.com/eWert-Online/reason-cdp.git#a0579cb4789b8ccf1cc4342b9885a46cf1549012"]
[ "libspng.dev" "git+https://github.com/eWert-Online/esy-libspng.git#opam"]
[ "multipart_form.dev" "git+https://github.com/anmonteiro/multipart_form.git"]
[ "odiff-core.dev" "git+https://github.com/dmtrKovalenko/odiff.git#v3.1.1"]
[ "piaf.dev" "git+https://github.com/anmonteiro/piaf.git"]
]

0 comments on commit 24d9bee

Please sign in to comment.