From 31e8594a422b322a0b8e9606dedd74bd654501f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 10 Oct 2021 23:50:35 +0100 Subject: [PATCH 01/15] Multi_channel: assert that id is valid before reusing it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Edwin Török --- lib/multi_channel.ml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/multi_channel.ml b/lib/multi_channel.ml index 63888e5..16bbe4f 100644 --- a/lib/multi_channel.ml +++ b/lib/multi_channel.ml @@ -77,7 +77,10 @@ let init_domain_state mchan dls_state = let get_local_state mchan = let dls_state = Domain.DLS.get dls_key in - if dls_state.id >= 0 then dls_state + if dls_state.id >= 0 then begin + assert (dls_state.id < Array.length mchan.channels); + dls_state + end else (init_domain_state mchan dls_state) [@@inline] From b8cf627f84daaca65373f43b185ebc4360ccd5bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Sun, 10 Oct 2021 23:48:54 +0100 Subject: [PATCH 02/15] Multi_channel: use per-channel key instead of global MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There can be multiple multi_channel (or indeed pool) instances active at the same time, each with a different configuration. We cannot necessarily safely reuse any IDs issued by one channel on another channel, so ensure that we use a unique key per channel by allocating it together with the channel. Signed-off-by: Edwin Török --- lib/multi_channel.ml | 24 +++++++++++++----------- lib/task.ml | 4 ++-- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/multi_channel.ml b/lib/multi_channel.ml index 16bbe4f..d47ce20 100644 --- a/lib/multi_channel.ml +++ b/lib/multi_channel.ml @@ -25,22 +25,23 @@ type waiting_status = | Waiting | Released +type dls_state = { + mutable id: int; + mutable steal_offsets: int array; + rng_state: Random.State.t; + mc: mutex_condvar; +} + type 'a t = { mask: int; channels: 'a Ws_deque.t array; waiters: (waiting_status ref * mutex_condvar ) Chan.t; next_domain_id: int Atomic.t; recv_block_spins: int; + dls_key: dls_state Domain.DLS.key; } -type dls_state = { - mutable id: int; - mutable steal_offsets: int array; - rng_state: Random.State.t; - mc: mutex_condvar; -} - -let dls_key = +let dls_make_key () = Domain.DLS.new_key (fun () -> { id = -1; @@ -61,6 +62,7 @@ let make ?(recv_block_spins = 2048) n = waiters = Chan.make_unbounded (); next_domain_id = Atomic.make 0; recv_block_spins; + dls_key = dls_make_key () } let register_domain mchan = @@ -76,7 +78,7 @@ let init_domain_state mchan dls_state = [@@inline never] let get_local_state mchan = - let dls_state = Domain.DLS.get dls_key in + let dls_state = Domain.DLS.get mchan.dls_key in if dls_state.id >= 0 then begin assert (dls_state.id < Array.length mchan.channels); dls_state @@ -84,8 +86,8 @@ let get_local_state mchan = else (init_domain_state mchan dls_state) [@@inline] -let clear_local_state () = - let dls_state = Domain.DLS.get dls_key in +let clear_local_state mchan = + let dls_state = Domain.DLS.get mchan.dls_key in dls_state.id <- (-1) let rec check_waiters mchan = diff --git a/lib/task.ml b/lib/task.ml index b3138ac..3d9271d 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -38,7 +38,7 @@ let setup_pool ?name ~num_additional_domains () = let task_chan = Multi_channel.make (num_additional_domains+1) in let rec worker () = match Multi_channel.recv task_chan with - | Quit -> Multi_channel.clear_local_state (); + | Quit -> Multi_channel.clear_local_state task_chan; | Task (t, p) -> do_task t p; worker () @@ -86,7 +86,7 @@ let teardown_pool pool = for _i=1 to Array.length pd.domains do Multi_channel.send pd.task_chan Quit done; - Multi_channel.clear_local_state (); + Multi_channel.clear_local_state pd.task_chan; Array.iter Domain.join pd.domains; (* Remove the pool from the table *) begin match pd.name with From 9348e59d349a991042b01317f7eccc20c31ad778 Mon Sep 17 00:00:00 2001 From: KC Sivaramakrishnan Date: Tue, 12 Oct 2021 15:09:54 +0530 Subject: [PATCH 03/15] Utilise effect handlers --- lib/task.ml | 135 ++++++++++++++++++----------- lib/task.mli | 77 +++++++++------- test/LU_decomposition_multicore.ml | 14 +-- test/dune | 12 +-- test/enumerate_par.ml | 5 +- test/fib_par.ml | 4 +- test/game_of_life_multicore.ml | 2 +- test/prefix_sum.ml | 2 +- test/spectralnorm2_multicore.ml | 7 +- test/sum_par.ml | 14 +-- test/summed_area_table.ml | 2 +- test/task_exn.ml | 30 ------- test/test_deadlock.ml | 31 +++++++ test/test_task.ml | 34 ++++---- 14 files changed, 215 insertions(+), 154 deletions(-) delete mode 100644 test/task_exn.ml create mode 100644 test/test_deadlock.ml diff --git a/lib/task.ml b/lib/task.ml index b3138ac..316e9b4 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -1,33 +1,102 @@ +open EffectHandlers +open EffectHandlers.Deep + type 'a task = unit -> 'a -type 'a promise = ('a, exn) result option Atomic.t +type 'a promise_state = + Returned of 'a +| Raised of exn +| Pending of (unit, unit) continuation list -exception TasksActive +type 'a promise = 'a promise_state Atomic.t -type task_msg = - Task : 'a task * 'a promise -> task_msg -| Quit : task_msg +type message = + Work of (unit -> unit) +| Quit type pool_data = { domains : unit Domain.t array; - task_chan : task_msg Multi_channel.t; + task_chan : message Multi_channel.t; name: string option } type pool = pool_data option Atomic.t -let do_task f p = +let get_pool_data p = + match Atomic.get p with + | None -> raise (Invalid_argument "pool already torn down") + | Some p -> p + +let do_task pd f p = try let res = f () in - Atomic.set p (Some (Ok res)) + match Atomic.exchange p (Returned res) with + | Pending l -> + List.iter (fun k -> Multi_channel.send pd.task_chan (Work (continue k))) l + | _ -> failwith "Task.do_task: impossible (1)" with e -> - Atomic.set p (Some (Error e)); - match e with - | TasksActive -> raise e - | _ -> () + begin match Atomic.exchange p (Raised e) with + | Pending l -> List.iter (fun k -> + Multi_channel.send pd.task_chan (Work (fun _ -> discontinue k e))) l + | _ -> failwith "Task.do_task: impossible (2)" + end -let named_pools = Hashtbl.create 8 +let async pool f = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + Multi_channel.send pd.task_chan (Work (fun _ -> do_task pd f p)); + p + +type _ eff += Wait : 'a promise -> unit eff +let rec await promise = + match Atomic.get promise with + | Returned v -> v + | Raised e -> raise e + | Pending _ -> perform (Wait promise); await promise + +let step (type a) (f : a -> unit) (v : a) : unit = + try_with f v + { effc = fun (type a) (e : a eff) -> + match e with + | Wait p -> Some (fun (k : (a, _) continuation) -> + let rec loop () = + let old = Atomic.get p in + match old with + | Pending l -> + if Atomic.compare_and_set p old (Pending (k::l)) then () + else loop () + | Returned _ | Raised _ -> continue k () + in + loop ()) + | _ -> None } + +let rec worker task_chan = + match Multi_channel.recv task_chan with + | Quit -> Multi_channel.clear_local_state () + | Work f -> step f (); worker task_chan + +let run (type a) pool (f : unit -> a) : a = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + step (do_task pd f) p; + let rec loop () : a = + match Atomic.get p with + | Pending _ -> + begin + try + match Multi_channel.recv_poll pd.task_chan with + | Work f -> step f () + | Quit -> failwith "Task.run: tasks are active on pool" + with Exit -> Domain.Sync.cpu_relax () + end; + loop () + | Returned v -> v + | Raised e -> raise e + in + loop () + +let named_pools = Hashtbl.create 8 let named_pools_mutex = Mutex.create () let setup_pool ?name ~num_additional_domains () = @@ -36,14 +105,9 @@ let setup_pool ?name ~num_additional_domains () = "Task.setup_pool: num_additional_domains must be at least 0") else let task_chan = Multi_channel.make (num_additional_domains+1) in - let rec worker () = - match Multi_channel.recv task_chan with - | Quit -> Multi_channel.clear_local_state (); - | Task (t, p) -> - do_task t p; - worker () + let domains = Array.init num_additional_domains (fun _ -> + Domain.spawn (fun _ -> worker task_chan)) in - let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in let p = Atomic.make (Some {domains; task_chan; name}) in begin match name with | None -> () @@ -54,33 +118,6 @@ let setup_pool ?name ~num_additional_domains () = end; p -let get_pool_data p = - match Atomic.get p with - | None -> raise (Invalid_argument "pool already torn down") - | Some p -> p - -let async pool task = - let pd = get_pool_data pool in - let p = Atomic.make None in - Multi_channel.send pd.task_chan (Task(task,p)); - p - -let rec await pool promise = - let pd = get_pool_data pool in - match Atomic.get promise with - | None -> - begin - try - match Multi_channel.recv_poll pd.task_chan with - | Task (t, p) -> do_task t p - | Quit -> raise TasksActive - with - | Exit -> Domain.Sync.cpu_relax () - end; - await pool promise - | Some (Ok v) -> v - | Some (Error e) -> raise e - let teardown_pool pool = let pd = get_pool_data pool in for _i=1 to Array.length pd.domains do @@ -129,7 +166,7 @@ let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun ini let d = s + ((e - s) / 2) in let p = async pool (fun _ -> work s d) in let right = work (d+1) e in - let left = await pool p in + let left = await p in reduce_fun left right end in @@ -152,7 +189,7 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool = let d = s + ((e - s) / 2) in let left = async pool (fun _ -> work pool fn s d) in work pool fn (d+1) e; - await pool left + await left end in work pool body start finish diff --git a/lib/task.mli b/lib/task.mli index 761fb67..4f3b22d 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -9,57 +9,72 @@ type pool val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool (** Sets up a task execution pool with [num_additional_domains + 1] domains - * including the current domain. If [name] is provided, the pool is mapped to - * [name] which can be looked up later with [lookup_pool name]. - * Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *) + including the current domain. If [name] is provided, the pool is mapped to + [name] which can be looked up later with [lookup_pool name]. -exception TasksActive + Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *) val teardown_pool : pool -> unit -(** Tears down the task execution pool. - * Raises [TasksActive] exception if any tasks are currently active. *) +(** Tears down the task execution pool. *) val lookup_pool : string -> pool option (** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or - * returns [None] if no value is associated to it. *) + returns [None] if no value is associated to it. *) val get_num_domains : pool -> int (** [get_num_domains pool] returns the total number of domains in [pool] - * including the parent domain. *) + including the parent domain. *) + +val run : pool -> 'a task -> 'a +(** [run p t] runs the task [t] synchronously in the pool [p]. If the task [t] + blocks on a promise, then tasks from the pool [p] are executed until the + promise blocking [t] is resolved. + + This function should be used at the top level to enclose the calls to other + functions that may await on promises. This includes {!await}, + {!parallel_for} and its variants. Otherwise, those functions will raise + [Unhandled] exception. *) val async : pool -> 'a task -> 'a promise (** [async p t] runs the task [t] asynchronously in the pool [p]. The function - * returns a promise [r] in which the result of the task [t] will be stored. - * *) + returns a promise [r] in which the result of the task [t] will be stored. + *) + +val await : 'a promise -> 'a +(** [await r] waits for the promise to be resolved. If the task associated + with the promise had completed sucessfully, then the result of the task + will be returned. If the task had raised an exception, then [await] raises + the same exception. -val await : pool -> 'a promise -> 'a -(** [await p r] waits for the promise to be resolved. If the task associated - * with the promise had completed sucessfully, then the result of the task will - * be returned. If the task had raised an exception, then [await] raises the - * same exception. *) + Must be called with a call to {!run} in the dynamic scope. *) val parallel_for : ?chunk_size:int -> start:int -> finish:int -> body:(int -> unit) -> pool -> unit (** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but - * runs the for loop in parallel. The chunk size [c] determines the number of - * body applications done in one task; this will default to - * [max(1, (finish-start + 1) / (8 * num_domains))]. Individual iterates may - * be run in any order. Tasks are distributed to workers using a - * divide-and-conquer scheme. - *) + runs the for loop in parallel. The chunk size [c] determines the number of + body applications done in one task; this will default to [max(1, + (finish-start + 1) / (8 * num_domains))]. Individual iterates may be run in + any order. Tasks are distributed to workers using a divide-and-conquer + scheme. + + Must be called with a call to {!run} in the dynamic scope. *) val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int -> body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a (** [parallel_for_reduce c s f b p r i] is similar to [parallel_for] except - * that the result returned by each iteration is reduced with [r] with initial - * value [i]. The reduce operations are performed in an arbitrary order and the - * reduce function needs to be commutative and associative in order to obtain - * a deterministic result. *) + that the result returned by each iteration is reduced with [r] with initial + value [i]. The reduce operations are performed in an arbitrary order and + the reduce function needs to be commutative and associative in order to + obtain a deterministic result. + + Must be called with a call to {!run} in the dynamic scope. *) val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array -(** [parallel_scan p op a] computes the scan of the array [a] - * in parallel with binary operator [op] and returns the result array. - * Scan is similar to [Array.fold_left] but returns an array of reduced - * intermediate values. The reduce operations are performed in an arbitrary - * order and the reduce function needs to be commutative and associative in - * order to obtain a deterministic result *) +(** [parallel_scan p op a] computes the scan of the array [a] in parallel with + binary operator [op] and returns the result array. Scan is similar to + [Array.fold_left] but returns an array of reduced intermediate values. The + reduce operations are performed in an arbitrary order and the reduce + function needs to be commutative and associative in order to obtain a + deterministic result. + + Must be called with a call to {!run} in the dynamic scope. *) diff --git a/test/LU_decomposition_multicore.ml b/test/LU_decomposition_multicore.ml index 31ad752..2bb62e7 100644 --- a/test/LU_decomposition_multicore.ml +++ b/test/LU_decomposition_multicore.ml @@ -29,7 +29,7 @@ module SquareMatrix = struct let b = Array.create_float n in let rec aux acc num_domains i = if (i = num_domains) then - (List.iter (fun e -> T.await pool e) acc) + (List.iter (fun e -> T.await e) acc) else begin aux ((T.async pool (fun _ -> copy_part a b i))::acc) num_domains (i+1) end @@ -55,9 +55,11 @@ let lup pool (a0 : float array) = let () = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let a = parallel_create pool - (fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in - let lu = lup pool a in - let _l = parallel_create pool (fun i j -> if i > j then get lu i j else if i = j then 1.0 else 0.0) in - let _u = parallel_create pool (fun i j -> if i <= j then get lu i j else 0.0) in + T.run pool (fun _ -> + let a = parallel_create pool + (fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in + let lu = lup pool a in + let _l = parallel_create pool (fun i j -> if i > j then get lu i j else if i = j then 1.0 else 0.0) in + let _u = parallel_create pool (fun i j -> if i <= j then get lu i j else 0.0) in + ()); T.teardown_pool pool diff --git a/test/dune b/test/dune index 18d93cc..1ae33d6 100644 --- a/test/dune +++ b/test/dune @@ -51,12 +51,6 @@ (modules sum_par) (modes native)) -(test - (name task_exn) - (libraries domainslib) - (modules task_exn) - (modes native)) - (test (name task_throughput) (libraries domainslib) @@ -86,3 +80,9 @@ (libraries domainslib) (modules test_task) (modes native)) + +(test + (name test_deadlock) + (libraries domainslib) + (modules test_deadlock) + (modes native)) diff --git a/test/enumerate_par.ml b/test/enumerate_par.ml index f2b736d..ea40092 100644 --- a/test/enumerate_par.ml +++ b/test/enumerate_par.ml @@ -5,6 +5,7 @@ module T = Domainslib.Task let _ = let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i -> - print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i -> + print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i)); T.teardown_pool p diff --git a/test/fib_par.ml b/test/fib_par.ml index 9c0e57e..8babb7c 100644 --- a/test/fib_par.ml +++ b/test/fib_par.ml @@ -12,10 +12,10 @@ let rec fib_par pool n = else let a = T.async pool (fun _ -> fib_par pool (n-1)) in let b = T.async pool (fun _ -> fib_par pool (n-2)) in - T.await pool a + T.await pool b + T.await a + T.await b let main = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let res = fib_par pool n in + let res = T.run pool (fun _ -> fib_par pool n) in T.teardown_pool pool; Printf.printf "fib(%d) = %d\n" n res diff --git a/test/game_of_life_multicore.ml b/test/game_of_life_multicore.ml index 2f97a49..f26e422 100644 --- a/test/game_of_life_multicore.ml +++ b/test/game_of_life_multicore.ml @@ -64,6 +64,6 @@ let rec repeat pool n = let ()= let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in print !rg; - repeat pool n_times; + T.run pool (fun _ -> repeat pool n_times); print !rg; T.teardown_pool pool diff --git a/test/prefix_sum.ml b/test/prefix_sum.ml index 0e50ae6..8926395 100644 --- a/test/prefix_sum.ml +++ b/test/prefix_sum.ml @@ -10,6 +10,6 @@ let _ = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let arr = gen n in let t = Unix.gettimeofday() in - let _ = prefix_sum pool arr in + ignore (T.run pool (fun _ -> prefix_sum pool arr)); Printf.printf "Execution time: %fs\n" (Unix.gettimeofday() -. t); T.teardown_pool pool diff --git a/test/spectralnorm2_multicore.ml b/test/spectralnorm2_multicore.ml index fb59694..c2a3b5f 100644 --- a/test/spectralnorm2_multicore.ml +++ b/test/spectralnorm2_multicore.ml @@ -36,9 +36,10 @@ let eval_AtA_times_u pool u v = let () = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let u = Array.make n 1.0 and v = Array.make n 0.0 in - for _i = 0 to 9 do - eval_AtA_times_u pool u v; eval_AtA_times_u pool v u - done; + T.run pool (fun _ -> + for _i = 0 to 9 do + eval_AtA_times_u pool u v; eval_AtA_times_u pool v u + done); T.teardown_pool pool; let vv = ref 0.0 and vBv = ref 0.0 in diff --git a/test/sum_par.ml b/test/sum_par.ml index d771878..0a99a67 100644 --- a/test/sum_par.ml +++ b/test/sum_par.ml @@ -6,9 +6,9 @@ module T = Domainslib.Task let _ = (* use parallel_for_reduce *) let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let sum = + let sum = T.run p (fun _ -> T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0 - ~finish:(n-1) ~body:(fun _i -> 1) + ~finish:(n-1) ~body:(fun _i -> 1)) in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; @@ -18,8 +18,9 @@ let _ = (* explictly use empty pool and default chunk_size *) let p = T.setup_pool ~num_additional_domains:0 () in let sum = Atomic.make 0 in - T.parallel_for p ~start:0 ~finish:(n-1) - ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1)); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) + ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1))); let sum = Atomic.get sum in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; @@ -29,8 +30,9 @@ let _ = (* configured num_domains and default chunk_size *) let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let sum = Atomic.make 0 in - T.parallel_for p ~start:0 ~finish:(n-1) - ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1)); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) + ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1))); let sum = Atomic.get sum in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; diff --git a/test/summed_area_table.ml b/test/summed_area_table.ml index 7684f3b..b48f076 100644 --- a/test/summed_area_table.ml +++ b/test/summed_area_table.ml @@ -30,7 +30,7 @@ let _ = let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*) in let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let _ = calc_table pool m in + let _ = T.run pool (fun _ -> calc_table pool m) in (* for i = 0 to size-1 do for j = 0 to size-1 do diff --git a/test/task_exn.ml b/test/task_exn.ml deleted file mode 100644 index 1672fde..0000000 --- a/test/task_exn.ml +++ /dev/null @@ -1,30 +0,0 @@ -module T = Domainslib.Task - -exception E - -let _ = - let pool = T.setup_pool ~num_additional_domains:3 () in - - let p1 = T.async pool (fun () -> - let p2 = T.async pool (fun () -> raise E) in - T.await pool p2) - in - begin match T.await pool p1 with - | _ -> () - | exception E -> print_endline "Caught E" - end; - - let _p1 = T.async pool (fun () -> - let p2 = T.async pool (fun () -> - let rec loop n = - if n = 0 then () - else loop (n-1) - in loop 100000000) - in - T.await pool p2) - in - match T.teardown_pool pool with - | _ -> () - | exception T.TasksActive -> - (* innermost task may still be active *) - print_endline "Caught TasksActive" diff --git a/test/test_deadlock.ml b/test/test_deadlock.ml new file mode 100644 index 0000000..bbd6209 --- /dev/null +++ b/test/test_deadlock.ml @@ -0,0 +1,31 @@ +(* Despite what the name says, this test will not deadlock. A similar test will + * deadlock in the version not using effect handlers. See + * https://github.com/ocaml-multicore/ocaml-multicore/issues/670 *) + +module T = Domainslib.Task + +let n = try int_of_string Sys.argv.(1) with _ -> 1_000_000 + +let rec loop n = + if n = 0 then + Printf.printf "Looping finished on domain %d\n%!" (Domain.self () :> int) + else (Domain.Sync.cpu_relax (); loop (n-1)) + +let () = + let pool = T.setup_pool ~num_additional_domains:2 () in + T.run pool (fun _ -> + let a = T.async pool (fun _ -> + Printf.printf "Task A running on domain %d\n%!" (Domain.self () :> int); + loop n) + in + let b = T.async pool (fun _ -> + Printf.printf "Task B running on domain %d\n%!" (Domain.self () :> int); + T.await a) + in + let c = T.async pool (fun _ -> + Printf.printf "Task C running on domain %d\n%!" (Domain.self () :> int); + T.await b) + in + loop n; + T.await c); + T.teardown_pool pool diff --git a/test/test_task.ml b/test/test_task.ml index 1aaf3e2..951c253 100644 --- a/test/test_task.ml +++ b/test/test_task.ml @@ -39,22 +39,24 @@ let prefix_sum pool = fun () -> let () = let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in - let p1 = Option.get @@ Task.lookup_pool "pool1" in - modify_arr pool1 0 (); - modify_arr pool1 25 (); - modify_arr pool1 100 (); - inc_ctr p1 0 (); - inc_ctr p1 16 (); - inc_ctr p1 32 (); - inc_ctr p1 1000 (); - let p2 = Option.get @@ Task.lookup_pool "pool2" in - sum_sequence pool2 0 0 (); - sum_sequence pool2 10 10 (); - sum_sequence pool2 1 0 (); - sum_sequence p2 1 10 (); - sum_sequence p2 100 10 (); - sum_sequence p2 100 100 (); - prefix_sum p2 (); + Task.run pool1 (fun _ -> + let p1 = Option.get @@ Task.lookup_pool "pool1" in + modify_arr pool1 0 (); + modify_arr pool1 25 (); + modify_arr pool1 100 (); + inc_ctr p1 0 (); + inc_ctr p1 16 (); + inc_ctr p1 32 (); + inc_ctr p1 1000 ()); + Task.run pool2 (fun _ -> + let p2 = Option.get @@ Task.lookup_pool "pool2" in + sum_sequence pool2 0 0 (); + sum_sequence pool2 10 10 (); + sum_sequence pool2 1 0 (); + sum_sequence p2 1 10 (); + sum_sequence p2 100 10 (); + sum_sequence p2 100 100 (); + prefix_sum p2 ()); Task.teardown_pool pool1; Task.teardown_pool pool2; From 2755c84a50a5f98277eded1d425f0e991b802763 Mon Sep 17 00:00:00 2001 From: KC Sivaramakrishnan Date: Thu, 14 Oct 2021 09:23:51 +0530 Subject: [PATCH 04/15] Prevent tasks blocking on await from hopping pools. Optimisations. --- lib/task.ml | 67 ++++++++++++++++-------------- lib/task.mli | 4 +- test/LU_decomposition_multicore.ml | 2 +- test/fib_par.ml | 2 +- test/test_deadlock.ml | 6 +-- 5 files changed, 42 insertions(+), 39 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index 316e9b4..15f074d 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -3,69 +3,72 @@ open EffectHandlers.Deep type 'a task = unit -> 'a -type 'a promise_state = - Returned of 'a -| Raised of exn -| Pending of (unit, unit) continuation list - -type 'a promise = 'a promise_state Atomic.t - type message = Work of (unit -> unit) | Quit +type task_chan = message Multi_channel.t + type pool_data = { domains : unit Domain.t array; - task_chan : message Multi_channel.t; + task_chan : task_chan; name: string option } type pool = pool_data option Atomic.t +type 'a promise_state = + Returned of 'a +| Raised of exn +| Pending of ((unit, unit) continuation * task_chan) list + +type 'a promise = 'a promise_state Atomic.t + +type _ eff += Wait : 'a promise * task_chan -> unit eff + let get_pool_data p = match Atomic.get p with | None -> raise (Invalid_argument "pool already torn down") | Some p -> p -let do_task pd f p = - try - let res = f () in - match Atomic.exchange p (Returned res) with - | Pending l -> - List.iter (fun k -> Multi_channel.send pd.task_chan (Work (continue k))) l - | _ -> failwith "Task.do_task: impossible (1)" - with e -> - begin match Atomic.exchange p (Raised e) with - | Pending l -> List.iter (fun k -> - Multi_channel.send pd.task_chan (Work (fun _ -> discontinue k e))) l - | _ -> failwith "Task.do_task: impossible (2)" - end +let cont (k, c) = Multi_channel.send c (Work (continue k)) +let discont e (k, c) = Multi_channel.send c (Work (fun _ -> discontinue k e)) + +let do_task f p = + let action, result = + try cont, Returned (f ()) + with e -> discont e, Raised e + in + match Atomic.exchange p result with + | Pending l -> List.iter action l + | _ -> failwith "Task.do_task: impossible, can only set result of task once" let async pool f = let pd = get_pool_data pool in let p = Atomic.make (Pending []) in - Multi_channel.send pd.task_chan (Work (fun _ -> do_task pd f p)); + Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); p -type _ eff += Wait : 'a promise -> unit eff - -let rec await promise = +let rec await pool promise = + let pd = get_pool_data pool in match Atomic.get promise with | Returned v -> v | Raised e -> raise e - | Pending _ -> perform (Wait promise); await promise + | Pending _ -> + perform (Wait (promise, pd.task_chan)); + await pool promise let step (type a) (f : a -> unit) (v : a) : unit = try_with f v { effc = fun (type a) (e : a eff) -> match e with - | Wait p -> Some (fun (k : (a, _) continuation) -> + | Wait (p,c) -> Some (fun (k : (a, _) continuation) -> let rec loop () = let old = Atomic.get p in match old with | Pending l -> - if Atomic.compare_and_set p old (Pending (k::l)) then () - else loop () + if Atomic.compare_and_set p old (Pending ((k,c)::l)) then () + else (Domain.Sync.cpu_relax (); loop ()) | Returned _ | Raised _ -> continue k () in loop ()) @@ -79,7 +82,7 @@ let rec worker task_chan = let run (type a) pool (f : unit -> a) : a = let pd = get_pool_data pool in let p = Atomic.make (Pending []) in - step (do_task pd f) p; + step (do_task f) p; let rec loop () : a = match Atomic.get p with | Pending _ -> @@ -166,7 +169,7 @@ let parallel_for_reduce ?(chunk_size=0) ~start ~finish ~body pool reduce_fun ini let d = s + ((e - s) / 2) in let p = async pool (fun _ -> work s d) in let right = work (d+1) e in - let left = await p in + let left = await pool p in reduce_fun left right end in @@ -189,7 +192,7 @@ let parallel_for ?(chunk_size=0) ~start ~finish ~body pool = let d = s + ((e - s) / 2) in let left = async pool (fun _ -> work pool fn s d) in work pool fn (d+1) e; - await left + await pool left end in work pool body start finish diff --git a/lib/task.mli b/lib/task.mli index 4f3b22d..129fc4b 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -40,8 +40,8 @@ val async : pool -> 'a task -> 'a promise returns a promise [r] in which the result of the task [t] will be stored. *) -val await : 'a promise -> 'a -(** [await r] waits for the promise to be resolved. If the task associated +val await : pool -> 'a promise -> 'a +(** [await p r] waits for the promise to be resolved. If the task associated with the promise had completed sucessfully, then the result of the task will be returned. If the task had raised an exception, then [await] raises the same exception. diff --git a/test/LU_decomposition_multicore.ml b/test/LU_decomposition_multicore.ml index 2bb62e7..aa207f1 100644 --- a/test/LU_decomposition_multicore.ml +++ b/test/LU_decomposition_multicore.ml @@ -29,7 +29,7 @@ module SquareMatrix = struct let b = Array.create_float n in let rec aux acc num_domains i = if (i = num_domains) then - (List.iter (fun e -> T.await e) acc) + (List.iter (fun e -> T.await pool e) acc) else begin aux ((T.async pool (fun _ -> copy_part a b i))::acc) num_domains (i+1) end diff --git a/test/fib_par.ml b/test/fib_par.ml index 8babb7c..eb65df1 100644 --- a/test/fib_par.ml +++ b/test/fib_par.ml @@ -12,7 +12,7 @@ let rec fib_par pool n = else let a = T.async pool (fun _ -> fib_par pool (n-1)) in let b = T.async pool (fun _ -> fib_par pool (n-2)) in - T.await a + T.await b + T.await pool a + T.await pool b let main = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in diff --git a/test/test_deadlock.ml b/test/test_deadlock.ml index bbd6209..72901c7 100644 --- a/test/test_deadlock.ml +++ b/test/test_deadlock.ml @@ -20,12 +20,12 @@ let () = in let b = T.async pool (fun _ -> Printf.printf "Task B running on domain %d\n%!" (Domain.self () :> int); - T.await a) + T.await pool a) in let c = T.async pool (fun _ -> Printf.printf "Task C running on domain %d\n%!" (Domain.self () :> int); - T.await b) + T.await pool b) in loop n; - T.await c); + T.await pool c); T.teardown_pool pool From d023dc21ca2e609df797caf16c4ddbbb2b05fec6 Mon Sep 17 00:00:00 2001 From: KC Sivaramakrishnan Date: Thu, 14 Oct 2021 13:26:30 +0530 Subject: [PATCH 05/15] Backtraces from await --- Makefile | 2 +- lib/task.ml | 13 ++++++++----- test/backtrace.ml | 35 +++++++++++++++++++++++++++++++++++ test/dune | 6 ++++++ 4 files changed, 50 insertions(+), 6 deletions(-) create mode 100644 test/backtrace.ml diff --git a/Makefile b/Makefile index 11e8a70..78e31fa 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ all: dune build @install run_test: - dune runtest -f + OCAMLRUNPARAM="b=1" dune runtest -f clean: dune clean diff --git a/lib/task.ml b/lib/task.ml index 15f074d..96f2c31 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -19,7 +19,7 @@ type pool = pool_data option Atomic.t type 'a promise_state = Returned of 'a -| Raised of exn +| Raised of exn * Printexc.raw_backtrace | Pending of ((unit, unit) continuation * task_chan) list type 'a promise = 'a promise_state Atomic.t @@ -32,12 +32,15 @@ let get_pool_data p = | Some p -> p let cont (k, c) = Multi_channel.send c (Work (continue k)) -let discont e (k, c) = Multi_channel.send c (Work (fun _ -> discontinue k e)) +let discont e bt (k, c) = Multi_channel.send c (Work (fun _ -> + discontinue_with_backtrace k e bt)) let do_task f p = let action, result = try cont, Returned (f ()) - with e -> discont e, Raised e + with e -> + let bt = Printexc.get_raw_backtrace () in + discont e bt, Raised (e, bt) in match Atomic.exchange p result with | Pending l -> List.iter action l @@ -53,7 +56,7 @@ let rec await pool promise = let pd = get_pool_data pool in match Atomic.get promise with | Returned v -> v - | Raised e -> raise e + | Raised (e, bt) -> Printexc.raise_with_backtrace e bt | Pending _ -> perform (Wait (promise, pd.task_chan)); await pool promise @@ -95,7 +98,7 @@ let run (type a) pool (f : unit -> a) : a = end; loop () | Returned v -> v - | Raised e -> raise e + | Raised (e, bt) -> Printexc.raise_with_backtrace e bt in loop () diff --git a/test/backtrace.ml b/test/backtrace.ml new file mode 100644 index 0000000..ca8e85d --- /dev/null +++ b/test/backtrace.ml @@ -0,0 +1,35 @@ +module T = Domainslib.Task + +let rec foo i = + if i = 0 then () + else begin + ignore (failwith "exn"); + foo i + end + [@@inline never] + +let rec bar i = + if i = 0 then () + else begin + foo i; + bar i + end + [@@inline never] + +let main () = + let pool = T.setup_pool ~num_additional_domains:0 () in + T.run pool (fun _ -> + let p = T.async pool (fun _ -> bar 42) in + T.await pool p; + Printf.printf "should not reach here\n%!"); + T.teardown_pool pool + +let _ = + try main () + with _ -> + let open Printexc in + let bt = get_raw_backtrace () in + let bt_slot_arr = Option.get (backtrace_slots bt) in + assert (Option.get (Slot.name bt_slot_arr.(1)) = "Backtrace.foo"); + let s = raw_backtrace_to_string bt in + print_string s diff --git a/test/dune b/test/dune index 1ae33d6..6d11acb 100644 --- a/test/dune +++ b/test/dune @@ -86,3 +86,9 @@ (libraries domainslib) (modules test_deadlock) (modes native)) + +(test + (name backtrace) + (libraries domainslib) + (modules backtrace) + (modes native)) From a9ce266be6389a8e0a1bf7b2e575123f49e56664 Mon Sep 17 00:00:00 2001 From: Sora Morimoto Date: Thu, 14 Oct 2021 20:26:51 +0900 Subject: [PATCH 06/15] Use a random number as the cache prefix to disable cache in CI Signed-off-by: Sora Morimoto --- .github/workflows/main.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6467873..9ad5d90 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,6 +24,11 @@ jobs: - name: Checkout code uses: actions/checkout@v2 + - name: Generate random number + id: random + shell: pwsh + run: Write-Output "::set-output name=number::$(Get-Random)" + - name: Use OCaml ${{ matrix.ocaml-compiler }} uses: ocaml/setup-ocaml@v2 with: @@ -31,6 +36,7 @@ jobs: opam-repositories: | multicore: https://github.com/ocaml-multicore/multicore-opam.git default: https://github.com/ocaml/opam-repository.git + cache-prefix: ${{ steps.random.outputs.number }} - run: opam install . --deps-only From db22e176f720d5097c0b1d3b8edf01f4248dcad6 Mon Sep 17 00:00:00 2001 From: KC Sivaramakrishnan Date: Fri, 15 Oct 2021 09:32:20 +0530 Subject: [PATCH 07/15] Make await non-recursive --- lib/task.ml | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index 96f2c31..a88a95c 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -20,24 +20,26 @@ type pool = pool_data option Atomic.t type 'a promise_state = Returned of 'a | Raised of exn * Printexc.raw_backtrace -| Pending of ((unit, unit) continuation * task_chan) list +| Pending of (('a, unit) continuation * task_chan) list type 'a promise = 'a promise_state Atomic.t -type _ eff += Wait : 'a promise * task_chan -> unit eff +type _ eff += Wait : 'a promise * task_chan -> 'a eff let get_pool_data p = match Atomic.get p with | None -> raise (Invalid_argument "pool already torn down") | Some p -> p -let cont (k, c) = Multi_channel.send c (Work (continue k)) +let cont v (k, c) = Multi_channel.send c (Work (fun _ -> continue k v)) let discont e bt (k, c) = Multi_channel.send c (Work (fun _ -> discontinue_with_backtrace k e bt)) -let do_task f p = +let do_task (type a) (f : unit -> a) (p : a promise) : unit = let action, result = - try cont, Returned (f ()) + try + let v = f () in + cont v, Returned v with e -> let bt = Printexc.get_raw_backtrace () in discont e bt, Raised (e, bt) @@ -52,14 +54,12 @@ let async pool f = Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); p -let rec await pool promise = +let await pool promise = let pd = get_pool_data pool in match Atomic.get promise with | Returned v -> v | Raised (e, bt) -> Printexc.raise_with_backtrace e bt - | Pending _ -> - perform (Wait (promise, pd.task_chan)); - await pool promise + | Pending _ -> perform (Wait (promise, pd.task_chan)) let step (type a) (f : a -> unit) (v : a) : unit = try_with f v @@ -72,7 +72,8 @@ let step (type a) (f : a -> unit) (v : a) : unit = | Pending l -> if Atomic.compare_and_set p old (Pending ((k,c)::l)) then () else (Domain.Sync.cpu_relax (); loop ()) - | Returned _ | Raised _ -> continue k () + | Returned v -> continue k v + | Raised (e,bt) -> discontinue_with_backtrace k e bt in loop ()) | _ -> None } @@ -85,7 +86,7 @@ let rec worker task_chan = let run (type a) pool (f : unit -> a) : a = let pd = get_pool_data pool in let p = Atomic.make (Pending []) in - step (do_task f) p; + step (fun _ -> do_task f p) (); let rec loop () : a = match Atomic.get p with | Pending _ -> From b34a56b0951650df5235c6776daf308bdcd5f056 Mon Sep 17 00:00:00 2001 From: KC Sivaramakrishnan Date: Fri, 15 Oct 2021 09:36:19 +0530 Subject: [PATCH 08/15] Fix throughput test --- test/task_throughput.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/task_throughput.ml b/test/task_throughput.ml index f451638..77d825e 100644 --- a/test/task_throughput.ml +++ b/test/task_throughput.ml @@ -55,7 +55,8 @@ let _ = let hist = TimingHist.make 5 25 in for _ = 1 to n_iterations do let t0 = Domain.timer_ticks () in - T.parallel_for pool ~start:1 ~finish:n_tasks ~body:(fun _ -> ()); + T.run pool (fun _ -> + T.parallel_for pool ~start:1 ~finish:n_tasks ~body:(fun _ -> ())); let t = Int64.sub (Domain.timer_ticks ()) t0 in TimingHist.add_point hist (Int64.to_int t); done; From 82cc93cdd0a61547dc91d3d257474120922e50d3 Mon Sep 17 00:00:00 2001 From: Enguerrand Decorne Date: Wed, 27 Oct 2021 15:40:54 +0200 Subject: [PATCH 09/15] use last 4.12+domains+effects hash as the cache-key --- .github/workflows/main.yml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9ad5d90..abe19b9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,10 +24,13 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - - name: Generate random number - id: random - shell: pwsh - run: Write-Output "::set-output name=number::$(Get-Random)" + - name: Get latest Multicore commit hash + id: multicore_hash + shell: bash + run: | + curl -sH "Accept: application/vnd.github.v3+json" \ + https://api.github.com/repos/ocaml-multicore/ocaml-multicore/commits/4.12+domains+effects \ + | jq .commit.tree.sha | xargs printf '::set-output name=commit::%s' - name: Use OCaml ${{ matrix.ocaml-compiler }} uses: ocaml/setup-ocaml@v2 @@ -36,7 +39,7 @@ jobs: opam-repositories: | multicore: https://github.com/ocaml-multicore/multicore-opam.git default: https://github.com/ocaml/opam-repository.git - cache-prefix: ${{ steps.random.outputs.number }} + cache-prefix: ${{ steps.multicore_hash.outputs.commit }} - run: opam install . --deps-only From 28957cd8f83e27738f839e0e6673c64f2bebd854 Mon Sep 17 00:00:00 2001 From: Bikal Lem Date: Wed, 27 Oct 2021 13:40:19 +0100 Subject: [PATCH 10/15] make domainslib build/run with OCaml 5.00 after PR #704 --- .github/workflows/main.yml | 2 +- domainslib.opam | 1 + lib/multi_channel.ml | 2 +- lib/task.ml | 2 +- lib/ws_deque.ml | 2 +- test/dune | 2 +- test/task_throughput.ml | 4 ++-- 7 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index abe19b9..17be0d5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -41,7 +41,7 @@ jobs: default: https://github.com/ocaml/opam-repository.git cache-prefix: ${{ steps.multicore_hash.outputs.commit }} - - run: opam install . --deps-only + - run: opam install . --deps-only --with-test - run: opam exec -- make all diff --git a/domainslib.opam b/domainslib.opam index 537ef7f..da7a99e 100644 --- a/domainslib.opam +++ b/domainslib.opam @@ -12,6 +12,7 @@ depends: [ "base-domains" "ocamlfind" {build} "dune" {build} + "mirage-clock-unix" {with-test} ] depopts: [] build: [ diff --git a/lib/multi_channel.ml b/lib/multi_channel.ml index 63888e5..1d2d375 100644 --- a/lib/multi_channel.ml +++ b/lib/multi_channel.ml @@ -149,7 +149,7 @@ let rec recv_poll_repeated mchan dls repeats = | Exit -> if repeats = 1 then raise Exit else begin - Domain.Sync.cpu_relax (); + Domain.cpu_relax (); recv_poll_repeated mchan dls (repeats - 1) end diff --git a/lib/task.ml b/lib/task.ml index b3138ac..0d8d28c 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -75,7 +75,7 @@ let rec await pool promise = | Task (t, p) -> do_task t p | Quit -> raise TasksActive with - | Exit -> Domain.Sync.cpu_relax () + | Exit -> Domain.cpu_relax () end; await pool promise | Some (Ok v) -> v diff --git a/lib/ws_deque.ml b/lib/ws_deque.ml index 2e89885..d5675f4 100644 --- a/lib/ws_deque.ml +++ b/lib/ws_deque.ml @@ -189,7 +189,7 @@ module M : S = struct if Atomic.compare_and_set q.top t (t + 1) then release out else begin - Domain.Sync.cpu_relax (); + Domain.cpu_relax (); steal q end diff --git a/test/dune b/test/dune index 18d93cc..a37c387 100644 --- a/test/dune +++ b/test/dune @@ -59,7 +59,7 @@ (test (name task_throughput) - (libraries domainslib) + (libraries domainslib mirage-clock-unix) (modules task_throughput) (modes native)) diff --git a/test/task_throughput.ml b/test/task_throughput.ml index f451638..abec383 100644 --- a/test/task_throughput.ml +++ b/test/task_throughput.ml @@ -54,9 +54,9 @@ let _ = let hist = TimingHist.make 5 25 in for _ = 1 to n_iterations do - let t0 = Domain.timer_ticks () in + let t0 = Mclock.elapsed_ns() in T.parallel_for pool ~start:1 ~finish:n_tasks ~body:(fun _ -> ()); - let t = Int64.sub (Domain.timer_ticks ()) t0 in + let t = Int64.sub (Mclock.elapsed_ns ()) t0 in TimingHist.add_point hist (Int64.to_int t); done; From 602488746bd17873ca40fad98aefcf607a828ecd Mon Sep 17 00:00:00 2001 From: Anil Madhavapeddy Date: Mon, 1 Nov 2021 18:33:03 +0000 Subject: [PATCH 11/15] use Domain.cpu_relax instead of deprecated Domain.Sync.cpu_relax --- lib/task.ml | 4 ++-- test/test_deadlock.ml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/task.ml b/lib/task.ml index a88a95c..1f5ed21 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -71,7 +71,7 @@ let step (type a) (f : a -> unit) (v : a) : unit = match old with | Pending l -> if Atomic.compare_and_set p old (Pending ((k,c)::l)) then () - else (Domain.Sync.cpu_relax (); loop ()) + else (Domain.cpu_relax (); loop ()) | Returned v -> continue k v | Raised (e,bt) -> discontinue_with_backtrace k e bt in @@ -95,7 +95,7 @@ let run (type a) pool (f : unit -> a) : a = match Multi_channel.recv_poll pd.task_chan with | Work f -> step f () | Quit -> failwith "Task.run: tasks are active on pool" - with Exit -> Domain.Sync.cpu_relax () + with Exit -> Domain.cpu_relax () end; loop () | Returned v -> v diff --git a/test/test_deadlock.ml b/test/test_deadlock.ml index 72901c7..42c41fc 100644 --- a/test/test_deadlock.ml +++ b/test/test_deadlock.ml @@ -9,7 +9,7 @@ let n = try int_of_string Sys.argv.(1) with _ -> 1_000_000 let rec loop n = if n = 0 then Printf.printf "Looping finished on domain %d\n%!" (Domain.self () :> int) - else (Domain.Sync.cpu_relax (); loop (n-1)) + else (Domain.cpu_relax (); loop (n-1)) let () = let pool = T.setup_pool ~num_additional_domains:2 () in From 0d7fd11ad823e431259d9c4290b3c36d78de22b5 Mon Sep 17 00:00:00 2001 From: Anil Madhavapeddy Date: Mon, 1 Nov 2021 18:44:49 +0000 Subject: [PATCH 12/15] Fix typo in task ocamldoc --- lib/task.mli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/task.mli b/lib/task.mli index 129fc4b..51730fb 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -12,7 +12,7 @@ val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool including the current domain. If [name] is provided, the pool is mapped to [name] which can be looked up later with [lookup_pool name]. - Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *) + Raises {!Invalid_argument} when [num_additional_domains] is less than 0. *) val teardown_pool : pool -> unit (** Tears down the task execution pool. *) From 0c0f8fa56a69190b37cec6de52f26a985d5e9787 Mon Sep 17 00:00:00 2001 From: Anil Madhavapeddy Date: Mon, 1 Nov 2021 18:45:08 +0000 Subject: [PATCH 13/15] Fix clarity in Task ocamldoc --- lib/task.mli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/task.mli b/lib/task.mli index 51730fb..58c7c96 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -53,7 +53,7 @@ val parallel_for : ?chunk_size:int -> start:int -> finish:int -> (** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but runs the for loop in parallel. The chunk size [c] determines the number of body applications done in one task; this will default to [max(1, - (finish-start + 1) / (8 * num_domains))]. Individual iterates may be run in + (finish-start + 1) / (8 * num_domains))]. Individual iterations may be run in any order. Tasks are distributed to workers using a divide-and-conquer scheme. From e1c00198995a4d2f8d59a90807c2c652fbdbcb9c Mon Sep 17 00:00:00 2001 From: Jan Midtgaard Date: Mon, 20 Dec 2021 21:11:40 +0100 Subject: [PATCH 14/15] add crash test --- test/dune | 6 ++++++ test/test_task_crash.ml | 27 +++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 test/test_task_crash.ml diff --git a/test/dune b/test/dune index 756aadc..3549998 100644 --- a/test/dune +++ b/test/dune @@ -87,6 +87,12 @@ (modules test_deadlock) (modes native)) +(test + (name test_task_crash) + (libraries domainslib) + (modules test_task_crash) + (modes native)) + (test (name backtrace) (libraries domainslib) diff --git a/test/test_task_crash.ml b/test/test_task_crash.ml new file mode 100644 index 0000000..5327349 --- /dev/null +++ b/test/test_task_crash.ml @@ -0,0 +1,27 @@ +open Domainslib + +(* a simple work item, from ocaml/testsuite/tests/misc/takc.ml *) +let rec tak x y z = + if x > y then tak (tak (x-1) y z) (tak (y-1) z x) (tak (z-1) x y) + else z + +let work () = + for _ = 1 to 200 do + assert (7 = tak 18 12 6); + done +;; +begin + let pool1 = Task.setup_pool ~num_additional_domains:2 () in + let pool2 = Task.setup_pool ~num_additional_domains:1 () in + + let pool1_prom0 = Task.async pool1 work in + + let pool2_prom0 = Task.async pool2 work in + let pool2_prom1 = Task.async pool2 work in + + Task.run pool1 (fun () -> List.iter (fun p -> Task.await pool1 p) [pool1_prom0]); + Task.run pool2 (fun () -> List.iter (fun p -> Task.await pool2 p) [pool2_prom0; pool2_prom1]); + + Task.teardown_pool pool1; + Task.teardown_pool pool2; +end From 75ace2cf19db64575d176a01d3bbc85c349faf07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Tue, 21 Dec 2021 15:34:08 +0000 Subject: [PATCH 15/15] Fix backtrace unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test relies on reading backtrace contents, so we need to ensure that backtraces are on (by default they'd be off). Signed-off-by: Edwin Török --- test/backtrace.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/test/backtrace.ml b/test/backtrace.ml index ca8e85d..f106e7f 100644 --- a/test/backtrace.ml +++ b/test/backtrace.ml @@ -25,6 +25,7 @@ let main () = T.teardown_pool pool let _ = + Printexc.record_backtrace true; try main () with _ -> let open Printexc in