Skip to content

Commit

Permalink
Replace @async with Threads.@spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesWrigley committed Jul 25, 2024
1 parent 0377cda commit 01c33e9
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 29 deletions.
32 changes: 16 additions & 16 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
else
w.ct_time = time()
if myid() > w.id
t = @async exec_conn_func(w)
t = Threads.@spawn exec_conn_func(w)
else
# route request via node 1
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
errormonitor(t)
wait_for_conn(w)
Expand Down Expand Up @@ -258,7 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
else
sock = listen(interface, LPROC.bind_port)
end
errormonitor(@async while isopen(sock)
errormonitor(Threads.@spawn while isopen(sock)
client = accept(sock)
process_messages(client, client, true)
end)
Expand Down Expand Up @@ -290,7 +290,7 @@ end


function redirect_worker_output(ident, stream)
t = @async while !eof(stream)
t = Threads.@spawn while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
Expand Down Expand Up @@ -329,7 +329,7 @@ function read_worker_host_port(io::IO)
leader = String[]
try
while ntries > 0
readtask = @async readline(io)
readtask = Threads.@spawn readline(io)
yield()
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
sleep(0.05)
Expand Down Expand Up @@ -430,7 +430,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
```julia
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
t = Threads.@spawn addprocs(...)
```
```julia
Expand Down Expand Up @@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# call manager's `launch` is a separate task. This allows the master
# process initiate the connection setup process as and when workers come
# online
t_launch = @async launch(manager, params, launched, launch_ntfy)
t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)

@sync begin
while true
if isempty(launched)
istaskdone(t_launch) && break
@async begin
Threads.@spawn begin
sleep(1)
notify(launch_ntfy)
end
Expand All @@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
if !isempty(launched)
wconfig = popfirst!(launched)
let wconfig=wconfig
@async setup_launched_worker(manager, wconfig, launched_q)
Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
end
end
end
Expand Down Expand Up @@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
wconfig.port = port

let wconfig=wconfig
@async begin
Threads.@spawn begin

Check warning on line 595 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L595

Added line #L595 was not covered by tests
pid = create_worker(manager, wconfig)
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
push!(launched_q, pid)
Expand Down Expand Up @@ -706,11 +706,11 @@ function create_worker(manager, wconfig)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

errormonitor(@async manage(w.manager, w.id, w.config, :register))
errormonitor(Threads.@spawn manage(w.manager, w.id, w.config, :register))
# wait for rr_ntfy_join with timeout
timedout = false
errormonitor(
@async begin
Threads.@spawn begin
sleep($timeout)
timedout = true
put!(rr_ntfy_join, 1)
Expand Down Expand Up @@ -767,7 +767,7 @@ function check_master_connect()
end

errormonitor(
@async begin
Threads.@spawn begin
start = time_ns()
while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
sleep(1.0)
Expand Down Expand Up @@ -1063,13 +1063,13 @@ function rmprocs(pids...; waitfor=typemax(Int))

pids = vcat(pids...)
if waitfor == 0
t = @async _rmprocs(pids, typemax(Int))
t = Threads.@spawn _rmprocs(pids, typemax(Int))

Check warning on line 1066 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L1066

Added line #L1066 was not covered by tests
yield()
return t
else
_rmprocs(pids, waitfor)
# return a dummy task object that user code can wait on.
return @async nothing
return Threads.@spawn nothing
end
end

Expand Down Expand Up @@ -1252,7 +1252,7 @@ function interrupt(pids::AbstractVector=workers())
@assert myid() == 1
@sync begin
for pid in pids
@async interrupt(pid)
Threads.@spawn interrupt(pid)

Check warning on line 1255 in src/cluster.jl

View check run for this annotation

Codecov / codecov/patch

src/cluster.jl#L1255

Added line #L1255 was not covered by tests
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
# execute locally last as we do not want local execution to block serialization
# of the request to remote nodes.
for _ in 1:run_locally
@async Core.eval(m, ex)
Threads.@spawn Core.eval(m, ex)
end
end
nothing
Expand Down Expand Up @@ -275,7 +275,7 @@ function preduce(reducer, f, R)
end

function pfor(f, R)
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
@spawnat :any f(R, first(c), last(c))
end
errormonitor(t)
Expand Down
4 changes: 2 additions & 2 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
@async try
Threads.@spawn try

Check warning on line 179 in src/managers.jl

View check run for this annotation

Codecov / codecov/patch

src/managers.jl#L179

Added line #L179 was not covered by tests
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down Expand Up @@ -742,7 +742,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
# First, try sending `exit()` to the remote over the usual control channels
remote_do(exit, pid)

timer_task = @async begin
timer_task = Threads.@spawn begin
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
Expand Down
2 changes: 1 addition & 1 deletion src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ function flush_gc_msgs()
end
catch e
bt = catch_backtrace()
@async showerror(stderr, e, bt)
Threads.@spawn showerror(stderr, e, bt)

Check warning on line 203 in src/messages.jl

View check run for this annotation

Codecov / codecov/patch

src/messages.jl#L203

Added line #L203 was not covered by tests
end
end

Expand Down
14 changes: 7 additions & 7 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
errormonitor(@async run_work_thunk(rv, thunk))
errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -118,7 +118,7 @@ end

## message event handlers ##
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
Expand Down Expand Up @@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake.
See also [`cluster_cookie`](@ref).
"""
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming))

Check warning on line 151 in src/process_messages.jl

View check run for this annotation

Codecov / codecov/patch

src/process_messages.jl#L151

Added line #L151 was not covered by tests
end

function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
Expand Down Expand Up @@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
end
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
errormonitor(@async begin
errormonitor(Threads.@spawn begin
v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false)
if isa(v, SyncTake)
try
Expand All @@ -299,15 +299,15 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
end

function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
errormonitor(@async begin
errormonitor(Threads.@spawn begin
rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
nothing
end)
end

function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
end

function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
Expand Down Expand Up @@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
else
@async connect_to_peer(cluster_manager, rpid, wconfig)
Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy:
```julia
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
```
"""
Expand Down

0 comments on commit 01c33e9

Please sign in to comment.