diff --git a/NEWS.md b/NEWS.md index 9ecdd87f0c2bb..6b67011c8111e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -61,6 +61,12 @@ variables. ([#53742]). Multi-threading changes ----------------------- +* New types are defined to handle the pattern of code that must run once per process, called + a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once + the first time it is called, and then always return the same result value of type `T` + every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for + similar usage with threads or tasks. ([#TBD]) + Build system changes -------------------- diff --git a/base/docs/basedocs.jl b/base/docs/basedocs.jl index e28b3a21659a8..5839610abbfa1 100644 --- a/base/docs/basedocs.jl +++ b/base/docs/basedocs.jl @@ -153,6 +153,8 @@ runtime initialization functions of external C libraries and initializing global that involve pointers returned by external libraries. See the [manual section about modules](@ref modules) for more details. +See also: [`OncePerProcess`](@ref). + # Examples ```julia const foo_data_ptr = Ref{Ptr{Cvoid}}(0) diff --git a/base/exports.jl b/base/exports.jl index daba9a010a9e6..56cd58ce269e7 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -70,6 +70,9 @@ export OrdinalRange, Pair, PartialQuickSort, + OncePerProcess, + OncePerTask, + OncePerThread, PermutedDimsArray, QuickSort, Rational, diff --git a/base/lock.jl b/base/lock.jl index b473045e5809d..58fb263390860 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -2,6 +2,13 @@ const ThreadSynchronizer = GenericCondition{Threads.SpinLock} +""" + current_task() + +Get the currently running [`Task`](@ref). +""" +current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) + # Advisory reentrant lock """ ReentrantLock() @@ -500,7 +507,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an After `notify` is called, the `Event` remains in a signaled state and tasks will no longer block when waiting for it, until `reset` is called. -If `autoreset` is true, at most one task will be released from `wait` for +If `autoreset` is true, at most one task will be released from `wait` for) each call to `notify`. This provides an acquire & release memory ordering on notify/wait. @@ -570,3 +577,274 @@ end import .Base: Event export Event end + +const PerStateInitial = 0x00 +const PerStateHasrun = 0x01 +const PerStateErrored = 0x02 +const PerStateConcurrent = 0x03 + +""" + OncePerProcess{T} + +Calling a `OncePerProcess` object returns a value of type `T` by running the +function `initializer` exactly once per process. All concurrent and future +calls in the same process will return exactly the same value. This is useful in +code that will be precompiled, as it allows setting up caches or other state +which won't get serialized. + +## Example + +```jldoctest +julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do + println("Making lazy global value...done.") + return [Libc.rand()] + end; + +julia> procstate = global_state(); +Making lazy global value...done. + +julia> procstate === global_state() +true + +julia> procstate === fetch(@async global_state()) +true +``` +""" +mutable struct OncePerProcess{T, F} + x::Union{Nothing,T} + @atomic state::UInt8 # 0=initial, 1=hasrun, 2=error + @atomic allow_compile_time::Bool + const initializer::F + const lock::ReentrantLock + + function OncePerProcess{T,F}(initializer::F) where {T, F} + once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock()) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :x, nothing) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :state, PerStateInitial) + return once + end +end +OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer) +OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer) +@inline function (once::OncePerProcess{T})() where T + state = (@atomic :acquire once.state) + if state != PerStateHasrun + (@noinline function init_perprocesss(once, state) + state == PerStateErrored && error("OncePerProcess initializer failed previously") + once.allow_compile_time || __precompile__(false) + lock(once.lock) + try + state = @atomic :monotonic once.state + if state == PerStateInitial + once.x = once.initializer() + elseif state == PerStateErrored + error("OncePerProcess initializer failed previously") + elseif state != PerStateHasrun + error("invalid state for OncePerProcess") + end + catch + state == PerStateErrored || @atomic :release once.state = PerStateErrored + unlock(once.lock) + rethrow() + end + state == PerStateHasrun || @atomic :release once.state = PerStateHasrun + unlock(once.lock) + nothing + end)(once, state) + end + return once.x::T +end + +function copyto_monotonic!(dest::AtomicMemory, src) + i = 1 + for j in eachindex(src) + if isassigned(src, j) + @atomic :monotonic dest[i] = src[j] + #else + # _unsafeindex_atomic!(dest, i, src[j], :monotonic) + end + i += 1 + end + dest +end + +function fill_monotonic!(dest::AtomicMemory, x) + for i = 1:length(dest) + @atomic :monotonic dest[i] = x + end + dest +end + + +# share a lock/condition, since we just need it briefly, so some contention is okay +const PerThreadLock = ThreadSynchronizer() +""" + OncePerThread{T} + +Calling a `OncePerThread` object returns a value of type `T` by running the function +`initializer` exactly once per thread. All future calls in the same thread, and +concurrent or future calls with the same thread id, will return exactly the +same value. The object can also be indexed by the threadid for any existing +thread, to get (or initialize *on this thread*) the value stored for that +thread. Incorrect usage can lead to data-races or memory corruption so use only +if that behavior is correct within your library's threading-safety design. + +Warning: it is not necessarily true that a Task only runs on one thread, therefore the value +returned here may alias other values or change in the middle of your program. This type may +get deprecated in the future. If initializer yields, the thread running the current task +after the call might not be the same as the one at the start of the call. + +See also: [`OncePerTask`](@ref). + +## Example + +```jldoctest +julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do + println("Making lazy thread value...done.") + return [Libc.rand()] + end; + +julia> threadvec = thread_state(); +Making lazy thread value...done. + +julia> threadvec === fetch(@async thread_state()) +true + +julia> threadvec === thread_state[Threads.threadid()] +true +``` +""" +mutable struct OncePerThread{T, F} + @atomic xs::AtomicMemory{T} # values + @atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent + const initializer::F + + function OncePerThread{T,F}(initializer::F) where {T, F} + xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}() + once = new{T,F}(xs, ss, initializer) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :xs, xs) + ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any), + once, :ss, ss) + return once + end +end +OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer) +OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer) +@inline function getindex(once::OncePerThread, tid::Integer) + tid = Int(tid) + ss = @atomic :acquire once.ss + xs = @atomic :monotonic once.xs + # n.b. length(xs) >= length(ss) + if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun + (@noinline function init_perthread(once, tid) + local ss = @atomic :acquire once.ss + local xs = @atomic :monotonic once.xs + local len = length(ss) + # slow path to allocate it + nt = Threads.maxthreadid() + 0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range")) + if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored + error("OncePerThread initializer failed previously") + end + newxs = xs + newss = ss + if tid > len + # attempt to do all allocations outside of PerThreadLock for better scaling + @assert length(xs) >= length(ss) "logical constraint violation" + newxs = typeof(xs)(undef, len + nt) + newss = typeof(ss)(undef, len + nt) + end + # uses state and locks to ensure this runs exactly once per tid argument + lock(PerThreadLock) + try + ss = @atomic :monotonic once.ss + xs = @atomic :monotonic once.xs + if tid > length(ss) + @assert len <= length(ss) <= length(newss) "logical constraint violation" + fill_monotonic!(newss, PerStateInitial) + xs = copyto_monotonic!(newxs, xs) + ss = copyto_monotonic!(newss, ss) + @atomic :release once.xs = xs + @atomic :release once.ss = ss + end + state = @atomic :monotonic ss[tid] + while state == PerStateConcurrent + # lost race, wait for notification this is done running elsewhere + wait(PerThreadLock) # wait for initializer to finish without releasing this thread + ss = @atomic :monotonic once.ss + state = @atomic :monotonic ss[tid] + end + if state == PerStateInitial + # won the race, drop lock in exchange for state, and run user initializer + @atomic :monotonic ss[tid] = PerStateConcurrent + result = try + unlock(PerThreadLock) + once.initializer() + catch + lock(PerThreadLock) + ss = @atomic :monotonic once.ss + @atomic :release ss[tid] = PerStateErrored + notify(PerThreadLock) + rethrow() + end + # store result and notify waiters + lock(PerThreadLock) + xs = @atomic :monotonic once.xs + @atomic :release xs[tid] = result + ss = @atomic :monotonic once.ss + @atomic :release ss[tid] = PerStateHasrun + notify(PerThreadLock) + elseif state == PerStateErrored + error("OncePerThread initializer failed previously") + elseif state != PerStateHasrun + error("invalid state for OncePerThread") + end + finally + unlock(PerThreadLock) + end + nothing + end)(once, tid) + xs = @atomic :monotonic once.xs + end + return xs[tid] +end +@inline (once::OncePerThread)() = once[Threads.threadid()] + +""" + OncePerTask{T} + +Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer` +exactly once per Task. All future calls in the same Task will return exactly the same value. + +See also: [`task_local_storage`](@ref). + +## Example + +```jldoctest +julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do + println("Making lazy task value...done.") + return [Libc.rand()] + end; + +julia> taskvec = task_state(); +Making lazy task value...done. + +julia> taskvec === task_state() +true + +julia> taskvec === fetch(@async task_state()) +Making lazy task value...done. +false +``` +""" +mutable struct OncePerTask{T, F} + const initializer::F + + OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer) + OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer) + OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer) +end +@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once) diff --git a/base/task.jl b/base/task.jl index 6cb1ff785eeee..f3a134f374421 100644 --- a/base/task.jl +++ b/base/task.jl @@ -143,13 +143,6 @@ macro task(ex) :(Task($thunk)) end -""" - current_task() - -Get the currently running [`Task`](@ref). -""" -current_task() = ccall(:jl_get_current_task, Ref{Task}, ()) - # task states const task_state_runnable = UInt8(0) diff --git a/doc/src/base/base.md b/doc/src/base/base.md index b5d50a846ce89..7181965d9aa81 100644 --- a/doc/src/base/base.md +++ b/doc/src/base/base.md @@ -34,6 +34,9 @@ Main.include Base.include_string Base.include_dependency __init__ +Base.OncePerProcess +Base.OncePerTask +Base.OncePerThread Base.which(::Any, ::Any) Base.methods Base.@show diff --git a/src/builtins.c b/src/builtins.c index 96c4cec0f5087..b129cca0ee71d 100644 --- a/src/builtins.c +++ b/src/builtins.c @@ -1008,7 +1008,7 @@ static inline size_t get_checked_fieldindex(const char *name, jl_datatype_t *st, else { jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type}; jl_value_t *t = jl_type_union(ts, 2); - jl_type_error("getfield", t, arg); + jl_type_error(name, t, arg); } if (mutabl && jl_field_isconst(st, idx)) { jl_errorf("%s: const field .%s of type %s cannot be changed", name, diff --git a/src/gc-stock.c b/src/gc-stock.c index d25f8917f302d..6d94b7e3ed19d 100644 --- a/src/gc-stock.c +++ b/src/gc-stock.c @@ -2739,6 +2739,8 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq) gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_list, "global_roots_list"); gc_try_claim_and_push(mq, jl_global_roots_keyset, NULL); gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_keyset, "global_roots_keyset"); + gc_try_claim_and_push(mq, precompile_field_replace, NULL); + gc_heap_snapshot_record_gc_roots((jl_value_t*)precompile_field_replace, "precompile_field_replace"); } // find unmarked objects that need to be finalized from the finalizer list "list". diff --git a/src/julia_internal.h b/src/julia_internal.h index f00667d016796..76a39e8b69a14 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -855,6 +855,8 @@ extern jl_genericmemory_t *jl_global_roots_list JL_GLOBALLY_ROOTED; extern jl_genericmemory_t *jl_global_roots_keyset JL_GLOBALLY_ROOTED; JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT; JL_DLLEXPORT jl_value_t *jl_as_global_root(jl_value_t *val, int insert) JL_GLOBALLY_ROOTED; +extern jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED; +JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) JL_GLOBALLY_ROOTED; jl_opaque_closure_t *jl_new_opaque_closure(jl_tupletype_t *argt, jl_value_t *rt_lb, jl_value_t *rt_ub, jl_value_t *source, jl_value_t **env, size_t nenv, int do_compile); diff --git a/src/staticdata.c b/src/staticdata.c index 363aa46b62221..59b486b467610 100644 --- a/src/staticdata.c +++ b/src/staticdata.c @@ -492,6 +492,7 @@ void *native_functions; // opaque jl_native_code_desc_t blob used for fetching // table of struct field addresses to rewrite during saving static htable_t field_replace; +static htable_t bits_replace; static htable_t relocatable_ext_cis; // array of definitions for the predefined function pointers @@ -1606,7 +1607,23 @@ static void jl_write_values(jl_serializer_state *s) JL_GC_DISABLED write_padding(f, offset - tot); tot = offset; size_t fsz = jl_field_size(t, i); - if (t->name->mutabl && jl_is_cpointer_type(jl_field_type_concrete(t, i)) && *(intptr_t*)slot != -1) { + jl_value_t *replace = (jl_value_t*)ptrhash_get(&bits_replace, (void*)slot); + if (replace != HT_NOTFOUND) { + assert(t->name->mutabl && !jl_field_isptr(t, i)); + jl_value_t *rty = jl_typeof(replace); + size_t sz = jl_datatype_size(rty); + ios_write(f, (const char*)replace, sz); + jl_value_t *ft = jl_field_type_concrete(t, i); + int isunion = jl_is_uniontype(ft); + unsigned nth = 0; + if (!jl_find_union_component(ft, rty, &nth)) + assert(0 && "invalid field assignment to isbits union"); + assert(sz <= fsz - isunion); + write_padding(f, fsz - sz - isunion); + if (isunion) + write_uint8(f, nth); + } + else if (t->name->mutabl && jl_is_cpointer_type(jl_field_type_concrete(t, i)) && *(intptr_t*)slot != -1) { // reset Ptr fields to C_NULL (but keep MAP_FAILED / INVALID_HANDLE) assert(!jl_field_isptr(t, i)); write_pointer(f); @@ -2553,6 +2570,65 @@ jl_mutex_t global_roots_lock; extern jl_mutex_t world_counter_lock; extern size_t jl_require_world; +jl_mutex_t precompile_field_replace_lock; +jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED; + +static inline jl_value_t *get_checked_fieldindex(const char *name, jl_datatype_t *st, jl_value_t *v, jl_value_t *arg, int mutabl) +{ + if (mutabl) { + if (st == jl_module_type) + jl_error("cannot assign variables in other modules"); + if (!st->name->mutabl) + jl_errorf("%s: immutable struct of type %s cannot be changed", name, jl_symbol_name(st->name->name)); + } + size_t idx; + if (jl_is_long(arg)) { + idx = jl_unbox_long(arg) - 1; + if (idx >= jl_datatype_nfields(st)) + jl_bounds_error(v, arg); + } + else if (jl_is_symbol(arg)) { + idx = jl_field_index(st, (jl_sym_t*)arg, 1); + arg = jl_box_long(idx); + } + else { + jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type}; + jl_value_t *t = jl_type_union(ts, 2); + jl_type_error(name, t, arg); + } + if (mutabl && jl_field_isconst(st, idx)) { + jl_errorf("%s: const field .%s of type %s cannot be changed", name, + jl_symbol_name((jl_sym_t*)jl_svecref(jl_field_names(st), idx)), jl_symbol_name(st->name->name)); + } + return arg; +} + +JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) +{ + if (!jl_generating_output()) + return; + jl_datatype_t *st = (jl_datatype_t*)jl_typeof(val); + jl_value_t *idx = get_checked_fieldindex("setfield!", st, val, field, 1); + JL_GC_PUSH1(&idx); + size_t idxval = jl_unbox_long(idx); + jl_value_t *ft = jl_field_type_concrete(st, idxval); + if (!jl_isa(newval, ft)) + jl_type_error("setfield!", ft, newval); + JL_LOCK(&precompile_field_replace_lock); + if (precompile_field_replace == NULL) { + precompile_field_replace = jl_alloc_svec(3); + jl_svecset(precompile_field_replace, 0, jl_alloc_vec_any(0)); + jl_svecset(precompile_field_replace, 1, jl_alloc_vec_any(0)); + jl_svecset(precompile_field_replace, 2, jl_alloc_vec_any(0)); + } + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 0), val); + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 1), idx); + jl_array_ptr_1d_push((jl_array_t*)jl_svecref(precompile_field_replace, 2), newval); + JL_GC_POP(); + JL_UNLOCK(&precompile_field_replace_lock); +} + + JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT { if (jl_is_datatype(val)) { @@ -2672,9 +2748,41 @@ static void jl_save_system_image_to_stream(ios_t *f, jl_array_t *mod_array, jl_array_t *ext_targets, jl_array_t *edges) JL_GC_DISABLED { htable_new(&field_replace, 0); + htable_new(&bits_replace, 0); // strip metadata and IR when requested if (jl_options.strip_metadata || jl_options.strip_ir) jl_strip_all_codeinfos(); + // prepare hash table with any fields the user wanted us to rewrite during serialization + if (precompile_field_replace) { + jl_array_t *vals = (jl_array_t*)jl_svecref(precompile_field_replace, 0); + jl_array_t *fields = (jl_array_t*)jl_svecref(precompile_field_replace, 1); + jl_array_t *newvals = (jl_array_t*)jl_svecref(precompile_field_replace, 2); + size_t i, l = jl_array_nrows(vals); + assert(jl_array_nrows(fields) == l && jl_array_nrows(newvals) == l); + for (i = 0; i < l; i++) { + jl_value_t *val = jl_array_ptr_ref(vals, i); + size_t field = jl_unbox_long(jl_array_ptr_ref(fields, i)); + jl_value_t *newval = jl_array_ptr_ref(newvals, i); + jl_datatype_t *st = (jl_datatype_t*)jl_typeof(val); + size_t offs = jl_field_offset(st, field); + char *fldaddr = (char*)val + offs; + if (jl_field_isptr(st, field)) { + record_field_change((jl_value_t**)fldaddr, newval); + } + else { + // replace the bits + ptrhash_put(&bits_replace, (void*)fldaddr, newval); + // and any pointers inside + jl_datatype_t *rty = (jl_datatype_t*)jl_typeof(newval); + const jl_datatype_layout_t *layout = rty->layout; + size_t j, np = layout->npointers; + for (j = 0; j < np; j++) { + uint32_t ptr = jl_ptr_offset(rty, j); + record_field_change((jl_value_t**)fldaddr + ptr, *(((jl_value_t**)newval) + ptr)); + } + } + } + } int en = jl_gc_enable(0); nsym_tag = 0; @@ -2967,6 +3075,7 @@ static void jl_save_system_image_to_stream(ios_t *f, jl_array_t *mod_array, arraylist_free(&gvars); arraylist_free(&external_fns); htable_free(&field_replace); + htable_free(&bits_replace); htable_free(&serialization_order); htable_free(&nullptrs); htable_free(&symbol_table); diff --git a/src/threading.c b/src/threading.c index 44b1192528531..c26028d2f3da2 100644 --- a/src/threading.c +++ b/src/threading.c @@ -464,6 +464,30 @@ static void jl_delete_thread(void *value) JL_NOTSAFEPOINT_ENTER // prior unsafe-region (before we let it release the stack memory) (void)jl_gc_unsafe_enter(ptls); scheduler_delete_thread(ptls); + // need to clear pgcstack and eh, but we can clear everything now too + jl_task_t *ct = jl_atomic_load_relaxed(&ptls->current_task); + jl_task_frame_noreturn(ct); + if (jl_set_task_tid(ptls->root_task, ptls->tid)) { + // the system will probably free this stack memory soon + // so prevent any other thread from accessing it later + if (ct != ptls->root_task) + jl_task_frame_noreturn(ptls->root_task); + } + else { + // Uh oh. The user cleared the sticky bit so it started running + // elsewhere, then called pthread_exit on this thread from another + // Task, which will free the stack memory of that root task soon. This + // is not recoverable. Though we could just hang here, a fatal message + // is likely better. + jl_safe_printf("fatal: thread exited from wrong Task.\n"); + abort(); + } + ptls->previous_exception = NULL; + // allow the page root_task is on to be freed + ptls->root_task = NULL; + jl_free_thread_gc_state(ptls); + // park in safe-region from here on (this may run GC again) + (void)jl_gc_safe_enter(ptls); // try to free some state we do not need anymore #ifndef _OS_WINDOWS_ void *signal_stack = ptls->signal_stack; @@ -502,21 +526,7 @@ static void jl_delete_thread(void *value) JL_NOTSAFEPOINT_ENTER #else pthread_mutex_lock(&in_signal_lock); #endif - // need to clear pgcstack and eh, but we can clear everything now too - jl_task_frame_noreturn(jl_atomic_load_relaxed(&ptls->current_task)); - if (jl_set_task_tid(ptls->root_task, ptls->tid)) { - // the system will probably free this stack memory soon - // so prevent any other thread from accessing it later - jl_task_frame_noreturn(ptls->root_task); - } - else { - // Uh oh. The user cleared the sticky bit so it started running - // elsewhere, then called pthread_exit on this thread. This is not - // recoverable. Though we could just hang here, a fatal message is better. - jl_safe_printf("fatal: thread exited from wrong Task.\n"); - abort(); - } - jl_atomic_store_relaxed(&ptls->current_task, NULL); // dead + jl_atomic_store_relaxed(&ptls->current_task, NULL); // indicate dead // finally, release all of the locks we had grabbed #ifdef _OS_WINDOWS_ jl_unlock_profile_wr(); @@ -529,12 +539,6 @@ static void jl_delete_thread(void *value) JL_NOTSAFEPOINT_ENTER #endif free(ptls->bt_data); small_arraylist_free(&ptls->locks); - ptls->previous_exception = NULL; - // allow the page root_task is on to be freed - ptls->root_task = NULL; - jl_free_thread_gc_state(ptls); - // then park in safe-region - (void)jl_gc_safe_enter(ptls); } //// debugging hack: if we are exiting too fast for error message printing on threads, diff --git a/test/precompile.jl b/test/precompile.jl index 7a6e41061f9b1..adf10363298ba 100644 --- a/test/precompile.jl +++ b/test/precompile.jl @@ -94,6 +94,17 @@ precompile_test_harness(false) do dir end abstract type AbstractAlgebraMap{A} end struct GAPGroupHomomorphism{A, B} <: AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end + + global process_state_calls::Int = 0 + const process_state = Base.OncePerProcess{typeof(getpid())}() do + @assert (global process_state_calls += 1) == 1 + return getpid() + end + const mypid = process_state() + @assert process_state_calls === 1 + process_state_calls = 0 + @assert process_state() === process_state() + @assert process_state_calls === 0 end """) write(Foo2_file, @@ -272,6 +283,9 @@ precompile_test_harness(false) do dir oid_vec_int = objectid(a_vec_int) oid_mat_int = objectid(a_mat_int) + + using $FooBase_module: process_state, mypid as FooBase_pid, process_state_calls + const mypid = process_state() end """) # Issue #52063 @@ -333,6 +347,13 @@ precompile_test_harness(false) do dir @test isready(Foo.ch2) @test take!(Foo.ch2) === 2 @test !isready(Foo.ch2) + + @test Foo.process_state_calls === 0 + @test Foo.process_state() === getpid() + @test Foo.mypid !== getpid() + @test Foo.FooBase_pid !== getpid() + @test Foo.mypid !== Foo.FooBase_pid + @test Foo.process_state_calls === 1 end let diff --git a/test/threads.jl b/test/threads.jl index 6265368c2ac79..2dbdcf99eae28 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -374,3 +374,127 @@ end end end end + +let once = OncePerProcess(() -> return [nothing]) + @test typeof(once) <: OncePerProcess{Vector{Nothing}} + x = once() + @test x === once() + @atomic once.state = 0xff + @test_throws ErrorException("invalid state for OncePerProcess") once() + @test_throws ErrorException("OncePerProcess initializer failed previously") once() + @atomic once.state = 0x01 + @test x === once() +end +let once = OncePerProcess{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("OncePerProcess initializer failed previously") once() +end + +let e = Base.Event(true), + started = Channel{Int16}(Inf), + finish = Channel{Nothing}(Inf), + exiting = Channel{Nothing}(Inf), + starttest2 = Event(), + once = OncePerThread() do + push!(started, threadid()) + take!(finish) + return [nothing] + end + alls = OncePerThread() do + return [nothing] + end + @test typeof(once) <: OncePerThread{Vector{Nothing}} + push!(finish, nothing) + @test_throws ArgumentError once[0] + x = once() + @test_throws ArgumentError once[0] + @test x === once() === fetch(@async once()) === once[threadid()] + @test take!(started) == threadid() + @test isempty(started) + tids = zeros(UInt, 50) + newthreads = zeros(Int16, length(tids)) + onces = Vector{Vector{Nothing}}(undef, length(tids)) + allonces = Vector{Vector{Vector{Nothing}}}(undef, length(tids)) + for i = 1:length(tids) + function cl() + GC.gc(false) # stress test the GC-safepoint mechanics of jl_adopt_thread + try + newthreads[i] = threadid() + local y = once() + onces[i] = y + @test x !== y === once() === once[threadid()] + wait(starttest2) + allonces[i] = Vector{Nothing}[alls[tid] for tid in newthreads] + catch ex + close(started, ErrorException("failed")) + close(finish, ErrorException("failed")) + @lock stderr Base.display_error(current_exceptions()) + end + push!(exiting, nothing) + GC.gc(false) # stress test the GC-safepoint mechanics of jl_delete_thread + nothing + end + function threadcallclosure(cl::F) where {F} # create sparam so we can reference the type of cl in the ccall type + threadwork = @cfunction cl -> cl() Cvoid (Ref{F},) # create a cfunction that specializes on cl as an argument and calls it + err = @ccall uv_thread_create(Ref(tids, i)::Ptr{UInt}, threadwork::Ptr{Cvoid}, cl::Ref{F})::Cint # call that on a thread + err == 0 || Base.uv_error("uv_thread_create", err) + end + threadcallclosure(cl) + end + @noinline function waitallthreads(tids) + for i = 1:length(tids) + tid = Ref(tids, i) + tidp = Base.unsafe_convert(Ptr{UInt}, tid)::Ptr{UInt} + gc_state = @ccall jl_gc_safe_enter()::Int8 + GC.@preserve tid err = @ccall uv_thread_join(tidp::Ptr{UInt})::Cint + @ccall jl_gc_safe_leave(gc_state::Int8)::Cvoid + err == 0 || Base.uv_error("uv_thread_join", err) + end + end + try + # let them finish in batches of 10 + for i = 1:length(tids) รท 10 + for i = 1:10 + newid = take!(started) + @test newid != threadid() + end + for i = 1:10 + push!(finish, nothing) + end + end + @test isempty(started) + # now run the second part of the test where they all try to access the other threads elements + notify(starttest2) + finally + for _ = 1:length(tids) + # run IO loop until all threads are close to exiting + take!(exiting) + end + waitallthreads(tids) + end + @test isempty(started) + @test isempty(finish) + @test length(IdSet{eltype(onces)}(onces)) == length(onces) # make sure every object is unique + allexpected = Vector{Nothing}[alls[tid] for tid in newthreads] + @test length(IdSet{eltype(allexpected)}(allexpected)) == length(allexpected) # make sure every object is unique + @test all(i -> allonces[i] !== allexpected && all(j -> allonces[i][j] === allexpected[j], eachindex(allexpected)), eachindex(allonces)) # make sure every thread saw the same elements + @test_throws ArgumentError once[Threads.maxthreadid() + 1] + @test_throws ArgumentError once[-1] + +end +let once = OncePerThread{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("OncePerThread initializer failed previously") once() +end + +let once = OncePerTask(() -> return [nothing]) + @test typeof(once) <: OncePerTask{Vector{Nothing}} + x = once() + @test x === once() !== fetch(@async once()) + delete!(task_local_storage(), once) + @test x !== once() === once() +end +let once = OncePerTask{Int}(() -> error("expected")) + @test_throws ErrorException("expected") once() + @test_throws ErrorException("expected") once() +end