From 21c3b90e2ed61c831dc08776daec6a51c11c6cf4 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Thu, 8 Dec 2016 12:13:44 +0000 Subject: [PATCH 1/3] github: use an unlimited number of fids and walk in parallel This speeds-up initialisation of the bridge massively. Signed-off-by: Thomas Gazagnaire --- bridge/github/datakit_github_conv.ml | 97 +++++++++++++++------------- bridge/github/main.ml | 2 +- tests/test_github.ml | 10 +-- tests/test_utils.ml | 2 +- 4 files changed, 59 insertions(+), 52 deletions(-) diff --git a/bridge/github/datakit_github_conv.ml b/bridge/github/datakit_github_conv.ml index 5e216a287..6ab0465b8 100644 --- a/bridge/github/datakit_github_conv.ml +++ b/bridge/github/datakit_github_conv.ml @@ -151,17 +151,20 @@ module Make (DK: Datakit_S.CLIENT) = struct Log.debug (fun l -> l "repo %a -> true" Repo.pp repo); Some repo + let reduce_repos = List.fold_left Repo.Set.union Repo.Set.empty + let repos tree = let root = Datakit_path.empty in safe_read_dir tree root >>= fun users -> - Lwt_list.fold_left_s (fun acc user -> + Lwt_list.map_p (fun user -> safe_read_dir tree (root / user) >>= fun repos -> - Lwt_list.fold_left_s (fun acc repo -> + Lwt_list.map_p (fun repo -> safe_read_file tree (root / user /repo / ".monitor") >|= function - | None -> acc - | Some _ -> Repo.Set.add (Repo.v ~user ~repo) acc - ) acc repos - ) Repo.Set.empty users >|= fun repos -> + | None -> Repo.Set.empty + | Some _ -> Repo.Set.singleton (Repo.v ~user ~repo) + ) repos >|= reduce_repos + ) users >|= fun repos -> + let repos = reduce_repos repos in Log.debug (fun l -> l "repos -> @;@[<2>%a@]" Repo.Set.pp repos); repos @@ -244,14 +247,17 @@ module Make (DK: Datakit_S.CLIENT) = struct in Some (PR.v ~state ~title ~base head number) + let reduce_prs = List.fold_left PR.Set.union PR.Set.empty + let prs_of_repo tree repo = let dir = root repo / "pr" in safe_read_dir tree dir >>= fun nums -> - Lwt_list.fold_left_s (fun acc n -> + Lwt_list.map_p (fun n -> pr tree (repo, int_of_string n) >|= function - | None -> acc - | Some p -> PR.Set.add p acc - ) PR.Set.empty nums >|= fun prs -> + | None -> PR.Set.empty + | Some p -> PR.Set.singleton p + ) nums >|= fun prs -> + let prs = reduce_prs prs in Log.debug (fun l -> l "prs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo PR.Set.pp prs); prs @@ -262,11 +268,8 @@ module Make (DK: Datakit_S.CLIENT) = struct let prs ?repos:rs tree = maybe_repos tree rs >>= fun repos -> - Lwt_list.fold_left_s (fun acc r -> - prs_of_repo tree r >|= fun prs -> - PR.Set.union prs acc - ) PR.Set.empty (Repo.Set.elements repos) - >|= fun prs -> + Lwt_list.map_p (prs_of_repo tree) (Repo.Set.elements repos) >|= fun prs -> + let prs = reduce_prs prs in Log.debug (fun l -> l "prs -> @;@[<2>%a@]" PR.Set.pp prs); prs @@ -294,13 +297,13 @@ module Make (DK: Datakit_S.CLIENT) = struct Commit.Set.pp cs); cs + let reduce_commits = List.fold_left Commit.Set.union Commit.Set.empty + let commits ?repos:rs tree = maybe_repos tree rs >>= fun repos -> - Lwt_list.fold_left_s (fun acc r -> - commits_of_repo tree r >|= fun commits -> - Commit.Set.union commits acc - ) Commit.Set.empty (Repo.Set.elements repos) + Lwt_list.map_p (commits_of_repo tree) (Repo.Set.elements repos) >|= fun cs -> + let cs = reduce_commits cs in Log.debug (fun l -> l "commits -> @;@[<2>%a@]" Commit.Set.pp cs); cs @@ -321,7 +324,7 @@ module Make (DK: Datakit_S.CLIENT) = struct "state" , Some (Status_state.to_string @@ Status.state s); "target_url" , mapo Uri.to_string (Status.url s); ] in - Lwt_list.iter_s (fun (k, v) -> match v with + Lwt_list.iter_p (fun (k, v) -> match v with | None -> safe_remove t (dir / k) | Some v -> let v = Cstruct.of_string (v ^ "\n") in @@ -355,16 +358,17 @@ module Make (DK: Datakit_S.CLIENT) = struct let url = mapo Uri.of_string url in Some (Status.v ?description ?url commit context state) + let reduce_status = List.fold_left Status.Set.union Status.Set.empty + let statuses_of_commits tree commits = - Lwt_list.fold_left_s (fun acc commit -> + Lwt_list.map_p (fun commit -> let dir = root (Commit.repo commit) / "commit" in let dir = dir / Commit.hash commit / "status" in walk (module Status.Set) tree dir ("state", fun c -> status tree (commit, c)) - >|= fun status -> - Status.Set.union status acc - ) Status.Set.empty (Commit.Set.elements commits) + ) (Commit.Set.elements commits) >|= fun status -> + let status = reduce_status status in Log.debug (fun l -> l "statuses_of_commits %a -> @;@[<2>%a@]" Commit.Set.pp commits Status.Set.pp status); status @@ -401,13 +405,12 @@ module Make (DK: Datakit_S.CLIENT) = struct l "refs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo Ref.Set.pp refs); refs + let reduce_refs = List.fold_left Ref.Set.union Ref.Set.empty + let refs ?repos:rs tree = maybe_repos tree rs >>= fun repos -> - Lwt_list.fold_left_s (fun acc r -> - refs_of_repo tree r >|= fun refs -> - Ref.Set.union acc refs - ) Ref.Set.empty (Repo.Set.elements repos) - >|= fun refs -> + Lwt_list.map_p (refs_of_repo tree) (Repo.Set.elements repos) >|= fun refs -> + let refs = reduce_refs refs in Log.debug (fun l -> l "refs -> @;@[<2>%a@]" Ref.Set.pp refs); refs @@ -455,29 +458,31 @@ module Make (DK: Datakit_S.CLIENT) = struct (* Dirty *) + let reduce_elts = List.fold_left Elt.IdSet.union Elt.IdSet.empty + let dirty_repos tree = let root = Datakit_path.empty in safe_read_dir tree root >>= fun users -> - Lwt_list.fold_left_s (fun acc user -> + Lwt_list.map_p (fun user -> safe_read_dir tree (root / user) >>= fun repos -> - Lwt_list.fold_left_s (fun acc repo -> + Lwt_list.map_p (fun repo -> safe_exists_file tree (root / user /repo / ".dirty") >|= function - | false -> acc - | true -> Elt.IdSet.add (`Repo (Repo.v ~user ~repo)) acc - ) acc repos - ) Elt.IdSet.empty users + | false -> Elt.IdSet.empty + | true -> Elt.IdSet.singleton (`Repo (Repo.v ~user ~repo)) + ) repos >|= reduce_elts + ) users >|= reduce_elts let dirty_prs tree repo = let dir = root repo / "pr" in safe_read_dir tree dir >>= fun nums -> - Lwt_list.fold_left_s (fun acc n -> + Lwt_list.map_p (fun n -> let d = dir / n / ".dirty" in safe_exists_file tree d >|= function - | false -> acc + | false -> Elt.IdSet.empty | true -> - try Elt.IdSet.add (`PR (repo, int_of_string n)) acc - with Failure _ -> acc - ) Elt.IdSet.empty nums + try Elt.IdSet.singleton (`PR (repo, int_of_string n)) + with Failure _ -> Elt.IdSet.empty + ) nums >|= reduce_elts let dirty_refs tree repo = let dir = root repo / "ref" in @@ -491,11 +496,13 @@ module Make (DK: Datakit_S.CLIENT) = struct dirty_repos t >>= fun dirty_repos -> repos t >>= fun repos -> (* we only check for dirty prs/refs for monitored repos only *) - Lwt_list.fold_left_s (fun acc r -> + Lwt_list.map_p (fun r -> dirty_prs t r >>= fun prs -> dirty_refs t r >|= fun refs -> - acc ++ prs ++ refs - ) dirty_repos (Repo.Set.elements repos) + prs ++ refs + ) (Repo.Set.elements repos) + >|= fun more -> + dirty_repos ++ reduce_elts more let dirty_file: Elt.id -> Datakit_path.t = function | `Repo r -> root r / ".dirty" @@ -661,7 +668,7 @@ module Make (DK: Datakit_S.CLIENT) = struct let f tr = Log.debug (fun l -> l "remove_snapshot (from %s):@;%a" debug Elt.IdSet.pp t); - Lwt_list.iter_s (remove_elt tr) (Elt.IdSet.elements t) + Lwt_list.iter_p (remove_elt tr) (Elt.IdSet.elements t) in Some f @@ -671,7 +678,7 @@ module Make (DK: Datakit_S.CLIENT) = struct let f tr = Log.debug (fun l -> l "update_snapshot (from %s):@;%a" debug Elt.Set.pp t); - Lwt_list.iter_s (update_elt tr) (Elt.Set.elements t) + Lwt_list.iter_p (update_elt tr) (Elt.Set.elements t) in Some f diff --git a/bridge/github/main.ml b/bridge/github/main.ml index f6b86773c..5fcb9f18f 100644 --- a/bridge/github/main.ml +++ b/bridge/github/main.ml @@ -130,7 +130,7 @@ let start () no_listen listen_urls datakit cap webhook resync_interval = failwith "connect to datakit" in Lwt.catch - (fun () -> Client9p.connect proto address ()) + (fun () -> Client9p.connect proto address ~max_fids:Int32.max_int ()) (fun e -> Lwt.fail_with @@ Fmt.strf "%a" Fmt.exn e) >>= function | Error (`Msg e) -> diff --git a/tests/test_github.ml b/tests/test_github.ml index 90bbe186e..6d927226f 100644 --- a/tests/test_github.ml +++ b/tests/test_github.ml @@ -1511,7 +1511,7 @@ let rec read_state ~user ~repo ~commit tree path context = [ Status.v ?description ?url commit context state] end >>= fun this_state -> - items |> Lwt_list.map_s (function + items |> Lwt_list.map_p (function | "status" | "description" | "target_url" -> Lwt.return [] | item -> read_state ~user ~repo ~commit tree (path / item) (context @ [item]) @@ -1528,7 +1528,7 @@ let read_opt_dir tree path = let read_commits tree ~user ~repo = let path = Datakit_path.of_steps_exn [user; repo; "commit"] in read_opt_dir tree path >>= - Lwt_list.map_s (fun commit -> + Lwt_list.map_p (fun commit -> let path = Datakit_path.of_steps_exn [user; repo; "commit"; commit; "status"] in @@ -1539,7 +1539,7 @@ let read_commits tree ~user ~repo = let read_prs tree ~user ~repo = let path = Datakit_path.of_steps_exn [user; repo; "pr"] in read_opt_dir tree path >>= - Lwt_list.map_s (fun number -> + Lwt_list.map_p (fun number -> let path = Datakit_path.of_steps_exn [user; repo; "pr"; number] in let number = int_of_string number in let read name = @@ -1742,11 +1742,11 @@ let test_random_datakit ~quick _repo conn = let update_datakit users = let events = Users.diff_events users (Users.empty ()) in DK.Branch.with_transaction branch (fun tr -> - Lwt_list.iter_s (fun { Repo.user; repo } -> + Lwt_list.iter_p (fun { Repo.user; repo } -> safe_remove tr Datakit_path.(empty / user / repo) ) (Repo.Set.elements all_repos) >>= fun () -> - Lwt_list.iter_s (Conv.update_event tr) events >>= fun () -> + Lwt_list.iter_p (Conv.update_event tr) events >>= fun () -> DK.Transaction.commit tr ~message:"User updates" ) >>= function | Error e -> Lwt.fail (DK_error e) diff --git a/tests/test_utils.ml b/tests/test_utils.ml index aff22db55..d282088ec 100644 --- a/tests/test_utils.ml +++ b/tests/test_utils.ml @@ -179,7 +179,7 @@ let run fn = Server.accept ~root ~msg:"test" for_server >>*= Lwt.return in Lwt.finalize - (fun () -> Client.connect for_client () >>*= fn repo) + (fun () -> Client.connect for_client ~max_fids:Int32.max_int () >>*= fn repo) (fun () -> Lwt.cancel server_thread; Lwt.return ()) end From cce5c6c3ec0dbd243d35e20a0b2e8a079bc55a6f Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Mon, 12 Dec 2016 18:00:42 +0000 Subject: [PATCH 2/3] github: depends on protocol-9p >= 0.8.0 Signed-off-by: Thomas Gazagnaire --- datakit-github.opam | 1 + 1 file changed, 1 insertion(+) diff --git a/datakit-github.opam b/datakit-github.opam index 99941ff38..400c6da4c 100644 --- a/datakit-github.opam +++ b/datakit-github.opam @@ -20,6 +20,7 @@ depends: [ "lwt" "asetmap" "logs" "fmt" "mtime" "asl" "win-eventlog" "hvsock" "hex" "nocrypto" "conduit" + "protocol-9p" {>= "0.8.0"} "datakit-server" {>= "0.7.0"} "datakit-client" {>= "0.7.0"} "github-hooks" {>= "0.1.1"} From 36d91de2c22842730b4c9df0756cac64c174f7bd Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Mon, 12 Dec 2016 18:01:15 +0000 Subject: [PATCH 3/3] github: tweak Docker.github to empty the Docker cache Signed-off-by: Thomas Gazagnaire --- Dockerfile.github | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile.github b/Dockerfile.github index 0472917b4..f475eb447 100644 --- a/Dockerfile.github +++ b/Dockerfile.github @@ -1,7 +1,8 @@ FROM docker/datakit:server # TMP -RUN cd /home/opam/opam-repository && git pull && opam update -y +RUN cd /home/opam/opam-repository && git pull && \ + opam update -y COPY datakit-github.opam /home/opam/src/datakit/datakit-github.opam RUN opam pin add github --dev -n