From bc1512d49e02acb302566ae3b9d93acf70b894a2 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Fri, 16 Apr 2021 15:07:47 -0700
Subject: [PATCH] Wait for all Sessions during pantsd shutdown (#11929)

As described in #11618, when `pantsd` intentionally exits due to low memory,
a few types of work can be cut short:
1. If the run ends in Ctrl+C, processes that were cancelled may not have had
   time to be dropped before `pantsd` exits.
2. Async StreamingWorkunitHandler threads might still be running.

This change adds orderly-shutdown mechanisms to the `Scheduler`/`Core` to join
all ongoing `Sessions` (including the SWH), and improves tests to ensure that
the SWH is waited for.

Additionally, the last commit adds purging of the `pantsd` metadata as soon as
we decide to restart, which should reduce (but probably not eliminate) the
incidence of item 1 from #11618. Work for #11831 will likely further harden
this path.

[ci skip-build-wheels]
---
 src/python/pants/engine/BUILD                 |  1 +
 .../pants/engine/internals/native_engine.pyi  |  4 +-
 .../pants/engine/internals/scheduler.py       | 11 ++-
 .../engine/streaming_workunit_handler.py      |  2 +-
 ...aming_workunit_handler_integration_test.py | 23 +++--
 src/python/pants/pantsd/pants_daemon.py       |  7 +-
 src/python/pants/pantsd/pants_daemon_core.py  |  9 ++
 .../pants/pantsd/service/scheduler_service.py |  1 +
 .../pants/pantsd/service/store_gc_service.py  |  1 +
 src/rust/engine/src/context.rs                | 13 +++
 src/rust/engine/src/externs/interface.rs      | 41 ++++++--
 src/rust/engine/src/session.rs                | 99 ++++++++++++++-----
 .../src/python/workunit_logger/register.py    |  6 +-
 13 files changed, 175 insertions(+), 43 deletions(-)

diff --git a/src/python/pants/engine/BUILD b/src/python/pants/engine/BUILD
index cdebd42b57b..9459529d138 100644
--- a/src/python/pants/engine/BUILD
+++ b/src/python/pants/engine/BUILD
@@ -17,4 +17,5 @@ python_integration_tests(
         # Loaded reflectively as a backend in `streaming_workunit_handler_integration_test.py`.
         'testprojects/pants-plugins/src/python/workunit_logger',
     ],
+    timeout=120,
 )

diff --git a/src/python/pants/engine/internals/native_engine.pyi b/src/python/pants/engine/internals/native_engine.pyi
index b6fa6676183..388c7abcf39 100644
--- a/src/python/pants/engine/internals/native_engine.pyi
+++ b/src/python/pants/engine/internals/native_engine.pyi
@@ -112,6 +112,7 @@ def scheduler_execute(
     scheduler: PyScheduler, session: PySession, execution_request: PyExecutionRequest
 ) -> tuple: ...
 def scheduler_metrics(scheduler: PyScheduler, session: PySession) -> dict[str, int]: ...
+def scheduler_shutdown(scheduler: PyScheduler, timeout_secs: int) -> None: ...
 def session_new_run_id(session: PySession) -> None: ...
 def session_poll_workunits(
     scheduler: PyScheduler, session: PySession, max_log_verbosity_level: int
@@ -120,7 +121,7 @@ def session_get_observation_histograms(scheduler: PyScheduler, session: PySessio
 def session_record_test_observation(
     scheduler: PyScheduler, session: PySession, value: int
 ) -> None: ...
-def session_isolated_shallow_clone(session: PySession) -> PySession: ...
+def session_isolated_shallow_clone(session: PySession, build_id: str) -> PySession: ...
 def all_counter_names() -> list[str]: ...
 def graph_len(scheduler: PyScheduler) -> int: ...
 def graph_visualize(scheduler: PyScheduler, session: PySession, path: str) -> None: ...
@@ -199,6 +200,7 @@ class PySession:
         session_values: SessionValues,
         cancellation_latch: PySessionCancellationLatch,
     ) -> None: ...
+    def cancel(self) -> None: ...
 
 class PySessionCancellationLatch:
     def __init__(self) -> None: ...
diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py
index 2e79dfa107f..bf61c567dec 100644
--- a/src/python/pants/engine/internals/scheduler.py
+++ b/src/python/pants/engine/internals/scheduler.py
@@ -311,6 +311,9 @@ def new_session(
             ),
         )
 
+    def shutdown(self, timeout_secs: int = 60) -> None:
+        native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)
+
 
 class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
     pass
@@ -339,9 +342,10 @@ def py_scheduler(self) -> PyScheduler:
     def py_session(self) -> PySession:
         return self._py_session
 
-    def isolated_shallow_clone(self) -> SchedulerSession:
+    def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
         return SchedulerSession(
-            self._scheduler, native_engine.session_isolated_shallow_clone(self._py_session)
+            self._scheduler,
+            native_engine.session_isolated_shallow_clone(self._py_session, build_id),
         )
 
     def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
@@ -614,6 +618,9 @@ def get_observation_histograms(self) -> dict:
     def record_test_observation(self, value: int) -> None:
         native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)
 
+    def cancel(self) -> None:
+        self.py_session.cancel()
+
 
 def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
     """Create a native Tasks object loaded with given RuleIndex."""
diff --git a/src/python/pants/engine/streaming_workunit_handler.py b/src/python/pants/engine/streaming_workunit_handler.py
index 72ba2f12e2d..be98fdf31f7 100644
--- a/src/python/pants/engine/streaming_workunit_handler.py
+++ b/src/python/pants/engine/streaming_workunit_handler.py
@@ -185,7 +185,7 @@ def __init__(
         pantsd: bool,
         max_workunit_verbosity: LogLevel = LogLevel.TRACE,
     ) -> None:
-        scheduler = scheduler.isolated_shallow_clone()
+        scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session")
         self.callbacks = callbacks
         self.context = StreamingWorkunitContext(
             _scheduler=scheduler,
diff --git a/src/python/pants/engine/streaming_workunit_handler_integration_test.py b/src/python/pants/engine/streaming_workunit_handler_integration_test.py
index 85dac59f371..8507e0b7b79 100644
--- a/src/python/pants/engine/streaming_workunit_handler_integration_test.py
+++ b/src/python/pants/engine/streaming_workunit_handler_integration_test.py
@@ -32,14 +32,19 @@ def run(args: List[str], success: bool = True) -> Tuple[PantsResult, str | None]
     with setup_tmpdir({}) as tmpdir:
         dest = os.path.join(tmpdir, "dest.log")
         pants_run = run_pants(args, config=workunit_logger_config(dest))
-        log_content = maybe_read_file(dest)
         if success:
             pants_run.assert_success()
-            assert log_content
-            assert FINISHED_SUCCESSFULLY in log_content
+            confirm_eventual_success(dest)
         else:
             pants_run.assert_failure()
-    return pants_run, log_content
+    return pants_run, maybe_read_file(dest)
+
+
+def confirm_eventual_success(log_dest: str) -> None:
+    for _ in attempts("The log should eventually show that the SWH shut down."):
+        content = maybe_read_file(log_dest)
+        if content and FINISHED_SUCCESSFULLY in content:
+            break
 
 
 def test_list() -> None:
@@ -61,7 +66,9 @@ def test_ctrl_c() -> None:
     os.kill(client_pid, signal.SIGINT)
 
     # Confirm that finish is still called (even though it may be backgrounded in the server).
-    for _ in attempts("The log should eventually show that the SWH shut down."):
-        content = maybe_read_file(dest)
-        if content and FINISHED_SUCCESSFULLY in content:
-            break
+    confirm_eventual_success(dest)
+
+
+def test_restart() -> None:
+    # Will trigger a restart.
+    run(["--pantsd-max-memory-usage=1", "roots"])
diff --git a/src/python/pants/pantsd/pants_daemon.py b/src/python/pants/pantsd/pants_daemon.py
index 03a5a1a9bb2..4e052ba0eb1 100644
--- a/src/python/pants/pantsd/pants_daemon.py
+++ b/src/python/pants/pantsd/pants_daemon.py
@@ -201,9 +201,14 @@ def run_sync(self):
         while self._core.is_valid():
             time.sleep(self.JOIN_TIMEOUT_SECONDS)
 
-        # We're exiting: join the server to avoid interrupting ongoing runs.
+        # We're exiting: purge our metadata to prevent new connections, then join the server
+        # to avoid interrupting ongoing runs.
+        self.purge_metadata(force=True)
         self._logger.info("Waiting for ongoing runs to complete before exiting...")
         native_engine.nailgun_server_await_shutdown(self._server)
+        # Then shut down the PantsDaemonCore, which will shut down any live Scheduler.
+        self._logger.info("Waiting for Sessions to complete before exiting...")
+        self._core.shutdown()
 
         self._logger.info("Exiting pantsd")
 
diff --git a/src/python/pants/pantsd/pants_daemon_core.py b/src/python/pants/pantsd/pants_daemon_core.py
index bc59f99c9fe..2da4c71ecfe 100644
--- a/src/python/pants/pantsd/pants_daemon_core.py
+++ b/src/python/pants/pantsd/pants_daemon_core.py
@@ -129,3 +129,12 @@ def prepare(
             self._initialize(options_fingerprint, options_bootstrapper, env)
         assert self._scheduler is not None
         return self._scheduler, self._options_initializer
+
+    def shutdown(self) -> None:
+        with self._lifecycle_lock:
+            if self._services is not None:
+                self._services.shutdown()
+                self._services = None
+            if self._scheduler is not None:
+                self._scheduler.scheduler.shutdown()
+                self._scheduler = None
diff --git a/src/python/pants/pantsd/service/scheduler_service.py b/src/python/pants/pantsd/service/scheduler_service.py
index a0e2dde3db4..d37d7fb525b 100644
--- a/src/python/pants/pantsd/service/scheduler_service.py
+++ b/src/python/pants/pantsd/service/scheduler_service.py
@@ -155,3 +155,4 @@ def run(self):
                 # Watcher failed for some reason
                 self._logger.critical(f"The scheduler was invalidated: {e!r}")
                 self.terminate()
+        self._scheduler_session.cancel()
diff --git a/src/python/pants/pantsd/service/store_gc_service.py b/src/python/pants/pantsd/service/store_gc_service.py
index cf63a92e865..697b99c754a 100644
--- a/src/python/pants/pantsd/service/store_gc_service.py
+++ b/src/python/pants/pantsd/service/store_gc_service.py
@@ -84,3 +84,4 @@ def run(self):
             except Exception as e:
                 self._logger.critical(f"GC failed: {e!r}")
                 self.terminate()
+        self._scheduler_session.cancel()
diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs
index 5abe173cf8e..98a0e834e64 100644
--- a/src/rust/engine/src/context.rs
+++ b/src/rust/engine/src/context.rs
@@ -412,6 +412,19 @@ impl Core {
   pub fn store(&self) -> Store {
     self.store.clone()
   }
+
+  ///
+  /// Shuts down this Core.
+  ///
+  pub async fn shutdown(&self, timeout: Duration) {
+    // Shut down the Sessions, which will prevent new work from starting and then await any ongoing
+    // work.
+    if let Err(msg) = self.sessions.shutdown(timeout).await {
+      log::warn!("During shutdown: {}", msg);
+    }
+    // Then clear the Graph to ensure that drop handlers run (particularly for running processes).
+    self.graph.clear();
+  }
 }
 
 pub struct InvalidatableGraph(Graph);
diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs
index c9136af1037..ae6ba7c9514 100644
--- a/src/rust/engine/src/externs/interface.rs
+++ b/src/rust/engine/src/externs/interface.rs
@@ -324,7 +324,7 @@ py_module_initializer!(native_engine, |py, m| {
   m.add(
     py,
     "session_isolated_shallow_clone",
-    py_fn!(py, session_isolated_shallow_clone(a: PySession)),
+    py_fn!(py, session_isolated_shallow_clone(a: PySession, b: String)),
   )?;
 
   m.add(py, "all_counter_names", py_fn!(py, all_counter_names()))?;
@@ -397,6 +397,11 @@ py_module_initializer!(native_engine, |py, m| {
       )
     ),
   )?;
+  m.add(
+    py,
+    "scheduler_shutdown",
+    py_fn!(py, scheduler_shutdown(a: PyScheduler, b: u64)),
+  )?;
 
   m.add(
     py,
@@ -627,14 +632,19 @@ py_class!(class PySession |py| {
     session_values: PyObject,
     cancellation_latch: PySessionCancellationLatch,
   ) -> CPyResult<Self> {
-    Self::create_instance(py, Session::new(
+    let session = Session::new(
       scheduler.scheduler(py),
       should_render_ui,
       build_id,
       session_values.into(),
       cancellation_latch.cancelled(py).clone(),
-    )
-    )
+    ).map_err(|err_str| PyErr::new::<exc::Exception, _>(py, (err_str,)))?;
+    Self::create_instance(py, session)
+  }
+
+  def cancel(&self) -> PyUnitResult {
+    self.session(py).cancel();
+    Ok(None)
   }
 });
 
@@ -1188,6 +1198,18 @@ fn scheduler_metrics(
   })
 }
 
+fn scheduler_shutdown(py: Python, scheduler_ptr: PyScheduler, timeout_secs: u64) -> PyUnitResult {
+  with_scheduler(py, scheduler_ptr, |scheduler| {
+    py.allow_threads(|| {
+      scheduler
+        .core
+        .executor
+        .block_on(scheduler.core.shutdown(Duration::from_secs(timeout_secs)));
+    })
+  });
+  Ok(None)
+}
+
 fn all_counter_names(_: Python) -> CPyResult<Vec<String>> {
   Ok(Metric::all_metrics())
 }
@@ -1427,9 +1449,16 @@ fn session_record_test_observation(
   })
 }
 
-fn session_isolated_shallow_clone(py: Python, session_ptr: PySession) -> CPyResult<PySession> {
+fn session_isolated_shallow_clone(
+  py: Python,
+  session_ptr: PySession,
+  build_id: String,
+) -> CPyResult<PySession> {
   with_session(py, session_ptr, |session| {
-    PySession::create_instance(py, session.isolated_shallow_clone())
+    let session_clone = session
+      .isolated_shallow_clone(build_id)
+      .map_err(|e| PyErr::new::<exc::Exception, _>(py, (e,)))?;
+    PySession::create_instance(py, session_clone)
   })
 }
 
diff --git a/src/rust/engine/src/session.rs b/src/rust/engine/src/session.rs
index 92e74c9a0ca..e815bae347f 100644
--- a/src/rust/engine/src/session.rs
+++ b/src/rust/engine/src/session.rs
@@ -12,7 +12,7 @@ use crate::nodes::{NodeKey, Select};
 use crate::scheduler::Scheduler;
 
 use async_latch::AsyncLatch;
-use futures::future::{AbortHandle, Abortable};
+use futures::future::{self, AbortHandle, Abortable};
 use futures::FutureExt;
 use graph::LastObserved;
 use log::warn;
@@ -79,8 +79,6 @@ struct SessionState {
   roots: Mutex<HashMap<Root, Option<LastObserved>>>,
   // A place to store info about workunits in rust part
   workunit_store: WorkunitStore,
-  // The unique id for this Session: used for metrics gathering purposes.
-  build_id: String,
   // Per-Session values that have been set for this session.
   session_values: Mutex<Value>,
   // An id used to control the visibility of uncacheable rules. Generally this is identical for an
@@ -98,6 +96,8 @@ struct SessionState {
 /// A cancellable handle to a Session, with an optional associated UI.
 ///
 struct SessionHandle {
+  // The unique id for this Session: used for metrics gathering purposes.
+  build_id: String,
   // Whether or not this Session has been cancelled. If a Session has been cancelled, all work that
   // it started should attempt to exit in an orderly fashion.
   cancelled: AsyncLatch,
@@ -114,6 +114,12 @@ impl SessionHandle {
   }
 }
 
+impl Drop for SessionHandle {
+  fn drop(&mut self) {
+    self.cancelled.trigger();
+  }
+}
+
 ///
 /// A Session represents a related series of requests (generally: one run of the pants CLI) on an
 /// underlying Scheduler, and is a useful scope for metrics.
@@ -138,7 +144,7 @@ impl Session {
     build_id: String,
     session_values: Value,
     cancelled: AsyncLatch,
-  ) -> Session {
+  ) -> Result<Session, String> {
     let workunit_store = WorkunitStore::new(!should_render_ui);
     let display = Mutex::new(SessionDisplay::new(
       &workunit_store,
@@ -146,21 +152,24 @@ impl Session {
       should_render_ui,
     ));
 
-    let handle = Arc::new(SessionHandle { cancelled, display });
-    scheduler.core.sessions.add(&handle);
-    Session {
+    let handle = Arc::new(SessionHandle {
+      cancelled,
+      build_id,
+      display,
+    });
+    scheduler.core.sessions.add(&handle)?;
+    Ok(Session {
       handle,
       state: Arc::new(SessionState {
        core: scheduler.core.clone(),
        preceding_graph_size: scheduler.core.graph.len(),
        roots: Mutex::new(HashMap::new()),
        workunit_store,
-       build_id,
        session_values: Mutex::new(session_values),
        run_id: Mutex::new(Uuid::new_v4()),
        workunit_metadata_map: RwLock::new(HashMap::new()),
      }),
-    }
+    })
   }
 
   ///
@@ -170,21 +179,22 @@ impl Session {
   /// Useful when executing background work "on behalf of a Session" which should not be torn down
   /// when a client disconnects.
   ///
-  pub fn isolated_shallow_clone(&self) -> Session {
+  pub fn isolated_shallow_clone(&self, build_id: String) -> Result<Session, String> {
     let display = Mutex::new(SessionDisplay::new(
       &self.state.workunit_store,
       self.state.core.local_parallelism,
      false,
    ));
    let handle = Arc::new(SessionHandle {
+      build_id,
      cancelled: AsyncLatch::new(),
      display,
    });
-    self.state.core.sessions.add(&handle);
-    Session {
+    self.state.core.sessions.add(&handle)?;
+    Ok(Session {
      handle,
      state: self.state.clone(),
-    }
+    })
   }
 
   pub fn core(&self) -> &Arc<Core> {
@@ -246,7 +256,7 @@ impl Session {
   }
 
   pub fn build_id(&self) -> &String {
-    &self.state.build_id
+    &self.handle.build_id
   }
 
   pub fn run_id(&self) -> Uuid {
@@ -329,14 +339,18 @@ impl Session {
 pub struct Sessions {
   /// Live sessions. Completed Sessions (i.e., those for which the Weak reference is dead) are
   /// removed from this collection on a best effort when new Sessions are created.
-  sessions: Arc<Mutex<Vec<Weak<SessionHandle>>>>,
+  ///
+  /// If the wrapping Option is None, it is because `fn shutdown` is running, and the associated
+  /// Core/Scheduler are being shut down.
+  sessions: Arc<Mutex<Option<Vec<Weak<SessionHandle>>>>>,
   /// Handle to kill the signal monitoring task when this object is killed.
   signal_task_abort_handle: AbortHandle,
 }
 
 impl Sessions {
   pub fn new(executor: &Executor) -> Result<Sessions, String> {
-    let sessions: Arc<Mutex<Vec<Weak<SessionHandle>>>> = Arc::default();
+    let sessions: Arc<Mutex<Option<Vec<Weak<SessionHandle>>>>> =
+      Arc::new(Mutex::new(Some(Vec::new())));
     let signal_task_abort_handle = {
       let mut signal_stream = signal(SignalKind::interrupt())
         .map_err(|err| format!("Failed to install interrupt handler: {}", err))?;
@@ -346,9 +360,12 @@ impl Sessions {
         async move {
           loop {
             let _ = signal_stream.recv().await;
-            for session in &*sessions.lock() {
-              if let Some(session) = session.upgrade() {
-                session.cancel();
+            let sessions = sessions.lock();
+            if let Some(ref sessions) = *sessions {
+              for session in sessions {
+                if let Some(session) = session.upgrade() {
+                  session.cancel();
+                }
              }
            }
          }
@@ -363,10 +380,48 @@ impl Sessions {
     })
   }
 
-  fn add(&self, session: &Arc<SessionHandle>) {
+  fn add(&self, handle: &Arc<SessionHandle>) -> Result<(), String> {
     let mut sessions = self.sessions.lock();
-    sessions.retain(|weak_session| weak_session.upgrade().is_some());
-    sessions.push(Arc::downgrade(session));
+    if let Some(ref mut sessions) = *sessions {
+      sessions.retain(|weak_handle| weak_handle.upgrade().is_some());
+      sessions.push(Arc::downgrade(handle));
+      Ok(())
+    } else {
+      Err("The scheduler is shutting down: no new sessions may be created.".to_string())
+    }
+  }
+
+  ///
+  /// Shuts down this Sessions instance by waiting for all existing Sessions to exit.
+  ///
+  /// Waits at most `timeout` for Sessions to complete.
+  ///
+  pub async fn shutdown(&self, timeout: Duration) -> Result<(), String> {
+    if let Some(sessions) = self.sessions.lock().take() {
+      // Collect clones of the cancellation tokens for each Session, which allows us to watch for
+      // them to have been dropped.
+      let (build_ids, cancellation_latches): (Vec<_>, Vec<_>) = sessions
+        .into_iter()
+        .filter_map(|weak_handle| weak_handle.upgrade())
+        .map(|handle| {
+          let build_id = handle.build_id.clone();
+          let cancelled = handle.cancelled.clone();
+          let cancellation_triggered = async move {
+            cancelled.triggered().await;
+            log::info!("Shutdown completed: {:?}", build_id)
+          };
+          (handle.build_id.clone(), cancellation_triggered)
+        })
+        .unzip();
+
+      if !build_ids.is_empty() {
+        log::info!("Waiting for shutdown of: {:?}", build_ids);
+        tokio::time::timeout(timeout, future::join_all(cancellation_latches))
+          .await
+          .map_err(|_| format!("Some Sessions did not shut down within {:?}.", timeout))?;
+      }
+    }
+    Ok(())
+  }
 }
diff --git a/testprojects/pants-plugins/src/python/workunit_logger/register.py b/testprojects/pants-plugins/src/python/workunit_logger/register.py
index 245b221c321..cd88f01c160 100644
--- a/testprojects/pants-plugins/src/python/workunit_logger/register.py
+++ b/testprojects/pants-plugins/src/python/workunit_logger/register.py
@@ -2,6 +2,7 @@
 # Licensed under the Apache License, Version 2.0 (see LICENSE).
 
 import logging
+import time
 from dataclasses import dataclass
 from typing import Tuple
 
@@ -41,8 +42,7 @@ class WorkunitsLogger(WorkunitsCallback):
 
     @property
     def can_finish_async(self) -> bool:
-        # We'd like to synchronously fail the run on the final call if need be.
-        return False
+        return True
 
     def __call__(
         self,
@@ -55,6 +55,8 @@ def __call__(
         with open(self.dest, "a") as dest:
             print(str(completed_workunits), file=dest)
             if finished and context.run_tracker.has_ended():
+                # Sleep a little while to ensure that we're finishing asynchronously.
+                time.sleep(2)
                 print(FINISHED_SUCCESSFULLY, file=dest)
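
Appendix (not part of the patch): a minimal, self-contained sketch of the join-with-timeout
pattern that `Sessions::shutdown` uses above — one future per live Session that resolves when its
cancellation latch triggers, all awaited under a deadline. It is written against plain
`tokio`/`futures` primitives, with `tokio::sync::Notify` standing in for the engine's
`AsyncLatch`; the function and session names here are illustrative assumptions, not code from
this change.

// Illustrative sketch only: the shutdown-wait pattern, with tokio::sync::Notify
// standing in for AsyncLatch and hypothetical session names.
use std::sync::Arc;
use std::time::Duration;

use futures::future;
use tokio::sync::Notify;

// Wait for every latch to fire, but give up after `timeout` so shutdown cannot hang.
async fn wait_for_sessions(
  sessions: Vec<(String, Arc<Notify>)>,
  timeout: Duration,
) -> Result<(), String> {
  let waits = sessions.into_iter().map(|(build_id, latch)| async move {
    latch.notified().await;
    println!("Shutdown completed: {:?}", build_id);
  });
  tokio::time::timeout(timeout, future::join_all(waits))
    .await
    .map(|_| ())
    .map_err(|_| format!("Some sessions did not shut down within {:?}.", timeout))
}

#[tokio::main]
async fn main() {
  let latch = Arc::new(Notify::new());
  let sessions = vec![("main".to_string(), latch.clone())];
  // Simulate a session completing (and triggering its latch) shortly after shutdown begins.
  tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(100)).await;
    latch.notify_one();
  });
  wait_for_sessions(sessions, Duration::from_secs(5))
    .await
    .unwrap();
}

In the patch itself, `Core::shutdown` drives this wait from the Python side via the new
`scheduler_shutdown` binding, and the timeout keeps a wedged Session from blocking `pantsd` exit
indefinitely.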