From 01c33e994c3bc680ef04bbc5aed5e39682fb4447 Mon Sep 17 00:00:00 2001 From: JamesWrigley Date: Tue, 23 Jul 2024 21:16:54 +0200 Subject: [PATCH] Replace @async with Threads.@spawn --- src/cluster.jl | 32 ++++++++++++++++---------------- src/macros.jl | 4 ++-- src/managers.jl | 4 ++-- src/messages.jl | 2 +- src/process_messages.jl | 14 +++++++------- src/remotecall.jl | 2 +- 6 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index 57e2632..c97a560 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -163,10 +163,10 @@ function check_worker_state(w::Worker) else w.ct_time = time() if myid() > w.id - t = @async exec_conn_func(w) + t = Threads.@spawn exec_conn_func(w) else # route request via node 1 - t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end errormonitor(t) wait_for_conn(w) @@ -258,7 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std else sock = listen(interface, LPROC.bind_port) end - errormonitor(@async while isopen(sock) + errormonitor(Threads.@spawn while isopen(sock) client = accept(sock) process_messages(client, client, true) end) @@ -290,7 +290,7 @@ end function redirect_worker_output(ident, stream) - t = @async while !eof(stream) + t = Threads.@spawn while !eof(stream) line = readline(stream) if startswith(line, " From worker ") # stdout's of "additional" workers started from an initial worker on a host are not available @@ -329,7 +329,7 @@ function read_worker_host_port(io::IO) leader = String[] try while ntries > 0 - readtask = @async readline(io) + readtask = Threads.@spawn readline(io) yield() while !istaskdone(readtask) && ((time_ns() - t0) < timeout) sleep(0.05) @@ -430,7 +430,7 @@ if launching workers programmatically, execute `addprocs` in its own task. ```julia # On busy clusters, call `addprocs` asynchronously -t = @async addprocs(...) +t = Threads.@spawn addprocs(...) ``` ```julia @@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...) # call manager's `launch` is a separate task. This allows the master # process initiate the connection setup process as and when workers come # online - t_launch = @async launch(manager, params, launched, launch_ntfy) + t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy) @sync begin while true if isempty(launched) istaskdone(t_launch) && break - @async begin + Threads.@spawn begin sleep(1) notify(launch_ntfy) end @@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...) if !isempty(launched) wconfig = popfirst!(launched) let wconfig=wconfig - @async setup_launched_worker(manager, wconfig, launched_q) + Threads.@spawn setup_launched_worker(manager, wconfig, launched_q) end end end @@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch wconfig.port = port let wconfig=wconfig - @async begin + Threads.@spawn begin pid = create_worker(manager, wconfig) remote_do(redirect_output_from_additional_worker, frompid, pid, port) push!(launched_q, pid) @@ -706,11 +706,11 @@ function create_worker(manager, wconfig) join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) - errormonitor(@async manage(w.manager, w.id, w.config, :register)) + errormonitor(Threads.@spawn manage(w.manager, w.id, w.config, :register)) # wait for rr_ntfy_join with timeout timedout = false errormonitor( - @async begin + Threads.@spawn begin sleep($timeout) timedout = true put!(rr_ntfy_join, 1) @@ -767,7 +767,7 @@ function check_master_connect() end errormonitor( - @async begin + Threads.@spawn begin start = time_ns() while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout sleep(1.0) @@ -1063,13 +1063,13 @@ function rmprocs(pids...; waitfor=typemax(Int)) pids = vcat(pids...) if waitfor == 0 - t = @async _rmprocs(pids, typemax(Int)) + t = Threads.@spawn _rmprocs(pids, typemax(Int)) yield() return t else _rmprocs(pids, waitfor) # return a dummy task object that user code can wait on. - return @async nothing + return Threads.@spawn nothing end end @@ -1252,7 +1252,7 @@ function interrupt(pids::AbstractVector=workers()) @assert myid() == 1 @sync begin for pid in pids - @async interrupt(pid) + Threads.@spawn interrupt(pid) end end end diff --git a/src/macros.jl b/src/macros.jl index a767c7a..b1b8a4d 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex) # execute locally last as we do not want local execution to block serialization # of the request to remote nodes. for _ in 1:run_locally - @async Core.eval(m, ex) + Threads.@spawn Core.eval(m, ex) end end nothing @@ -275,7 +275,7 @@ function preduce(reducer, f, R) end function pfor(f, R) - t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) + t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) @spawnat :any f(R, first(c), last(c)) end errormonitor(t) diff --git a/src/managers.jl b/src/managers.jl index a0e9e68..56be8ad 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -176,7 +176,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - @async try + Threads.@spawn try launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) catch e print(stderr, "exception launching on machine $(machine) : $(e)\n") @@ -742,7 +742,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou # First, try sending `exit()` to the remote over the usual control channels remote_do(exit, pid) - timer_task = @async begin + timer_task = Threads.@spawn begin sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal diff --git a/src/messages.jl b/src/messages.jl index fe3e5ab..70baa25 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -200,7 +200,7 @@ function flush_gc_msgs() end catch e bt = catch_backtrace() - @async showerror(stderr, e, bt) + Threads.@spawn showerror(stderr, e, bt) end end diff --git a/src/process_messages.jl b/src/process_messages.jl index 3032917..15f5be6 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -85,7 +85,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - errormonitor(@async run_work_thunk(rv, thunk)) + errormonitor(Threads.@spawn run_work_thunk(rv, thunk)) return rv end end @@ -118,7 +118,7 @@ end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) - errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming)) + errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming)) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) @@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) + errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -299,7 +299,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - errormonitor(@async begin + errormonitor(Threads.@spawn begin rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) nothing @@ -307,7 +307,7 @@ function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) + errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) @@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) # The constructor registers the object with a global registry. Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) else - @async connect_to_peer(cluster_manager, rpid, wconfig) + Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig) end end end diff --git a/src/remotecall.jl b/src/remotecall.jl index 0b1143d..4556d62 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy: ```julia p = 1 f = Future(p) -errormonitor(@async put!(f, remotecall_fetch(long_computation, p))) +errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p))) isready(f) # will not block ``` """