Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

github: use an unlimited number of fids and walk in parallel #401

Merged
merged 3 commits into from
Dec 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile.github
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
FROM docker/datakit:server

# TMP
RUN cd /home/opam/opam-repository && git pull && opam update -y
RUN cd /home/opam/opam-repository && git pull && \
opam update -y

COPY datakit-github.opam /home/opam/src/datakit/datakit-github.opam
RUN opam pin add github --dev -n
Expand Down
97 changes: 52 additions & 45 deletions bridge/github/datakit_github_conv.ml
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,20 @@ module Make (DK: Datakit_S.CLIENT) = struct
Log.debug (fun l -> l "repo %a -> true" Repo.pp repo);
Some repo

let reduce_repos = List.fold_left Repo.Set.union Repo.Set.empty

let repos tree =
let root = Datakit_path.empty in
safe_read_dir tree root >>= fun users ->
Lwt_list.fold_left_s (fun acc user ->
Lwt_list.map_p (fun user ->
safe_read_dir tree (root / user) >>= fun repos ->
Lwt_list.fold_left_s (fun acc repo ->
Lwt_list.map_p (fun repo ->
safe_read_file tree (root / user /repo / ".monitor") >|= function
| None -> acc
| Some _ -> Repo.Set.add (Repo.v ~user ~repo) acc
) acc repos
) Repo.Set.empty users >|= fun repos ->
| None -> Repo.Set.empty
| Some _ -> Repo.Set.singleton (Repo.v ~user ~repo)
) repos >|= reduce_repos
) users >|= fun repos ->
let repos = reduce_repos repos in
Log.debug (fun l -> l "repos -> @;@[<2>%a@]" Repo.Set.pp repos);
repos

Expand Down Expand Up @@ -244,14 +247,17 @@ module Make (DK: Datakit_S.CLIENT) = struct
in
Some (PR.v ~state ~title ~base head number)

let reduce_prs = List.fold_left PR.Set.union PR.Set.empty

let prs_of_repo tree repo =
let dir = root repo / "pr" in
safe_read_dir tree dir >>= fun nums ->
Lwt_list.fold_left_s (fun acc n ->
Lwt_list.map_p (fun n ->
pr tree (repo, int_of_string n) >|= function
| None -> acc
| Some p -> PR.Set.add p acc
) PR.Set.empty nums >|= fun prs ->
| None -> PR.Set.empty
| Some p -> PR.Set.singleton p
) nums >|= fun prs ->
let prs = reduce_prs prs in
Log.debug (fun l ->
l "prs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo PR.Set.pp prs);
prs
Expand All @@ -262,11 +268,8 @@ module Make (DK: Datakit_S.CLIENT) = struct

let prs ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
prs_of_repo tree r >|= fun prs ->
PR.Set.union prs acc
) PR.Set.empty (Repo.Set.elements repos)
>|= fun prs ->
Lwt_list.map_p (prs_of_repo tree) (Repo.Set.elements repos) >|= fun prs ->
let prs = reduce_prs prs in
Log.debug (fun l -> l "prs -> @;@[<2>%a@]" PR.Set.pp prs);
prs

Expand Down Expand Up @@ -294,13 +297,13 @@ module Make (DK: Datakit_S.CLIENT) = struct
Commit.Set.pp cs);
cs

let reduce_commits = List.fold_left Commit.Set.union Commit.Set.empty

let commits ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
commits_of_repo tree r >|= fun commits ->
Commit.Set.union commits acc
) Commit.Set.empty (Repo.Set.elements repos)
Lwt_list.map_p (commits_of_repo tree) (Repo.Set.elements repos)
>|= fun cs ->
let cs = reduce_commits cs in
Log.debug (fun l -> l "commits -> @;@[<2>%a@]" Commit.Set.pp cs);
cs

