From 9463fe8ca6d4b1476f483d2834ad9858d9fe7ab9 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 21 Sep 2023 20:12:38 -0400 Subject: [PATCH 1/4] WIP: add an ability to suspend/resume a thread in a GC-safe way --- src/julia_internal.h | 2 +- src/julia_threads.h | 9 ++++--- src/safepoint.c | 63 ++++++++++++++++++++++++++++++++++++++++++-- src/signals-unix.c | 2 ++ src/threading.c | 6 ++--- 5 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/julia_internal.h b/src/julia_internal.h index 41f976b8585f3..de5169ea85d87 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -892,7 +892,7 @@ extern char *jl_safepoint_pages; STATIC_INLINE int jl_addr_is_safepoint(uintptr_t addr) { uintptr_t safepoint_addr = (uintptr_t)jl_safepoint_pages; - return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 3; + return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 4; } extern _Atomic(uint32_t) jl_gc_running; extern _Atomic(uint32_t) jl_gc_disable_counter; diff --git a/src/julia_threads.h b/src/julia_threads.h index 7510eae308d27..cd2d1205d3e77 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -211,7 +211,7 @@ typedef struct _jl_tls_states_t { int16_t tid; int8_t threadpoolid; uint64_t rngseed; - volatile size_t *safepoint; + _Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread _Atomic(int8_t) sleep_check_state; // read/write from foreign threads // Whether it is safe to execute GC at the same time. #define JL_GC_STATE_WAITING 1 @@ -225,9 +225,9 @@ typedef struct _jl_tls_states_t { // statements is prohibited from certain // callbacks (such as generated functions) // as it may make compilation undecidable - int8_t in_pure_callback; - int8_t in_finalizer; - int8_t disable_gc; + int16_t in_pure_callback; + int16_t in_finalizer; + int16_t disable_gc; // Counter to disable finalizer **on the current thread** int finalizers_inhibited; jl_thread_heap_t heap; // this is very large, and the offset is baked into codegen @@ -264,6 +264,7 @@ typedef struct _jl_tls_states_t { void *signal_stack; #endif jl_thread_t system_id; + _Atomic(int16_t) suspend_count; arraylist_t finalizers; jl_gc_page_stack_t page_metadata_allocd; jl_gc_page_stack_t page_metadata_buffered; diff --git a/src/safepoint.c b/src/safepoint.c index c6f9a42059d1a..c13cd9687e753 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -30,6 +30,7 @@ char *jl_safepoint_pages = NULL; // so that both safepoint load and pending signal load falls in this page. // The initialization of the `safepoint` pointer is done `ti_initthread` // in `threading.c`. +// The fourth page is always disabled, so not tracked here. uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0}; // This lock should be acquired before enabling/disabling the safepoint @@ -92,9 +93,9 @@ void jl_safepoint_init(void) // jl_page_size isn't available yet. size_t pgsz = jl_getpagesize(); #ifdef _OS_WINDOWS_ - char *addr = (char*)VirtualAlloc(NULL, pgsz * 3, MEM_COMMIT, PAGE_READONLY); + char *addr = (char*)VirtualAlloc(NULL, pgsz * 4, MEM_COMMIT, PAGE_READONLY); #else - char *addr = (char*)mmap(0, pgsz * 3, PROT_READ, + char *addr = (char*)mmap(0, pgsz * 4, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (addr == MAP_FAILED) addr = NULL; @@ -104,6 +105,13 @@ void jl_safepoint_init(void) jl_gc_debug_critical_error(); abort(); } + char *pageaddr = jl_safepoint_pages + jl_page_size * 3; +#ifdef _OS_WINDOWS_ + DWORD old_prot; + VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot); +#else + mprotect(pageaddr, jl_page_size, PROT_NONE); +#endif // The signal page is for the gc safepoint. // The page before it is the sigint pending flag. jl_safepoint_pages = addr; @@ -175,6 +183,57 @@ void jl_safepoint_wait_gc(void) } } +// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead +void jl_safepoint_wait_thread_resume(void) +{ + jl_task_t *ct = jl_current_task; + JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct); + int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state); + jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING); + uv_mutex_lock(&ct->ptls->sleep_lock); + while (jl_atomic_load(&ct->ptls->suspend_count)) + uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock); + uv_mutex_unlock(&ct->ptls->sleep_lock); + jl_atomic_store_release(&ct->ptls->gc_state, state); +} + +// n.b. suspended threads may still run in the GC or GC safe regions +int jl_safepoint_suspend_thread(int tid) +{ + if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) + return 0; + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + uv_mutex_lock(&ptls2->sleep_lock); + int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1; + jl_atomic_store(&ptls2->suspend_count, suspend_count); // seq-cst to ensure this happens-before gc_state read + if (suspend_count == 1) // first to suspend + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*))); + uv_mutex_unlock(&ptls2->sleep_lock); + while (jl_atomic_load_acquire(&ptls2->gc_state)) // wait for suspend + jl_cpu_pause(); // yield? + return suspend_count; +} + +int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT +{ + if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) + return 0; + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + uv_mutex_lock(&ptls2->sleep_lock); + int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count); + if (suspend_count == 1) { // last to unsuspend + if (tid == 0) + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size)); + else + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*))); + uv_cond_signal(&safepoint_cond); + } + if (suspend_count != 0) + jl_atomic_store(&ptls2->suspend_count, suspend_count - 1); + uv_mutex_unlock(&ptls2->sleep_lock); + return suspend_count; +} + void jl_safepoint_enable_sigint(void) { uv_mutex_lock(&safepoint_lock); diff --git a/src/signals-unix.c b/src/signals-unix.c index 0d5ad9b1be7c5..2c20d5d295273 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -361,6 +361,8 @@ JL_NO_ASAN static void segv_handler(int sig, siginfo_t *info, void *context) } if (sig == SIGSEGV && info->si_code == SEGV_ACCERR && jl_addr_is_safepoint((uintptr_t)info->si_addr) && !is_write_fault(context)) { jl_set_gc_and_wait(); + while (jl_atomic_load(&ct->ptls->suspend_count)) // seq-cst to ensure this happens-after gc_state write + jl_safepoint_wait_thread_resume(); // Do not raise sigint on worker thread if (jl_atomic_load_relaxed(&ct->tid) != 0) return; diff --git a/src/threading.c b/src/threading.c index 2ec9c220fbafb..319a2918fab3f 100644 --- a/src/threading.c +++ b/src/threading.c @@ -367,11 +367,11 @@ jl_ptls_t jl_init_threadtls(int16_t tid) // Conditionally initialize the safepoint address. See comment in // `safepoint.c` if (tid == 0) { - ptls->safepoint = (size_t*)(jl_safepoint_pages + jl_page_size); + jl_atomic_store_relaxed(&ptls->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size)); } else { - ptls->safepoint = (size_t*)(jl_safepoint_pages + jl_page_size * 2 + - sizeof(size_t)); + jl_atomic_store_relaxed(&ptls->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + + sizeof(size_t))); } jl_bt_element_t *bt_data = (jl_bt_element_t*) malloc_s(sizeof(jl_bt_element_t) * (JL_MAX_BT_SIZE + 1)); From 5e827b697e50a251d01305ef3ae651bdf3b4d220 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 28 Sep 2023 01:00:52 +0200 Subject: [PATCH 2/4] draft 2 --- src/gc.c | 2 ++ src/julia_internal.h | 9 ++++----- src/julia_threads.h | 20 ++++++++++---------- src/rtutils.c | 11 ++++------- src/safepoint.c | 43 +++++++++++++++++++++++++++++++++++-------- src/signals-unix.c | 8 ++++++-- 6 files changed, 61 insertions(+), 32 deletions(-) diff --git a/src/gc.c b/src/gc.c index 190b9810010e9..ed7e47582a10c 100644 --- a/src/gc.c +++ b/src/gc.c @@ -3521,6 +3521,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) if (!jl_safepoint_start_gc()) { // either another thread is running GC, or the GC got disabled just now. jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state return; } @@ -3574,6 +3575,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) jl_safepoint_end_gc(); jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); JL_PROBE_GC_END(); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state // Only disable finalizers on current thread // Doing this on all threads is racy (it's impossible to check diff --git a/src/julia_internal.h b/src/julia_internal.h index de5169ea85d87..699a5438f8c88 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -918,7 +918,7 @@ void jl_safepoint_end_gc(void); // Wait for the GC to finish // This function does **NOT** modify the `gc_state` to inform the GC thread // The caller should set it **BEFORE** calling this function. -void jl_safepoint_wait_gc(void); +void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT; // Set pending sigint and enable the mechanisms to deliver the sigint. void jl_safepoint_enable_sigint(void); @@ -946,8 +946,7 @@ JL_DLLEXPORT void jl_pgcstack_getkey(jl_get_pgcstack_func **f, jl_pgcstack_key_t extern pthread_mutex_t in_signal_lock; #endif -#if !defined(__clang_gcanalyzer__) && !defined(_OS_DARWIN_) -static inline void jl_set_gc_and_wait(void) +static inline void jl_set_gc_and_wait(void) // n.b. not used on _OS_DARWIN_ { jl_task_t *ct = jl_current_task; // reading own gc state doesn't need atomic ops since no one else @@ -956,8 +955,8 @@ static inline void jl_set_gc_and_wait(void) jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING); jl_safepoint_wait_gc(); jl_atomic_store_release(&ct->ptls->gc_state, state); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state } -#endif // Query if a Julia object is if a permalloc region (due to part of a sys- pkg-image) STATIC_INLINE size_t n_linkage_blobs(void) JL_NOTSAFEPOINT @@ -1397,7 +1396,7 @@ extern jl_mutex_t typecache_lock; extern JL_DLLEXPORT jl_mutex_t jl_codegen_lock; #if defined(__APPLE__) -void jl_mach_gc_end(void); +void jl_mach_gc_end(void) JL_NOTSAFEPOINT; #endif // -- smallintset.c -- // diff --git a/src/julia_threads.h b/src/julia_threads.h index cd2d1205d3e77..b8276682ee359 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -334,17 +334,17 @@ void jl_sigint_safepoint(jl_ptls_t tls); // This triggers a SegFault when we are in GC // Assign it to a variable to make sure the compiler emit the load // and to avoid Clang warning for -Wunused-volatile-lvalue -#define jl_gc_safepoint_(ptls) do { \ - jl_signal_fence(); \ - size_t safepoint_load = *ptls->safepoint; \ - jl_signal_fence(); \ - (void)safepoint_load; \ +#define jl_gc_safepoint_(ptls) do { \ + jl_signal_fence(); \ + size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[0]; \ + jl_signal_fence(); \ + (void)safepoint_load; \ } while (0) -#define jl_sigint_safepoint(ptls) do { \ - jl_signal_fence(); \ - size_t safepoint_load = ptls->safepoint[-1]; \ - jl_signal_fence(); \ - (void)safepoint_load; \ +#define jl_sigint_safepoint(ptls) do { \ + jl_signal_fence(); \ + size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[-1]; \ + jl_signal_fence(); \ + (void)safepoint_load; \ } while (0) #endif STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state, diff --git a/src/rtutils.c b/src/rtutils.c index 35ab89d856783..afe8d24678a61 100644 --- a/src/rtutils.c +++ b/src/rtutils.c @@ -275,15 +275,12 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh) } ct->world_age = eh->world_age; ct->ptls->defer_signal = eh->defer_signal; - if (old_gc_state != eh->gc_state) { + if (old_gc_state != eh->gc_state) jl_atomic_store_release(&ct->ptls->gc_state, eh->gc_state); - if (old_gc_state) { - jl_gc_safepoint_(ct->ptls); - } - } - if (old_defer_signal && !eh->defer_signal) { + if (!eh->gc_state) + jl_gc_safepoint_(ct->ptls); + if (old_defer_signal && !eh->defer_signal) jl_sigint_safepoint(ct->ptls); - } if (jl_atomic_load_relaxed(&jl_gc_have_pending_finalizers) && unlocks && eh->locks_len == 0) { jl_gc_run_pending_finalizers(ct); diff --git a/src/safepoint.c b/src/safepoint.c index c13cd9687e753..c94dc048af09b 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -164,7 +164,7 @@ void jl_safepoint_end_gc(void) uv_cond_broadcast(&safepoint_cond); } -void jl_safepoint_wait_gc(void) +void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT { jl_task_t *ct = jl_current_task; (void)ct; JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct); @@ -187,33 +187,60 @@ void jl_safepoint_wait_gc(void) void jl_safepoint_wait_thread_resume(void) { jl_task_t *ct = jl_current_task; - JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct); + // n.b. we do not permit a fast-path here that skips the lock acquire since + // we otherwise have no synchronization point to ensure that this thread + // will observe the change to the safepoint, even though the other thread + // might have already observed our gc_state. + // if (!jl_atomic_load_relaxed(&ct->ptls->suspend_count)) return; + JL_TIMING_SUSPEND_TASK(USER, ct); int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state); jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING); uv_mutex_lock(&ct->ptls->sleep_lock); - while (jl_atomic_load(&ct->ptls->suspend_count)) + while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock); uv_mutex_unlock(&ct->ptls->sleep_lock); jl_atomic_store_release(&ct->ptls->gc_state, state); } // n.b. suspended threads may still run in the GC or GC safe regions -int jl_safepoint_suspend_thread(int tid) +// waitstate = 0 : do not wait for suspend to finish +// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) +// waitstate = 2 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) +// if another thread comes along and calls jl_safepoint_resume, we also return early +// return new suspend count on success, 0 on failure +int jl_safepoint_suspend_thread(int tid, int waitstate) { if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) return 0; jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; uv_mutex_lock(&ptls2->sleep_lock); int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1; - jl_atomic_store(&ptls2->suspend_count, suspend_count); // seq-cst to ensure this happens-before gc_state read + jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count); if (suspend_count == 1) // first to suspend jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*))); uv_mutex_unlock(&ptls2->sleep_lock); - while (jl_atomic_load_acquire(&ptls2->gc_state)) // wait for suspend - jl_cpu_pause(); // yield? + if (waitstate) { + // wait for suspend (or another thread to call resume) + if (waitstate == 2) { + // We currently cannot distinguish if a thread is helping run GC or + // not, so assume it is running GC and wait for GC to finish first. + // It will be unable to reenter helping with GC because we have + // changed its safepoint page. + jl_set_gc_and_wait(); + } + while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) { + int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state); + if (waitstate == 1 && state2 != 0) + break; + if (waitstate == 2 && state2 == JL_GC_STATE_WAITING) + break; + jl_cpu_pause(); // yield? + } + } return suspend_count; } +// return old suspend count on success, 0 on failure int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT { if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) @@ -229,7 +256,7 @@ int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT uv_cond_signal(&safepoint_cond); } if (suspend_count != 0) - jl_atomic_store(&ptls2->suspend_count, suspend_count - 1); + jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1); uv_mutex_unlock(&ptls2->sleep_lock); return suspend_count; } diff --git a/src/signals-unix.c b/src/signals-unix.c index 2c20d5d295273..07d6d0bb72cc1 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -361,11 +361,15 @@ JL_NO_ASAN static void segv_handler(int sig, siginfo_t *info, void *context) } if (sig == SIGSEGV && info->si_code == SEGV_ACCERR && jl_addr_is_safepoint((uintptr_t)info->si_addr) && !is_write_fault(context)) { jl_set_gc_and_wait(); - while (jl_atomic_load(&ct->ptls->suspend_count)) // seq-cst to ensure this happens-after gc_state write - jl_safepoint_wait_thread_resume(); // Do not raise sigint on worker thread if (jl_atomic_load_relaxed(&ct->tid) != 0) return; + // n.b. if the user might have seen that we were in a state where it + // was safe to run GC concurrently, we might briefly enter a state + // where our execution is not consistent with the gc_state of this + // thread. That will quickly be rectified when we rerun the faulting + // instruction and end up right back here, or we start to run the + // exception handler and immediately hit the safepoint there. if (ct->ptls->defer_signal) { jl_safepoint_defer_sigint(); } From 0ab09ec770bedccf37d43ad734a6c5991a97608e Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 28 Sep 2023 01:00:55 +0200 Subject: [PATCH 3/4] draft 3 --- src/jl_exported_funcs.inc | 2 + src/julia.h | 2 + src/julia_internal.h | 2 + src/safepoint.c | 32 +++++++++---- src/signals-mach.c | 97 ++++++++++++++++++++++++++------------- 5 files changed, 95 insertions(+), 40 deletions(-) diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index 228c88109f322..a47d191dbef76 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -415,6 +415,8 @@ XX(jl_rethrow_other) \ XX(jl_running_on_valgrind) \ XX(jl_safe_printf) \ + XX(jl_safepoint_suspend_thread) \ + XX(jl_safepoint_resume_thread) \ XX(jl_SC_CLK_TCK) \ XX(jl_set_ARGS) \ XX(jl_set_const) \ diff --git a/src/julia.h b/src/julia.h index 07f8459d37238..cd4cde8d0639e 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1053,6 +1053,8 @@ JL_DLLEXPORT void *jl_gc_managed_malloc(size_t sz); JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz, int isaligned, jl_value_t *owner); JL_DLLEXPORT void jl_gc_safepoint(void); +JL_DLLEXPORT int jl_safepoint_suspend_thread(int tid, int waitstate); +JL_DLLEXPORT int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT; void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT; size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT; diff --git a/src/julia_internal.h b/src/julia_internal.h index 699a5438f8c88..2a9e60c5f15c2 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -919,6 +919,7 @@ void jl_safepoint_end_gc(void); // This function does **NOT** modify the `gc_state` to inform the GC thread // The caller should set it **BEFORE** calling this function. void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT; +void jl_safepoint_wait_thread_resume(void) JL_NOTSAFEPOINT; // Set pending sigint and enable the mechanisms to deliver the sigint. void jl_safepoint_enable_sigint(void); @@ -1397,6 +1398,7 @@ extern JL_DLLEXPORT jl_mutex_t jl_codegen_lock; #if defined(__APPLE__) void jl_mach_gc_end(void) JL_NOTSAFEPOINT; +void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) JL_NOTSAFEPOINT; #endif // -- smallintset.c -- // diff --git a/src/safepoint.c b/src/safepoint.c index c94dc048af09b..092d1225a411c 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -105,7 +105,7 @@ void jl_safepoint_init(void) jl_gc_debug_critical_error(); abort(); } - char *pageaddr = jl_safepoint_pages + jl_page_size * 3; + char *pageaddr = addr + jl_page_size * 3; #ifdef _OS_WINDOWS_ DWORD old_prot; VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot); @@ -121,6 +121,7 @@ int jl_safepoint_start_gc(void) { // The thread should have set this already assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) == JL_GC_STATE_WAITING); + jl_safepoint_wait_thread_resume(); // make sure we are permitted to run GC now (we might be required to stop instead) uv_mutex_lock(&safepoint_lock); // In case multiple threads enter the GC at the same time, only allow // one of them to actually run the collection. We can't just let the @@ -156,7 +157,7 @@ void jl_safepoint_end_gc(void) jl_safepoint_disable(2); jl_safepoint_disable(1); jl_atomic_store_release(&jl_gc_running, 0); -# ifdef __APPLE__ +# ifdef _OS_DARWIN_ // This wakes up other threads on mac. jl_mach_gc_end(); # endif @@ -164,6 +165,7 @@ void jl_safepoint_end_gc(void) uv_cond_broadcast(&safepoint_cond); } +// this is the core of jl_set_gc_and_wait void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT { jl_task_t *ct = jl_current_task; (void)ct; @@ -198,14 +200,19 @@ void jl_safepoint_wait_thread_resume(void) uv_mutex_lock(&ct->ptls->sleep_lock); while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock); - uv_mutex_unlock(&ct->ptls->sleep_lock); + // must while still holding the mutex_unlock, so we know other threads in + // jl_safepoint_suspend_thread will observe this thread in the correct GC + // state, and not still stuck in JL_GC_STATE_WAITING jl_atomic_store_release(&ct->ptls->gc_state, state); + uv_mutex_unlock(&ct->ptls->sleep_lock); } // n.b. suspended threads may still run in the GC or GC safe regions +// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here) // waitstate = 0 : do not wait for suspend to finish // waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) -// waitstate = 2 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) +// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread +// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently // if another thread comes along and calls jl_safepoint_resume, we also return early // return new suspend count on success, 0 on failure int jl_safepoint_suspend_thread(int tid, int waitstate) @@ -221,7 +228,7 @@ int jl_safepoint_suspend_thread(int tid, int waitstate) uv_mutex_unlock(&ptls2->sleep_lock); if (waitstate) { // wait for suspend (or another thread to call resume) - if (waitstate == 2) { + if (waitstate >= 2) { // We currently cannot distinguish if a thread is helping run GC or // not, so assume it is running GC and wait for GC to finish first. // It will be unable to reenter helping with GC because we have @@ -230,9 +237,9 @@ int jl_safepoint_suspend_thread(int tid, int waitstate) } while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) { int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state); - if (waitstate == 1 && state2 != 0) + if (waitstate <= 2 && state2 != 0) break; - if (waitstate == 2 && state2 == JL_GC_STATE_WAITING) + if (waitstate == 3 && state2 == JL_GC_STATE_WAITING) break; jl_cpu_pause(); // yield? } @@ -246,6 +253,9 @@ int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) return 0; jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; +# ifdef _OS_DARWIN_ + uv_mutex_lock(&safepoint_lock); +# endif uv_mutex_lock(&ptls2->sleep_lock); int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count); if (suspend_count == 1) { // last to unsuspend @@ -253,11 +263,17 @@ int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size)); else jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*))); - uv_cond_signal(&safepoint_cond); + uv_cond_signal(&ptls2->wake_signal); +#ifdef _OS_DARWIN_ + jl_safepoint_resume_thread_mach(ptls2, tid); +#endif } if (suspend_count != 0) jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1); uv_mutex_unlock(&ptls2->sleep_lock); +# ifdef _OS_DARWIN_ + uv_mutex_unlock(&safepoint_lock); +# endif return suspend_count; } diff --git a/src/signals-mach.c b/src/signals-mach.c index 6ec8f95570f17..ebc54d35a5b46 100644 --- a/src/signals-mach.c +++ b/src/signals-mach.c @@ -45,50 +45,84 @@ static void attach_exception_port(thread_port_t thread, int segv_only); // low 16 bits are the thread id, the next 8 bits are the original gc_state static arraylist_t suspended_threads; extern uv_mutex_t safepoint_lock; -extern uv_cond_t safepoint_cond; -void jl_mach_gc_end(void) + +// see jl_safepoint_wait_thread_resume +void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) { - // Requires the safepoint lock to be held + // must be called with uv_mutex_lock(&safepoint_lock) and uv_mutex_lock(&ptls2->sleep_lock) held (in that order) for (size_t i = 0; i < suspended_threads.len; i++) { uintptr_t item = (uintptr_t)suspended_threads.items[i]; int16_t tid = (int16_t)item; int8_t gc_state = (int8_t)(item >> 8); - jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + if (tid != tid2) + continue; jl_atomic_store_release(&ptls2->gc_state, gc_state); thread_resume(pthread_mach_thread_np(ptls2->system_id)); + suspended_threads.items[i] = suspended_threads.items[--suspended_threads.len]; + break; + } + // thread hadn't actually reached a jl_mach_gc_wait call where we suspended it +} + +void jl_mach_gc_end(void) +{ + // must be called with uv_mutex_lock(&safepoint_lock) held + size_t j = 0; + for (size_t i = 0; i < suspended_threads.len; i++) { + uintptr_t item = (uintptr_t)suspended_threads.items[i]; + int16_t tid = (int16_t)item; + int8_t gc_state = (int8_t)(item >> 8); + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + uv_mutex_lock(&ptls2->sleep_lock); + if (jl_atomic_load_relaxed(&ptls2->suspend_count) == 0) { + jl_atomic_store_release(&ptls2->gc_state, gc_state); + thread_resume(pthread_mach_thread_np(ptls2->system_id)); + } + else { + // this is the check for jl_safepoint_wait_thread_resume + suspended_threads.items[j++] = (void*)item; + } + uv_mutex_unlock(&ptls2->sleep_lock); } - suspended_threads.len = 0; + suspended_threads.len = j; } -// Suspend the thread and return `1` if the GC is running. -// Otherwise return `0` -static int jl_mach_gc_wait(jl_ptls_t ptls2, - mach_port_t thread, int16_t tid) +// implement jl_set_gc_and_wait from a different thread +static void jl_mach_gc_wait(jl_ptls_t ptls2, mach_port_t thread, int16_t tid) { + // relaxed, since we don't mind missing one--we will hit another soon (immediately probably) uv_mutex_lock(&safepoint_lock); - if (!jl_atomic_load_relaxed(&jl_gc_running)) { - // relaxed, since gets set to zero only while the safepoint_lock was held - // this means we can tell if GC is done before we got the message or - // the safepoint was enabled for SIGINT. - uv_mutex_unlock(&safepoint_lock); - return 0; + // Since this gets set to zero only while the safepoint_lock was held this + // means we can tell for sure if GC is done before we got the message or + // the safepoint was enabled for SIGINT instead. + int doing_gc = jl_atomic_load_relaxed(&jl_gc_running); + int do_suspend = doing_gc; + int relaxed_suspend_count = !doing_gc && jl_atomic_load_relaxed(&ptls2->suspend_count) != 0; + if (relaxed_suspend_count) { + uv_mutex_lock(&ptls2->sleep_lock); + do_suspend = jl_atomic_load_relaxed(&ptls2->suspend_count) != 0; + // only do_suspend while holding the sleep_lock, otherwise we might miss a resume + } + if (do_suspend) { + // Set the gc state of the thread, suspend and record it + // + // TODO: TSAN will complain that it never saw the faulting task do an + // atomic release (it was in the kernel). And our attempt here does + // nothing, since we are a different thread, and it is not transitive). + // + // This also means we are not making this thread available for GC work. + // Eventually, we should probably release this signal to the original + // thread, (return KERN_FAILURE instead of KERN_SUCCESS) so that it + // triggers a SIGSEGV and gets handled by the usual codepath for unix. + int8_t gc_state = ptls2->gc_state; + jl_atomic_store_release(&ptls2->gc_state, JL_GC_STATE_WAITING); + uintptr_t item = tid | (((uintptr_t)gc_state) << 16); + arraylist_push(&suspended_threads, (void*)item); + thread_suspend(thread); } - // Otherwise, set the gc state of the thread, suspend and record it - // TODO: TSAN will complain that it never saw the faulting task do an - // atomic release (it was in the kernel). And our attempt here does - // nothing, since we are a different thread, and it is not transitive). - // - // This also means we are not making this thread available for GC work. - // Eventually, we should probably release this signal to the original - // thread, (return KERN_FAILURE instead of KERN_SUCCESS) so that it - // triggers a SIGSEGV and gets handled by the usual codepath for unix. - int8_t gc_state = ptls2->gc_state; - jl_atomic_store_release(&ptls2->gc_state, JL_GC_STATE_WAITING); - uintptr_t item = tid | (((uintptr_t)gc_state) << 16); - arraylist_push(&suspended_threads, (void*)item); - thread_suspend(thread); + if (relaxed_suspend_count) + uv_mutex_unlock(&ptls2->sleep_lock); uv_mutex_unlock(&safepoint_lock); - return 1; } static mach_port_t segv_port = 0; @@ -314,8 +348,7 @@ kern_return_t catch_mach_exception_raise( kern_return_t ret = thread_get_state(thread, HOST_EXCEPTION_STATE, (thread_state_t)&exc_state, &exc_count); HANDLE_MACH_ERROR("thread_get_state", ret); if (jl_addr_is_safepoint(fault_addr) && !is_write_fault(exc_state)) { - if (jl_mach_gc_wait(ptls2, thread, tid)) - return KERN_SUCCESS; + jl_mach_gc_wait(ptls2, thread, tid); if (ptls2->tid != 0) return KERN_SUCCESS; if (ptls2->defer_signal) { From 9e65521f67d22aca14993087e242c486498bfdb9 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 28 Sep 2023 02:26:09 +0200 Subject: [PATCH 4/4] draft 4 --- src/safepoint.c | 45 +++++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/safepoint.c b/src/safepoint.c index 092d1225a411c..5a845496f36c6 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -30,8 +30,8 @@ char *jl_safepoint_pages = NULL; // so that both safepoint load and pending signal load falls in this page. // The initialization of the `safepoint` pointer is done `ti_initthread` // in `threading.c`. -// The fourth page is always disabled, so not tracked here. -uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0}; +// The fourth page is the count of suspended threads +uint16_t jl_safepoint_enable_cnt[4] = {0, 0, 0, 0}; // This lock should be acquired before enabling/disabling the safepoint // or accessing one of the following variables: @@ -49,12 +49,12 @@ uv_cond_t safepoint_cond; static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT { // safepoint_lock should be held - assert(0 <= idx && idx < 3); + assert(0 <= idx && idx <= 3); if (jl_safepoint_enable_cnt[idx]++ != 0) { // We expect this to be enabled at most twice // one for the GC, one for SIGINT. // Update this if this is not the case anymore in the future. - assert(jl_safepoint_enable_cnt[idx] <= 2); + assert(jl_safepoint_enable_cnt[idx] <= (idx == 3 ? INT16_MAX : 2)); return; } // Now that we are requested to mprotect the page and it wasn't already. @@ -63,14 +63,15 @@ static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT DWORD old_prot; VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot); #else - mprotect(pageaddr, jl_page_size, PROT_NONE); + int r = mprotect(pageaddr, jl_page_size, PROT_NONE); + (void)r; //if (r) perror("mprotect"); #endif } static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT { // safepoint_lock should be held - assert(0 <= idx && idx < 3); + assert(0 <= idx && idx <= 3); if (--jl_safepoint_enable_cnt[idx] != 0) { assert(jl_safepoint_enable_cnt[idx] > 0); return; @@ -82,7 +83,8 @@ static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT DWORD old_prot; VirtualProtect(pageaddr, jl_page_size, PAGE_READONLY, &old_prot); #else - mprotect(pageaddr, jl_page_size, PROT_READ); + int r = mprotect(pageaddr, jl_page_size, PROT_READ); + (void)r; //if (r) perror("mprotect"); #endif } @@ -105,13 +107,18 @@ void jl_safepoint_init(void) jl_gc_debug_critical_error(); abort(); } - char *pageaddr = addr + jl_page_size * 3; -#ifdef _OS_WINDOWS_ - DWORD old_prot; - VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot); -#else - mprotect(pageaddr, jl_page_size, PROT_NONE); -#endif +// // If we able to skip past the faulting safepoint instruction conditionally, +// // then we can make this safepoint page unconditional. But otherwise we +// // only enable this page when required, though it gives us less +// // fine-grained control over individual resume. +// char *pageaddr = addr + pgsz * 3; +//#ifdef _OS_WINDOWS_ +// DWORD old_prot; +// VirtualProtect(pageaddr, pgsz, PAGE_NOACCESS, &old_prot); +//#else +// int r = mprotect(pageaddr, pgsz, PROT_NONE); +// (void)r; //if (r) perror("mprotect"); +//#endif // The signal page is for the gc safepoint. // The page before it is the sigint pending flag. jl_safepoint_pages = addr; @@ -223,8 +230,10 @@ int jl_safepoint_suspend_thread(int tid, int waitstate) uv_mutex_lock(&ptls2->sleep_lock); int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1; jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count); - if (suspend_count == 1) // first to suspend + if (suspend_count == 1) { // first to suspend + jl_safepoint_enable(3); jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*))); + } uv_mutex_unlock(&ptls2->sleep_lock); if (waitstate) { // wait for suspend (or another thread to call resume) @@ -248,6 +257,7 @@ int jl_safepoint_suspend_thread(int tid, int waitstate) } // return old suspend count on success, 0 on failure +// n.b. threads often do not resume until after all suspended threads have been resumed! int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT { if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) @@ -268,8 +278,11 @@ int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT jl_safepoint_resume_thread_mach(ptls2, tid); #endif } - if (suspend_count != 0) + if (suspend_count != 0) { jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1); + if (suspend_count == 1) + jl_safepoint_disable(3); + } uv_mutex_unlock(&ptls2->sleep_lock); # ifdef _OS_DARWIN_ uv_mutex_unlock(&safepoint_lock);