diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6467873..17be0d5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,6 +24,14 @@ jobs: - name: Checkout code uses: actions/checkout@v2 + - name: Get latest Multicore commit hash + id: multicore_hash + shell: bash + run: | + curl -sH "Accept: application/vnd.github.v3+json" \ + https://api.github.com/repos/ocaml-multicore/ocaml-multicore/commits/4.12+domains+effects \ + | jq .commit.tree.sha | xargs printf '::set-output name=commit::%s' + - name: Use OCaml ${{ matrix.ocaml-compiler }} uses: ocaml/setup-ocaml@v2 with: @@ -31,8 +39,9 @@ jobs: opam-repositories: | multicore: https://github.com/ocaml-multicore/multicore-opam.git default: https://github.com/ocaml/opam-repository.git + cache-prefix: ${{ steps.multicore_hash.outputs.commit }} - - run: opam install . --deps-only + - run: opam install . --deps-only --with-test - run: opam exec -- make all diff --git a/Makefile b/Makefile index 11e8a70..78e31fa 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ all: dune build @install run_test: - dune runtest -f + OCAMLRUNPARAM="b=1" dune runtest -f clean: dune clean diff --git a/domainslib.opam b/domainslib.opam index 537ef7f..da7a99e 100644 --- a/domainslib.opam +++ b/domainslib.opam @@ -12,6 +12,7 @@ depends: [ "base-domains" "ocamlfind" {build} "dune" {build} + "mirage-clock-unix" {with-test} ] depopts: [] build: [ diff --git a/lib/multi_channel.ml b/lib/multi_channel.ml index 63888e5..490213c 100644 --- a/lib/multi_channel.ml +++ b/lib/multi_channel.ml @@ -25,22 +25,23 @@ type waiting_status = | Waiting | Released +type dls_state = { + mutable id: int; + mutable steal_offsets: int array; + rng_state: Random.State.t; + mc: mutex_condvar; +} + type 'a t = { mask: int; channels: 'a Ws_deque.t array; waiters: (waiting_status ref * mutex_condvar ) Chan.t; next_domain_id: int Atomic.t; recv_block_spins: int; + dls_key: dls_state Domain.DLS.key; } -type dls_state = { - 
mutable id: int; - mutable steal_offsets: int array; - rng_state: Random.State.t; - mc: mutex_condvar; -} - -let dls_key = +let dls_make_key () = Domain.DLS.new_key (fun () -> { id = -1; @@ -61,6 +62,7 @@ let make ?(recv_block_spins = 2048) n = waiters = Chan.make_unbounded (); next_domain_id = Atomic.make 0; recv_block_spins; + dls_key = dls_make_key () } let register_domain mchan = @@ -76,13 +78,16 @@ let init_domain_state mchan dls_state = [@@inline never] let get_local_state mchan = - let dls_state = Domain.DLS.get dls_key in - if dls_state.id >= 0 then dls_state + let dls_state = Domain.DLS.get mchan.dls_key in + if dls_state.id >= 0 then begin + assert (dls_state.id < Array.length mchan.channels); + dls_state + end else (init_domain_state mchan dls_state) [@@inline] -let clear_local_state () = - let dls_state = Domain.DLS.get dls_key in +let clear_local_state mchan = + let dls_state = Domain.DLS.get mchan.dls_key in dls_state.id <- (-1) let rec check_waiters mchan = @@ -149,7 +154,7 @@ let rec recv_poll_repeated mchan dls repeats = | Exit -> if repeats = 1 then raise Exit else begin - Domain.Sync.cpu_relax (); + Domain.cpu_relax (); recv_poll_repeated mchan dls (repeats - 1) end diff --git a/lib/task.ml b/lib/task.ml index b3138ac..e8c2052 100644 --- a/lib/task.ml +++ b/lib/task.ml @@ -1,33 +1,109 @@ -type 'a task = unit -> 'a +open EffectHandlers +open EffectHandlers.Deep -type 'a promise = ('a, exn) result option Atomic.t +type 'a task = unit -> 'a -exception TasksActive +type message = + Work of (unit -> unit) +| Quit -type task_msg = - Task : 'a task * 'a promise -> task_msg -| Quit : task_msg +type task_chan = message Multi_channel.t type pool_data = { domains : unit Domain.t array; - task_chan : task_msg Multi_channel.t; + task_chan : task_chan; name: string option } type pool = pool_data option Atomic.t -let do_task f p = - try - let res = f () in - Atomic.set p (Some (Ok res)) - with e -> - Atomic.set p (Some (Error e)); - match e with - | TasksActive 
-> raise e - | _ -> () +type 'a promise_state = + Returned of 'a +| Raised of exn * Printexc.raw_backtrace +| Pending of (('a, unit) continuation * task_chan) list -let named_pools = Hashtbl.create 8 +type 'a promise = 'a promise_state Atomic.t + +type _ eff += Wait : 'a promise * task_chan -> 'a eff + +let get_pool_data p = + match Atomic.get p with + | None -> raise (Invalid_argument "pool already torn down") + | Some p -> p + +let cont v (k, c) = Multi_channel.send c (Work (fun _ -> continue k v)) +let discont e bt (k, c) = Multi_channel.send c (Work (fun _ -> + discontinue_with_backtrace k e bt)) + +let do_task (type a) (f : unit -> a) (p : a promise) : unit = + let action, result = + try + let v = f () in + cont v, Returned v + with e -> + let bt = Printexc.get_raw_backtrace () in + discont e bt, Raised (e, bt) + in + match Atomic.exchange p result with + | Pending l -> List.iter action l + | _ -> failwith "Task.do_task: impossible, can only set result of task once" + +let async pool f = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + Multi_channel.send pd.task_chan (Work (fun _ -> do_task f p)); + p +let await pool promise = + let pd = get_pool_data pool in + match Atomic.get promise with + | Returned v -> v + | Raised (e, bt) -> Printexc.raise_with_backtrace e bt + | Pending _ -> perform (Wait (promise, pd.task_chan)) + +let step (type a) (f : a -> unit) (v : a) : unit = + try_with f v + { effc = fun (type a) (e : a eff) -> + match e with + | Wait (p,c) -> Some (fun (k : (a, _) continuation) -> + let rec loop () = + let old = Atomic.get p in + match old with + | Pending l -> + if Atomic.compare_and_set p old (Pending ((k,c)::l)) then () + else (Domain.cpu_relax (); loop ()) + | Returned v -> continue k v + | Raised (e,bt) -> discontinue_with_backtrace k e bt + in + loop ()) + | _ -> None } + +let rec worker task_chan = + match Multi_channel.recv task_chan with + | Quit -> Multi_channel.clear_local_state task_chan + | Work f -> step f 
(); worker task_chan + +let run (type a) pool (f : unit -> a) : a = + let pd = get_pool_data pool in + let p = Atomic.make (Pending []) in + step (fun _ -> do_task f p) (); + let rec loop () : a = + match Atomic.get p with + | Pending _ -> + begin + try + match Multi_channel.recv_poll pd.task_chan with + | Work f -> step f () + | Quit -> failwith "Task.run: tasks are active on pool" + with Exit -> Domain.cpu_relax () + end; + loop () + | Returned v -> v + | Raised (e, bt) -> Printexc.raise_with_backtrace e bt + in + loop () + +let named_pools = Hashtbl.create 8 let named_pools_mutex = Mutex.create () let setup_pool ?name ~num_additional_domains () = @@ -36,14 +112,9 @@ let setup_pool ?name ~num_additional_domains () = "Task.setup_pool: num_additional_domains must be at least 0") else let task_chan = Multi_channel.make (num_additional_domains+1) in - let rec worker () = - match Multi_channel.recv task_chan with - | Quit -> Multi_channel.clear_local_state (); - | Task (t, p) -> - do_task t p; - worker () + let domains = Array.init num_additional_domains (fun _ -> + Domain.spawn (fun _ -> worker task_chan)) in - let domains = Array.init num_additional_domains (fun _ -> Domain.spawn worker) in let p = Atomic.make (Some {domains; task_chan; name}) in begin match name with | None -> () @@ -54,39 +125,12 @@ let setup_pool ?name ~num_additional_domains () = end; p -let get_pool_data p = - match Atomic.get p with - | None -> raise (Invalid_argument "pool already torn down") - | Some p -> p - -let async pool task = - let pd = get_pool_data pool in - let p = Atomic.make None in - Multi_channel.send pd.task_chan (Task(task,p)); - p - -let rec await pool promise = - let pd = get_pool_data pool in - match Atomic.get promise with - | None -> - begin - try - match Multi_channel.recv_poll pd.task_chan with - | Task (t, p) -> do_task t p - | Quit -> raise TasksActive - with - | Exit -> Domain.Sync.cpu_relax () - end; - await pool promise - | Some (Ok v) -> v - | Some (Error e) -> 
raise e - let teardown_pool pool = let pd = get_pool_data pool in for _i=1 to Array.length pd.domains do Multi_channel.send pd.task_chan Quit done; - Multi_channel.clear_local_state (); + Multi_channel.clear_local_state pd.task_chan; Array.iter Domain.join pd.domains; (* Remove the pool from the table *) begin match pd.name with diff --git a/lib/task.mli b/lib/task.mli index 761fb67..58c7c96 100644 --- a/lib/task.mli +++ b/lib/task.mli @@ -9,57 +9,72 @@ type pool val setup_pool : ?name:string -> num_additional_domains:int -> unit -> pool (** Sets up a task execution pool with [num_additional_domains + 1] domains - * including the current domain. If [name] is provided, the pool is mapped to - * [name] which can be looked up later with [lookup_pool name]. - * Raises [Invalid_argumet] when [num_additional_domains] is less than 0. *) + including the current domain. If [name] is provided, the pool is mapped to + [name] which can be looked up later with [lookup_pool name]. -exception TasksActive + Raises {!Invalid_argument} when [num_additional_domains] is less than 0. *) val teardown_pool : pool -> unit -(** Tears down the task execution pool. - * Raises [TasksActive] exception if any tasks are currently active. *) +(** Tears down the task execution pool. *) val lookup_pool : string -> pool option (** [lookup_pool name] returns [Some pool] if [pool] is associated to [name] or - * returns [None] if no value is associated to it. *) + returns [None] if no value is associated to it. *) val get_num_domains : pool -> int (** [get_num_domains pool] returns the total number of domains in [pool] - * including the parent domain. *) + including the parent domain. *) + +val run : pool -> 'a task -> 'a +(** [run p t] runs the task [t] synchronously in the pool [p]. If the task [t] + blocks on a promise, then tasks from the pool [p] are executed until the + promise blocking [t] is resolved. 
+ + This function should be used at the top level to enclose the calls to other + functions that may await on promises. This includes {!await}, + {!parallel_for} and its variants. Otherwise, those functions will raise + [Unhandled] exception. *) val async : pool -> 'a task -> 'a promise (** [async p t] runs the task [t] asynchronously in the pool [p]. The function - * returns a promise [r] in which the result of the task [t] will be stored. - * *) + returns a promise [r] in which the result of the task [t] will be stored. + *) val await : pool -> 'a promise -> 'a (** [await p r] waits for the promise to be resolved. If the task associated - * with the promise had completed sucessfully, then the result of the task will - * be returned. If the task had raised an exception, then [await] raises the - * same exception. *) + with the promise had completed successfully, then the result of the task + will be returned. If the task had raised an exception, then [await] raises + the same exception. + + Must be called with a call to {!run} in the dynamic scope. *) val parallel_for : ?chunk_size:int -> start:int -> finish:int -> body:(int -> unit) -> pool -> unit (** [parallel_for c s f b p] behaves similar to [for i=s to f do b i done], but - * runs the for loop in parallel. The chunk size [c] determines the number of - * body applications done in one task; this will default to - * [max(1, (finish-start + 1) / (8 * num_domains))]. Individual iterates may - * be run in any order. Tasks are distributed to workers using a - * divide-and-conquer scheme. - *) + runs the for loop in parallel. The chunk size [c] determines the number of + body applications done in one task; this will default to [max(1, + (finish-start + 1) / (8 * num_domains))]. Individual iterations may be run in + any order. Tasks are distributed to workers using a divide-and-conquer + scheme. + + Must be called with a call to {!run} in the dynamic scope. 
*) val parallel_for_reduce : ?chunk_size:int -> start:int -> finish:int -> body:(int -> 'a) -> pool -> ('a -> 'a -> 'a) -> 'a -> 'a (** [parallel_for_reduce c s f b p r i] is similar to [parallel_for] except - * that the result returned by each iteration is reduced with [r] with initial - * value [i]. The reduce operations are performed in an arbitrary order and the - * reduce function needs to be commutative and associative in order to obtain - * a deterministic result. *) + that the result returned by each iteration is reduced with [r] with initial + value [i]. The reduce operations are performed in an arbitrary order and + the reduce function needs to be commutative and associative in order to + obtain a deterministic result. + + Must be called with a call to {!run} in the dynamic scope. *) val parallel_scan : pool -> ('a -> 'a -> 'a) -> 'a array -> 'a array -(** [parallel_scan p op a] computes the scan of the array [a] - * in parallel with binary operator [op] and returns the result array. - * Scan is similar to [Array.fold_left] but returns an array of reduced - * intermediate values. The reduce operations are performed in an arbitrary - * order and the reduce function needs to be commutative and associative in - * order to obtain a deterministic result *) +(** [parallel_scan p op a] computes the scan of the array [a] in parallel with + binary operator [op] and returns the result array. Scan is similar to + [Array.fold_left] but returns an array of reduced intermediate values. The + reduce operations are performed in an arbitrary order and the reduce + function needs to be commutative and associative in order to obtain a + deterministic result. + + Must be called with a call to {!run} in the dynamic scope. 
*) diff --git a/lib/ws_deque.ml b/lib/ws_deque.ml index 2e89885..d5675f4 100644 --- a/lib/ws_deque.ml +++ b/lib/ws_deque.ml @@ -189,7 +189,7 @@ module M : S = struct if Atomic.compare_and_set q.top t (t + 1) then release out else begin - Domain.Sync.cpu_relax (); + Domain.cpu_relax (); steal q end diff --git a/test/LU_decomposition_multicore.ml b/test/LU_decomposition_multicore.ml index 31ad752..aa207f1 100644 --- a/test/LU_decomposition_multicore.ml +++ b/test/LU_decomposition_multicore.ml @@ -55,9 +55,11 @@ let lup pool (a0 : float array) = let () = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let a = parallel_create pool - (fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 1.0 ) in - let lu = lup pool a in - let _l = parallel_create pool (fun i j -> if i > j then get lu i j else if i = j then 1.0 else 0.0) in - let _u = parallel_create pool (fun i j -> if i <= j then get lu i j else 0.0) in + T.run pool (fun _ -> + let a = parallel_create pool + (fun _ _ -> (Random.State.float (Domain.DLS.get k) 100.0) +. 
1.0 ) in + let lu = lup pool a in + let _l = parallel_create pool (fun i j -> if i > j then get lu i j else if i = j then 1.0 else 0.0) in + let _u = parallel_create pool (fun i j -> if i <= j then get lu i j else 0.0) in + ()); T.teardown_pool pool diff --git a/test/backtrace.ml b/test/backtrace.ml new file mode 100644 index 0000000..f106e7f --- /dev/null +++ b/test/backtrace.ml @@ -0,0 +1,36 @@ +module T = Domainslib.Task + +let rec foo i = + if i = 0 then () + else begin + ignore (failwith "exn"); + foo i + end + [@@inline never] + +let rec bar i = + if i = 0 then () + else begin + foo i; + bar i + end + [@@inline never] + +let main () = + let pool = T.setup_pool ~num_additional_domains:0 () in + T.run pool (fun _ -> + let p = T.async pool (fun _ -> bar 42) in + T.await pool p; + Printf.printf "should not reach here\n%!"); + T.teardown_pool pool + +let _ = + Printexc.record_backtrace true; + try main () + with _ -> + let open Printexc in + let bt = get_raw_backtrace () in + let bt_slot_arr = Option.get (backtrace_slots bt) in + assert (Option.get (Slot.name bt_slot_arr.(1)) = "Backtrace.foo"); + let s = raw_backtrace_to_string bt in + print_string s diff --git a/test/dune b/test/dune index 18d93cc..3549998 100644 --- a/test/dune +++ b/test/dune @@ -51,15 +51,9 @@ (modules sum_par) (modes native)) -(test - (name task_exn) - (libraries domainslib) - (modules task_exn) - (modes native)) - (test (name task_throughput) - (libraries domainslib) + (libraries domainslib mirage-clock-unix) (modules task_throughput) (modes native)) @@ -86,3 +80,21 @@ (libraries domainslib) (modules test_task) (modes native)) + +(test + (name test_deadlock) + (libraries domainslib) + (modules test_deadlock) + (modes native)) + +(test + (name test_task_crash) + (libraries domainslib) + (modules test_task_crash) + (modes native)) + +(test + (name backtrace) + (libraries domainslib) + (modules backtrace) + (modes native)) diff --git a/test/enumerate_par.ml b/test/enumerate_par.ml index 
f2b736d..ea40092 100644 --- a/test/enumerate_par.ml +++ b/test/enumerate_par.ml @@ -5,6 +5,7 @@ module T = Domainslib.Task let _ = let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i -> - print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) ~chunk_size:16 ~body:(fun i -> + print_string @@ Printf.sprintf "[%d] %d\n%!" (Domain.self () :> int) i)); T.teardown_pool p diff --git a/test/fib_par.ml b/test/fib_par.ml index 9c0e57e..eb65df1 100644 --- a/test/fib_par.ml +++ b/test/fib_par.ml @@ -16,6 +16,6 @@ let rec fib_par pool n = let main = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let res = fib_par pool n in + let res = T.run pool (fun _ -> fib_par pool n) in T.teardown_pool pool; Printf.printf "fib(%d) = %d\n" n res diff --git a/test/game_of_life_multicore.ml b/test/game_of_life_multicore.ml index 2f97a49..f26e422 100644 --- a/test/game_of_life_multicore.ml +++ b/test/game_of_life_multicore.ml @@ -64,6 +64,6 @@ let rec repeat pool n = let ()= let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in print !rg; - repeat pool n_times; + T.run pool (fun _ -> repeat pool n_times); print !rg; T.teardown_pool pool diff --git a/test/prefix_sum.ml b/test/prefix_sum.ml index 0e50ae6..8926395 100644 --- a/test/prefix_sum.ml +++ b/test/prefix_sum.ml @@ -10,6 +10,6 @@ let _ = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let arr = gen n in let t = Unix.gettimeofday() in - let _ = prefix_sum pool arr in + ignore (T.run pool (fun _ -> prefix_sum pool arr)); Printf.printf "Execution time: %fs\n" (Unix.gettimeofday() -. 
t); T.teardown_pool pool diff --git a/test/spectralnorm2_multicore.ml b/test/spectralnorm2_multicore.ml index fb59694..c2a3b5f 100644 --- a/test/spectralnorm2_multicore.ml +++ b/test/spectralnorm2_multicore.ml @@ -36,9 +36,10 @@ let eval_AtA_times_u pool u v = let () = let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let u = Array.make n 1.0 and v = Array.make n 0.0 in - for _i = 0 to 9 do - eval_AtA_times_u pool u v; eval_AtA_times_u pool v u - done; + T.run pool (fun _ -> + for _i = 0 to 9 do + eval_AtA_times_u pool u v; eval_AtA_times_u pool v u + done); T.teardown_pool pool; let vv = ref 0.0 and vBv = ref 0.0 in diff --git a/test/sum_par.ml b/test/sum_par.ml index d771878..0a99a67 100644 --- a/test/sum_par.ml +++ b/test/sum_par.ml @@ -6,9 +6,9 @@ module T = Domainslib.Task let _ = (* use parallel_for_reduce *) let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let sum = + let sum = T.run p (fun _ -> T.parallel_for_reduce p (+) 0 ~chunk_size:(n/(4*num_domains)) ~start:0 - ~finish:(n-1) ~body:(fun _i -> 1) + ~finish:(n-1) ~body:(fun _i -> 1)) in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; @@ -18,8 +18,9 @@ let _ = (* explictly use empty pool and default chunk_size *) let p = T.setup_pool ~num_additional_domains:0 () in let sum = Atomic.make 0 in - T.parallel_for p ~start:0 ~finish:(n-1) - ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1)); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) + ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1))); let sum = Atomic.get sum in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; @@ -29,8 +30,9 @@ let _ = (* configured num_domains and default chunk_size *) let p = T.setup_pool ~num_additional_domains:(num_domains - 1) () in let sum = Atomic.make 0 in - T.parallel_for p ~start:0 ~finish:(n-1) - ~body:(fun _i -> ignore (Atomic.fetch_and_add sum 1)); + T.run p (fun _ -> + T.parallel_for p ~start:0 ~finish:(n-1) + ~body:(fun _i -> ignore 
(Atomic.fetch_and_add sum 1))); let sum = Atomic.get sum in T.teardown_pool p; Printf.printf "Sum is %d\n" sum; diff --git a/test/summed_area_table.ml b/test/summed_area_table.ml index 7684f3b..b48f076 100644 --- a/test/summed_area_table.ml +++ b/test/summed_area_table.ml @@ -30,7 +30,7 @@ let _ = let m = Array.make_matrix size size 1 (*Array.init size (fun _ -> Array.init size (fun _ -> Random.int size))*) in let pool = T.setup_pool ~num_additional_domains:(num_domains - 1) () in - let _ = calc_table pool m in + let _ = T.run pool (fun _ -> calc_table pool m) in (* for i = 0 to size-1 do for j = 0 to size-1 do diff --git a/test/task_exn.ml b/test/task_exn.ml deleted file mode 100644 index 1672fde..0000000 --- a/test/task_exn.ml +++ /dev/null @@ -1,30 +0,0 @@ -module T = Domainslib.Task - -exception E - -let _ = - let pool = T.setup_pool ~num_additional_domains:3 () in - - let p1 = T.async pool (fun () -> - let p2 = T.async pool (fun () -> raise E) in - T.await pool p2) - in - begin match T.await pool p1 with - | _ -> () - | exception E -> print_endline "Caught E" - end; - - let _p1 = T.async pool (fun () -> - let p2 = T.async pool (fun () -> - let rec loop n = - if n = 0 then () - else loop (n-1) - in loop 100000000) - in - T.await pool p2) - in - match T.teardown_pool pool with - | _ -> () - | exception T.TasksActive -> - (* innermost task may still be active *) - print_endline "Caught TasksActive" diff --git a/test/task_throughput.ml b/test/task_throughput.ml index f451638..059dec9 100644 --- a/test/task_throughput.ml +++ b/test/task_throughput.ml @@ -54,9 +54,10 @@ let _ = let hist = TimingHist.make 5 25 in for _ = 1 to n_iterations do - let t0 = Domain.timer_ticks () in - T.parallel_for pool ~start:1 ~finish:n_tasks ~body:(fun _ -> ()); - let t = Int64.sub (Domain.timer_ticks ()) t0 in + let t0 = Mclock.elapsed_ns() in + T.run pool (fun _ -> + T.parallel_for pool ~start:1 ~finish:n_tasks ~body:(fun _ -> ())); + let t = Int64.sub (Mclock.elapsed_ns ()) t0 in 
TimingHist.add_point hist (Int64.to_int t); done; diff --git a/test/test_deadlock.ml b/test/test_deadlock.ml new file mode 100644 index 0000000..42c41fc --- /dev/null +++ b/test/test_deadlock.ml @@ -0,0 +1,31 @@ +(* Despite what the name says, this test will not deadlock. A similar test will + * deadlock in the version not using effect handlers. See + * https://github.com/ocaml-multicore/ocaml-multicore/issues/670 *) + +module T = Domainslib.Task + +let n = try int_of_string Sys.argv.(1) with _ -> 1_000_000 + +let rec loop n = + if n = 0 then + Printf.printf "Looping finished on domain %d\n%!" (Domain.self () :> int) + else (Domain.cpu_relax (); loop (n-1)) + +let () = + let pool = T.setup_pool ~num_additional_domains:2 () in + T.run pool (fun _ -> + let a = T.async pool (fun _ -> + Printf.printf "Task A running on domain %d\n%!" (Domain.self () :> int); + loop n) + in + let b = T.async pool (fun _ -> + Printf.printf "Task B running on domain %d\n%!" (Domain.self () :> int); + T.await pool a) + in + let c = T.async pool (fun _ -> + Printf.printf "Task C running on domain %d\n%!" 
(Domain.self () :> int); + T.await pool b) + in + loop n; + T.await pool c); + T.teardown_pool pool diff --git a/test/test_task.ml b/test/test_task.ml index 1aaf3e2..951c253 100644 --- a/test/test_task.ml +++ b/test/test_task.ml @@ -39,22 +39,24 @@ let prefix_sum pool = fun () -> let () = let pool1 = Task.setup_pool ~num_additional_domains:2 ~name:"pool1" () in let pool2 = Task.setup_pool ~num_additional_domains:2 ~name:"pool2" () in - let p1 = Option.get @@ Task.lookup_pool "pool1" in - modify_arr pool1 0 (); - modify_arr pool1 25 (); - modify_arr pool1 100 (); - inc_ctr p1 0 (); - inc_ctr p1 16 (); - inc_ctr p1 32 (); - inc_ctr p1 1000 (); - let p2 = Option.get @@ Task.lookup_pool "pool2" in - sum_sequence pool2 0 0 (); - sum_sequence pool2 10 10 (); - sum_sequence pool2 1 0 (); - sum_sequence p2 1 10 (); - sum_sequence p2 100 10 (); - sum_sequence p2 100 100 (); - prefix_sum p2 (); + Task.run pool1 (fun _ -> + let p1 = Option.get @@ Task.lookup_pool "pool1" in + modify_arr pool1 0 (); + modify_arr pool1 25 (); + modify_arr pool1 100 (); + inc_ctr p1 0 (); + inc_ctr p1 16 (); + inc_ctr p1 32 (); + inc_ctr p1 1000 ()); + Task.run pool2 (fun _ -> + let p2 = Option.get @@ Task.lookup_pool "pool2" in + sum_sequence pool2 0 0 (); + sum_sequence pool2 10 10 (); + sum_sequence pool2 1 0 (); + sum_sequence p2 1 10 (); + sum_sequence p2 100 10 (); + sum_sequence p2 100 100 (); + prefix_sum p2 ()); Task.teardown_pool pool1; Task.teardown_pool pool2; diff --git a/test/test_task_crash.ml b/test/test_task_crash.ml new file mode 100644 index 0000000..5327349 --- /dev/null +++ b/test/test_task_crash.ml @@ -0,0 +1,27 @@ +open Domainslib + +(* a simple work item, from ocaml/testsuite/tests/misc/takc.ml *) +let rec tak x y z = + if x > y then tak (tak (x-1) y z) (tak (y-1) z x) (tak (z-1) x y) + else z + +let work () = + for _ = 1 to 200 do + assert (7 = tak 18 12 6); + done +;; +begin + let pool1 = Task.setup_pool ~num_additional_domains:2 () in + let pool2 = Task.setup_pool 
~num_additional_domains:1 () in + + let pool1_prom0 = Task.async pool1 work in + + let pool2_prom0 = Task.async pool2 work in + let pool2_prom1 = Task.async pool2 work in + + Task.run pool1 (fun () -> List.iter (fun p -> Task.await pool1 p) [pool1_prom0]); + Task.run pool2 (fun () -> List.iter (fun p -> Task.await pool2 p) [pool2_prom0; pool2_prom1]); + + Task.teardown_pool pool1; + Task.teardown_pool pool2; +end