Expand All @@ -321,7 +324,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
"state" , Some (Status_state.to_string @@ Status.state s);
"target_url" , mapo Uri.to_string (Status.url s);
] in
Lwt_list.iter_s (fun (k, v) -> match v with
Lwt_list.iter_p (fun (k, v) -> match v with
| None -> safe_remove t (dir / k)
| Some v ->
let v = Cstruct.of_string (v ^ "\n") in
Expand Down Expand Up @@ -355,16 +358,17 @@ module Make (DK: Datakit_S.CLIENT) = struct
let url = mapo Uri.of_string url in
Some (Status.v ?description ?url commit context state)

let reduce_status = List.fold_left Status.Set.union Status.Set.empty

let statuses_of_commits tree commits =
Lwt_list.fold_left_s (fun acc commit ->
Lwt_list.map_p (fun commit ->
let dir = root (Commit.repo commit) / "commit" in
let dir = dir / Commit.hash commit / "status" in
walk (module Status.Set) tree dir
("state", fun c -> status tree (commit, c))
>|= fun status ->
Status.Set.union status acc
) Status.Set.empty (Commit.Set.elements commits)
) (Commit.Set.elements commits)
>|= fun status ->
let status = reduce_status status in
Log.debug (fun l -> l "statuses_of_commits %a -> @;@[<2>%a@]"
Commit.Set.pp commits Status.Set.pp status);
status
Expand Down Expand Up @@ -401,13 +405,12 @@ module Make (DK: Datakit_S.CLIENT) = struct
l "refs_of_repo %a -> @;@[<2>%a@]" Repo.pp repo Ref.Set.pp refs);
refs

let reduce_refs = List.fold_left Ref.Set.union Ref.Set.empty

let refs ?repos:rs tree =
maybe_repos tree rs >>= fun repos ->
Lwt_list.fold_left_s (fun acc r ->
refs_of_repo tree r >|= fun refs ->
Ref.Set.union acc refs
) Ref.Set.empty (Repo.Set.elements repos)
>|= fun refs ->
Lwt_list.map_p (refs_of_repo tree) (Repo.Set.elements repos) >|= fun refs ->
let refs = reduce_refs refs in
Log.debug (fun l -> l "refs -> @;@[<2>%a@]" Ref.Set.pp refs);
refs

Expand Down Expand Up @@ -455,29 +458,31 @@ module Make (DK: Datakit_S.CLIENT) = struct

(* Dirty *)

let reduce_elts = List.fold_left Elt.IdSet.union Elt.IdSet.empty

let dirty_repos tree =
let root = Datakit_path.empty in
safe_read_dir tree root >>= fun users ->
Lwt_list.fold_left_s (fun acc user ->
Lwt_list.map_p (fun user ->
safe_read_dir tree (root / user) >>= fun repos ->
Lwt_list.fold_left_s (fun acc repo ->
Lwt_list.map_p (fun repo ->
safe_exists_file tree (root / user /repo / ".dirty") >|= function
| false -> acc
| true -> Elt.IdSet.add (`Repo (Repo.v ~user ~repo)) acc
) acc repos
) Elt.IdSet.empty users
| false -> Elt.IdSet.empty
| true -> Elt.IdSet.singleton (`Repo (Repo.v ~user ~repo))
) repos >|= reduce_elts
) users >|= reduce_elts

let dirty_prs tree repo =
let dir = root repo / "pr" in
safe_read_dir tree dir >>= fun nums ->
Lwt_list.fold_left_s (fun acc n ->
Lwt_list.map_p (fun n ->
let d = dir / n / ".dirty" in
safe_exists_file tree d >|= function
| false -> acc
| false -> Elt.IdSet.empty
| true ->
try Elt.IdSet.add (`PR (repo, int_of_string n)) acc
with Failure _ -> acc
) Elt.IdSet.empty nums
try Elt.IdSet.singleton (`PR (repo, int_of_string n))
with Failure _ -> Elt.IdSet.empty
) nums >|= reduce_elts

let dirty_refs tree repo =
let dir = root repo / "ref" in
Expand All @@ -491,11 +496,13 @@ module Make (DK: Datakit_S.CLIENT) = struct
dirty_repos t >>= fun dirty_repos ->
repos t >>= fun repos ->
(* we only check for dirty prs/refs for monitored repos only *)
Lwt_list.fold_left_s (fun acc r ->
Lwt_list.map_p (fun r ->
dirty_prs t r >>= fun prs ->
dirty_refs t r >|= fun refs ->
acc ++ prs ++ refs
) dirty_repos (Repo.Set.elements repos)
prs ++ refs
) (Repo.Set.elements repos)
>|= fun more ->
dirty_repos ++ reduce_elts more

let dirty_file: Elt.id -> Datakit_path.t = function
| `Repo r -> root r / ".dirty"
Expand Down Expand Up @@ -661,7 +668,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
let f tr =
Log.debug
(fun l -> l "remove_snapshot (from %s):@;%a" debug Elt.IdSet.pp t);
Lwt_list.iter_s (remove_elt tr) (Elt.IdSet.elements t)
Lwt_list.iter_p (remove_elt tr) (Elt.IdSet.elements t)
in
Some f

Expand All @@ -671,7 +678,7 @@ module Make (DK: Datakit_S.CLIENT) = struct
let f tr =
Log.debug
(fun l -> l "update_snapshot (from %s):@;%a" debug Elt.Set.pp t);
Lwt_list.iter_s (update_elt tr) (Elt.Set.elements t)
Lwt_list.iter_p (update_elt tr) (Elt.Set.elements t)
in
Some f

Expand Down
2 changes: 1 addition & 1 deletion bridge/github/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ let start () no_listen listen_urls datakit cap webhook resync_interval =
failwith "connect to datakit"
in
Lwt.catch
(fun () -> Client9p.connect proto address ())
(fun () -> Client9p.connect proto address ~max_fids:Int32.max_int ())
(fun e -> Lwt.fail_with @@ Fmt.strf "%a" Fmt.exn e)
>>= function
| Error (`Msg e) ->
Expand Down
1 change: 1 addition & 0 deletions datakit-github.opam
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ depends: [
"lwt" "asetmap"
"logs" "fmt" "mtime" "asl" "win-eventlog" "hvsock"
"hex" "nocrypto" "conduit"
"protocol-9p" {>= "0.8.0"}
"datakit-server" {>= "0.7.0"}
"datakit-client" {>= "0.7.0"}
"github-hooks" {>= "0.1.1"}
Expand Down
10 changes: 5 additions & 5 deletions tests/test_github.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1511,7 +1511,7 @@ let rec read_state ~user ~repo ~commit tree path context =
[ Status.v ?description ?url commit context state]
end
>>= fun this_state ->
items |> Lwt_list.map_s (function
items |> Lwt_list.map_p (function
| "status" | "description" | "target_url" -> Lwt.return []
| item ->
read_state ~user ~repo ~commit tree (path / item) (context @ [item])
Expand All @@ -1528,7 +1528,7 @@ let read_opt_dir tree path =
let read_commits tree ~user ~repo =
let path = Datakit_path.of_steps_exn [user; repo; "commit"] in
read_opt_dir tree path >>=
Lwt_list.map_s (fun commit ->
Lwt_list.map_p (fun commit ->
let path =
Datakit_path.of_steps_exn [user; repo; "commit"; commit; "status"]
in
Expand All @@ -1539,7 +1539,7 @@ let read_commits tree ~user ~repo =
let read_prs tree ~user ~repo =
let path = Datakit_path.of_steps_exn [user; repo; "pr"] in
read_opt_dir tree path >>=
Lwt_list.map_s (fun number ->
Lwt_list.map_p (fun number ->
let path = Datakit_path.of_steps_exn [user; repo; "pr"; number] in
let number = int_of_string number in
let read name =
Expand Down Expand Up @@ -1742,11 +1742,11 @@ let test_random_datakit ~quick _repo conn =
let update_datakit users =
let events = Users.diff_events users (Users.empty ()) in
DK.Branch.with_transaction branch (fun tr ->
Lwt_list.iter_s (fun { Repo.user; repo } ->
Lwt_list.iter_p (fun { Repo.user; repo } ->
safe_remove tr Datakit_path.(empty / user / repo)
) (Repo.Set.elements all_repos)
>>= fun () ->
Lwt_list.iter_s (Conv.update_event tr) events >>= fun () ->
Lwt_list.iter_p (Conv.update_event tr) events >>= fun () ->
DK.Transaction.commit tr ~message:"User updates"
) >>= function
| Error e -> Lwt.fail (DK_error e)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ let run fn =
Server.accept ~root ~msg:"test" for_server >>*= Lwt.return
in
Lwt.finalize
(fun () -> Client.connect for_client () >>*= fn repo)
(fun () -> Client.connect for_client ~max_fids:Int32.max_int () >>*= fn repo)
(fun () -> Lwt.cancel server_thread; Lwt.return ())
end

Expand Down