Add threadpool support to runtime
Adds support for Julia to be started with `--threads=auto|N[,M]` where
`N` specifies the number of threads in the default threadpool and `M`,
if provided, specifies the number of threads in the new interactive
threadpool.

Adds an optional first parameter to `Threads.@spawn`:
`[:default|:interactive]`. If `:interactive` is specified, the task will run
only on threads in the interactive threadpool (if one exists).
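
A minimal sketch of the usage this describes, with illustrative thread counts
(the flag and macro syntax are as introduced by this commit):

```julia
# Start Julia with 3 default-pool threads and 1 interactive-pool thread:
#   $ julia --threads=3,1
using Base.Threads

t1 = @spawn sum(rand(1_000))          # scheduled on the default pool, as before
t2 = @spawn :interactive println("ran on thread ", threadid())  # interactive pool only

fetch(t1)
wait(t2)
```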

Co-authored-by: K Pamnany <[email protected]>
jpsamaroo and kpamnany committed Apr 25, 2022
1 parent 65b9be4 commit c7ea709
Showing 23 changed files with 431 additions and 101 deletions.
6 changes: 4 additions & 2 deletions base/options.jl
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

# NOTE: This type needs to be kept in sync with jl_options in src/julia.h
# NOTE: This type needs to be kept in sync with jl_options in src/jloptions.h
struct JLOptions
quiet::Int8
banner::Int8
@@ -9,7 +9,9 @@ struct JLOptions
commands::Ptr{Ptr{UInt8}} # (e)eval, (E)print, (L)load
image_file::Ptr{UInt8}
cpu_target::Ptr{UInt8}
nthreads::Int32
nthreadpools::Int16
nthreads::Int16
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
project::Ptr{UInt8}
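The struct above mirrors the C-side `jl_options`. As a hedged sketch, the new
fields can be inspected from Julia like this (the pointer walk assumes the
runtime fills `nthreads_per_pool` with one `Int16` per configured pool):

```julia
opts = Base.JLOptions()
opts.nthreadpools                  # how many pools were configured
opts.nthreads                      # total threads requested
# One entry per pool; with --threads=3,1 this would collect [3, 1]:
Int16[unsafe_load(opts.nthreads_per_pool, i) for i in 1:opts.nthreadpools]
```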
78 changes: 45 additions & 33 deletions base/partr.jl
@@ -2,7 +2,7 @@

module Partr

using ..Threads: SpinLock, nthreads
using ..Threads: SpinLock, nthreads, threadid

# a task minheap
mutable struct taskheap
@@ -16,12 +16,13 @@ end

# multiqueue minheap state
const heap_d = UInt32(8)
global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0)
const heaps_lock = SpinLock()
global cong_unbias::UInt32 = typemax(UInt32)
const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)]
const heaps_lock = [SpinLock(), SpinLock()]
const cong_unbias = [typemax(UInt32), typemax(UInt32)]


cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)
cong(max::UInt32, unbias::UInt32) =
ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)

function unbias_cong(max::UInt32)
return typemax(UInt32) - ((typemax(UInt32) % max) + UInt32(1))
@@ -60,46 +61,52 @@ function multiq_sift_down(heap::taskheap, idx::Int32)
end


function multiq_size()
function multiq_size(tpid::Int8)
nt = UInt32(Threads._nthreads_in_pool(tpid))
tp = tpid + 1
tpheaps = heaps[tp]
heap_c = UInt32(2)
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
heap_p = UInt32(length(tpheaps))

if heap_c * nt <= heap_p
return heap_p
end

@lock heaps_lock begin
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
@lock heaps_lock[tp] begin
heap_p = UInt32(length(tpheaps))
nt = UInt32(Threads._nthreads_in_pool(tpid))
if heap_c * nt <= heap_p
return heap_p
end

heap_p += heap_c * nt
newheaps = Vector{taskheap}(undef, heap_p)
copyto!(newheaps, heaps)
for i = (1 + length(heaps)):heap_p
copyto!(newheaps, tpheaps)
for i = (1 + length(tpheaps)):heap_p
newheaps[i] = taskheap()
end
global heaps = newheaps
global cong_unbias = unbias_cong(heap_p)
heaps[tp] = newheaps
cong_unbias[tp] = unbias_cong(heap_p)
end

return heap_p
end


function multiq_insert(task::Task, priority::UInt16)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
heap_p = multiq_size(tpid)
tp = tpid + 1

task.priority = priority

heap_p = multiq_size()
rn = cong(heap_p, cong_unbias)
while !trylock(heaps[rn].lock)
rn = cong(heap_p, cong_unbias)
rn = cong(heap_p, cong_unbias[tp])
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
rn = cong(heap_p, cong_unbias[tp])
end

heap = heaps[rn]
heap = tpheaps[rn]
if heap.ntasks >= length(heap.tasks)
resize!(heap.tasks, length(heap.tasks) * 2)
end
@@ -122,34 +129,37 @@ function multiq_deletemin()
local rn1, rn2
local prio1, prio2

tid = Threads.threadid()
tp = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) + 1
tpheaps = heaps[tp]

@label retry
GC.safepoint()
heap_p = UInt32(length(heaps))
heap_p = UInt32(length(tpheaps))
for i = UInt32(0):heap_p
if i == heap_p
return nothing
end
rn1 = cong(heap_p, cong_unbias)
rn2 = cong(heap_p, cong_unbias)
prio1 = heaps[rn1].priority
prio2 = heaps[rn2].priority
rn1 = cong(heap_p, cong_unbias[tp])
rn2 = cong(heap_p, cong_unbias[tp])
prio1 = tpheaps[rn1].priority
prio2 = tpheaps[rn2].priority
if prio1 > prio2
prio1 = prio2
rn1 = rn2
elseif prio1 == prio2 && prio1 == typemax(UInt16)
continue
end
if trylock(heaps[rn1].lock)
if prio1 == heaps[rn1].priority
if trylock(tpheaps[rn1].lock)
if prio1 == tpheaps[rn1].priority
break
end
unlock(heaps[rn1].lock)
unlock(tpheaps[rn1].lock)
end
end

heap = heaps[rn1]
heap = tpheaps[rn1]
task = heap.tasks[1]
tid = Threads.threadid()
if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0
unlock(heap.lock)
@goto retry
@@ -171,9 +181,11 @@ end


function multiq_check_empty()
for i = UInt32(1):length(heaps)
if heaps[i].ntasks != 0
return false
for j = UInt32(1):length(heaps)
for i = UInt32(1):length(heaps[j])
if heaps[j][i].ntasks != 0
return false
end
end
end
return true
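The selection policy in `multiq_insert`/`multiq_deletemin` above is the classic
power-of-two-choices trick, now applied per pool: sample two random heaps from
the current thread's pool and pop from the one whose top task has the
numerically smaller (more urgent) priority. A self-contained sketch of just
that rule, with a priority vector standing in for `heaps[tp]` (names here are
illustrative, not from the runtime):

```julia
# pool_prios[i] mimics heaps[tp][i].priority; smaller values are more
# urgent, and typemax(UInt16) marks an empty heap.
function choose_heap(pool_prios::Vector{UInt16})
    n = length(pool_prios)
    rn1, rn2 = rand(1:n), rand(1:n)        # two independent random probes
    if pool_prios[rn2] < pool_prios[rn1]   # keep the more urgent of the two
        rn1 = rn2
    end
    return pool_prios[rn1] == typemax(UInt16) ? nothing : rn1
end
```

Two random probes keep per-heap lock contention low while staying close to a
global priority order; the real code additionally retries under `trylock` and
re-checks the priority after acquiring the lock.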
4 changes: 4 additions & 0 deletions base/task.jl
@@ -251,6 +251,10 @@ true
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)

Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
function Threads.threadpool(t::Task)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
return tpid == 0 ? :default : :interactive
end

task_result(t::Task) = t.result

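With the new method above, a task's pool can be queried directly; a small
sketch (the `:interactive` result assumes Julia was started with an
interactive pool):

```julia
using Base.Threads

Threads.threadpool(@spawn 1 + 1)                # :default
Threads.threadpool(@spawn :interactive 1 + 1)   # :interactive
```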
2 changes: 1 addition & 1 deletion base/threadcall.jl
@@ -9,7 +9,7 @@ const threadcall_restrictor = Semaphore(max_ccall_threads)
The `@threadcall` macro is called in the same way as [`ccall`](@ref) but does the work
in a different thread. This is useful when you want to call a blocking C
function without causing the main `julia` thread to become blocked. Concurrency
function without causing the current `julia` thread to become blocked. Concurrency
is limited by size of the libuv thread pool, which defaults to 4 threads but
can be increased by setting the `UV_THREADPOOL_SIZE` environment variable and
restarting the `julia` process.
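For example, a blocking C call can be pushed onto a libuv pool thread; this
sketch assumes a POSIX libc exporting `sleep` (an assumed example, not taken
from this diff):

```julia
# Blocks a libuv worker thread for 2 seconds while the calling
# Julia thread remains free to run other tasks.
@threadcall(:sleep, Cuint, (Cuint,), 2)
```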
104 changes: 84 additions & 20 deletions base/threadingconstructs.jl
@@ -1,26 +1,62 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

export threadid, nthreads, @threads, @spawn
export threadid, nthreads, @threads, @spawn,
threadpool, nthreadpools

"""
Threads.threadid()
Threads.threadid() -> Int
Get the ID number of the current thread of execution. The master thread has ID `1`.
Get the ID number of the current thread of execution. The master thread has
ID `1`.
"""
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)

# Inclusive upper bound on threadid()
"""
Threads.nthreads()
Threads.nthreads([:default|:interactive]) -> Int
Get the number of threads available to the Julia process. This is the inclusive upper bound
on [`threadid()`](@ref).
Get the number of threads (across all thread pools or within the specified
thread pool) available to Julia. The number of threads across all thread
pools is the inclusive upper bound on [`threadid()`](@ref).
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
[`Distributed`](@ref man-distributed) standard library.
"""
function nthreads end

nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))
function nthreads(pool::Symbol)
if pool == :default
tpid = Int8(0)
elseif pool == :interactive
tpid = Int8(1)
else
error("invalid threadpool specified")
end
return _nthreads_in_pool(tpid)
end
function _nthreads_in_pool(tpid::Int8)
p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
return Int(unsafe_load(p, tpid + 1))
end

"""
Threads.threadpool(tid = threadid()) -> Symbol
Returns the specified thread's threadpool; either `:default` or `:interactive`.
"""
function threadpool(tid = threadid())
tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
return tpid == 0 ? :default : :interactive
end

"""
Threads.nthreadpools() -> Int
Returns the number of threadpools currently configured.
"""
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))


function threading_run(fun, static)
ccall(:jl_enter_threaded_region, Cvoid, ())
@@ -48,7 +84,7 @@ function _threadsfor(iter, lbody, schedule)
quote
local threadsfor_fun
let range = $(esc(range))
function threadsfor_fun(tid=1; onethread=false)
function threadsfor_fun(tid = 1; onethread = false)
r = range # Load into local variable
lenr = length(r)
# divide loop iterations among threads
@@ -232,35 +268,63 @@ macro threads(args...)
end

"""
Threads.@spawn expr
Threads.@spawn [:default|:interactive] expr
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread.
The task is allocated to a thread after it becomes available. To wait for the task
to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to
wait and then obtain its return value.
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
thread in the specified threadpool (`:default` if unspecified). The task is
allocated to a thread once one becomes available. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call
[`fetch`](@ref) to wait and then obtain its return value.
Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the asynchronous code from changes to the variable's value in the current task.
Values can be interpolated into `@spawn` via `\$`, which copies the value
directly into the constructed underlying closure. This allows you to insert
the _value_ of a variable, isolating the asynchronous code from changes to
the variable's value in the current task.
!!! note
See the manual chapter on threading for important caveats.
See the manual chapter on [multi-threading](@ref man-multithreading)
for important caveats. See also the chapter on [threadpools](@ref man-threadpools).
!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.
!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
!!! compat "Julia 1.9"
A threadpool may be specified as of Julia 1.9.
"""
macro spawn(expr)
letargs = Base._lift_one_interp!(expr)
macro spawn(args...)
tpid = Int8(0)
na = length(args)
if na == 2
ttype, ex = args
if ttype isa QuoteNode
ttype = ttype.value
elseif ttype isa Symbol
# TODO: allow unquoted symbols
ttype = nothing
end
if ttype === :interactive
tpid = Int8(1)
elseif ttype !== :default
throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
end
elseif na == 1
ex = args[1]
else
throw(ArgumentError("wrong number of arguments in @spawn"))
end

letargs = Base._lift_one_interp!(ex)

thunk = esc(:(()->($expr)))
thunk = esc(:(()->($ex)))
var = esc(Base.sync_varname)
quote
let $(letargs...)
local task = Task($thunk)
task.sticky = false
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
if $(Expr(:islocal, var))
put!($var, task)
end
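Taken together, the introspection additions in this file can be exercised as
below; the commented values assume a `julia --threads=3,1` start:

```julia
using Base.Threads

nthreadpools()          # 2
nthreads()              # 4 -- total across both pools, upper bound on threadid()
nthreads(:default)      # 3
nthreads(:interactive)  # 1
threadpool()            # pool of the current thread, e.g. :default
```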
4 changes: 3 additions & 1 deletion doc/src/base/multi-threading.md
@@ -6,6 +6,8 @@ Base.Threads.foreach
Base.Threads.@spawn
Base.Threads.threadid
Base.Threads.nthreads
Base.Threads.threadpool
Base.Threads.nthreadpools
```

See also [Multi-Threading](@ref man-multithreading).
@@ -49,7 +51,7 @@ Base.Threads.atomic_min!
Base.Threads.atomic_fence
```

## ccall using a threadpool (Experimental)
## ccall using a libuv threadpool (Experimental)

```@docs
Base.@threadcall