Skip to content

Commit

Permalink
Add :greedy scheduler to @threads (#52096)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seelengrab authored Feb 6, 2024
1 parent 353884c commit 94fd312
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 10 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ difference between defining a `main` function and executing the code directly at
Multi-threading changes
-----------------------

* `Threads.@threads` now supports the `:greedy` scheduler, intended for non-uniform workloads ([#52096]).

Build system changes
--------------------

Expand Down
63 changes: 53 additions & 10 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,46 @@ end
function _threadsfor(iter, lbody, schedule)
lidx = iter.args[1] # index
range = iter.args[2]
esc_range = esc(range)
func = if schedule === :greedy
greedy_func(esc_range, lidx, lbody)
else
default_func(esc_range, lidx, lbody)
end
quote
local threadsfor_fun
let range = $(esc(range))
$func
if $(schedule === :greedy || schedule === :dynamic || schedule === :default)
threading_run(threadsfor_fun, false)
elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
error("`@threads :static` cannot be used concurrently or nested")
else # :static
threading_run(threadsfor_fun, true)
end
nothing
end
end

function greedy_func(itr, lidx, lbody)
quote
let c = Channel{eltype($itr)}(0,spawn=true) do ch
for item in $itr
put!(ch, item)
end
end
function threadsfor_fun(tid)
for item in c
local $(esc(lidx)) = item
$(esc(lbody))
end
end
end
end
end

function default_func(itr, lidx, lbody)
quote
let range = $itr
function threadsfor_fun(tid = 1; onethread = false)
r = range # Load into local variable
lenr = length(r)
Expand Down Expand Up @@ -216,14 +253,6 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if $(schedule === :dynamic || schedule === :default)
threading_run(threadsfor_fun, false)
elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
error("`@threads :static` cannot be used concurrently or nested")
else # :static
threading_run(threadsfor_fun, true)
end
nothing
end
end

Expand Down Expand Up @@ -289,6 +318,20 @@ microseconds).
!!! compat "Julia 1.8"
The `:dynamic` option for the `schedule` argument is available and the default as of Julia 1.8.
### `:greedy`
`:greedy` scheduler spawns up to [`Threads.threadpoolsize()`](@ref) tasks, each greedily working on
the given iterated values as they are produced. As soon as one task finishes its work, it takes
the next value from the iterator. Work done by any individual task is not necessarily on
contiguous values from the iterator. The given iterator may produce values forever, only the
iterator interface is required (no indexing).
This scheduling option is generally a good choice if the workload of individual iterations
is not uniform/has a large spread.
!!! compat "Julia 1.11"
The `:greedy` option for the `schedule` argument is available as of Julia 1.11.
### `:static`
`:static` scheduler creates one task per thread and divides the iterations equally among
Expand Down Expand Up @@ -344,7 +387,7 @@ macro threads(args...)
# for now only allow quoted symbols
sched = nothing
end
if sched !== :static && sched !== :dynamic
if sched !== :static && sched !== :dynamic && sched !== :greedy
throw(ArgumentError("unsupported schedule argument in @threads"))
end
elseif na == 1
Expand Down
78 changes: 78 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,84 @@ function _atthreads_dynamic_with_error(a)
end
@test_throws "user error in the loop body" _atthreads_dynamic_with_error(zeros(threadpoolsize()))

####
# :greedy
###

function _atthreads_greedy_schedule(n)
inc = Threads.Atomic{Int}(0)
flags = zeros(Int, n)
Threads.@threads :greedy for i = 1:n
Threads.atomic_add!(inc, 1)
flags[i] = 1
end
return inc[], flags
end
@test _atthreads_greedy_schedule(threadpoolsize()) == (threadpoolsize(), ones(threadpoolsize()))
@test _atthreads_greedy_schedule(1) == (1, ones(1))
@test _atthreads_greedy_schedule(10) == (10, ones(10))
@test _atthreads_greedy_schedule(threadpoolsize() * 2) == (threadpoolsize() * 2, ones(threadpoolsize() * 2))

# nested greedy schedule
function _atthreads_greedy_greedy_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :greedy for _ = 1:threadpoolsize()
Threads.@threads :greedy for _ = 1:threadpoolsize()
Threads.atomic_add!(inc, 1)
end
end
return inc[]
end
@test _atthreads_greedy_greedy_schedule() == threadpoolsize() * threadpoolsize()

function _atthreads_greedy_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :greedy for _ = 1:threadpoolsize()
Threads.@threads :dynamic for _ = 1:threadpoolsize()
Threads.atomic_add!(inc, 1)
end
end
return inc[]
end
@test _atthreads_greedy_dynamic_schedule() == threadpoolsize() * threadpoolsize()

function _atthreads_dymamic_greedy_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:threadpoolsize()
Threads.@threads :greedy for _ = 1:threadpoolsize()
Threads.atomic_add!(inc, 1)
end
end
return inc[]
end
@test _atthreads_dymamic_greedy_schedule() == threadpoolsize() * threadpoolsize()

function _atthreads_static_greedy_schedule()
ids = zeros(Int, threadpoolsize())
inc = Threads.Atomic{Int}(0)
Threads.@threads :static for i = 1:threadpoolsize()
ids[i] = Threads.threadid()
Threads.@threads :greedy for _ = 1:threadpoolsize()
Threads.atomic_add!(inc, 1)
end
end
return ids, inc[]
end
@test _atthreads_static_greedy_schedule() == (1:threadpoolsize(), threadpoolsize() * threadpoolsize())

# errors inside @threads :greedy
function _atthreads_greedy_with_error(a)
Threads.@threads :greedy for i in eachindex(a)
error("user error in the loop body")
end
a
end
@test_throws "user error in the loop body" _atthreads_greedy_with_error(zeros(threadpoolsize()))

####
# multi-argument loop
####

try
@macroexpand @threads(for i = 1:10, j = 1:10; end)
catch ex
Expand Down

0 comments on commit 94fd312

Please sign in to comment.