Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for all Sessions during pantsd shutdown (cherrypick of #11929) #11934

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/python/pants/engine/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ python_integration_tests(
# Loaded reflectively as a backend in `streaming_workunit_handler_integration_test.py`.
'testprojects/pants-plugins/src/python/workunit_logger',
],
timeout=120,
)
4 changes: 3 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def scheduler_execute(
scheduler: PyScheduler, session: PySession, execution_request: PyExecutionRequest
) -> tuple: ...
def scheduler_metrics(scheduler: PyScheduler, session: PySession) -> dict[str, int]: ...
def scheduler_shutdown(scheduler: PyScheduler, timeout_secs: int) -> None: ...
def session_new_run_id(session: PySession) -> None: ...
def session_poll_workunits(
scheduler: PyScheduler, session: PySession, max_log_verbosity_level: int
Expand All @@ -120,7 +121,7 @@ def session_get_observation_histograms(scheduler: PyScheduler, session: PySessio
def session_record_test_observation(
scheduler: PyScheduler, session: PySession, value: int
) -> None: ...
def session_isolated_shallow_clone(session: PySession) -> PySession: ...
def session_isolated_shallow_clone(session: PySession, build_id: str) -> PySession: ...
def all_counter_names() -> list[str]: ...
def graph_len(scheduler: PyScheduler) -> int: ...
def graph_visualize(scheduler: PyScheduler, session: PySession, path: str) -> None: ...
Expand Down Expand Up @@ -199,6 +200,7 @@ class PySession:
session_values: SessionValues,
cancellation_latch: PySessionCancellationLatch,
) -> None: ...
def cancel(self) -> None: ...

class PySessionCancellationLatch:
def __init__(self) -> None: ...
Expand Down
11 changes: 9 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ def new_session(
),
)

def shutdown(self, timeout_secs: int = 60) -> None:
native_engine.scheduler_shutdown(self.py_scheduler, timeout_secs)


class _PathGlobsAndRootCollection(Collection[PathGlobsAndRoot]):
pass
Expand Down Expand Up @@ -339,9 +342,10 @@ def py_scheduler(self) -> PyScheduler:
def py_session(self) -> PySession:
return self._py_session

def isolated_shallow_clone(self) -> SchedulerSession:
def isolated_shallow_clone(self, build_id: str) -> SchedulerSession:
return SchedulerSession(
self._scheduler, native_engine.session_isolated_shallow_clone(self._py_session)
self._scheduler,
native_engine.session_isolated_shallow_clone(self._py_session, build_id),
)

def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
Expand Down Expand Up @@ -614,6 +618,9 @@ def get_observation_histograms(self) -> dict:
def record_test_observation(self, value: int) -> None:
native_engine.session_record_test_observation(self.py_scheduler, self.py_session, value)

def cancel(self) -> None:
self.py_session.cancel()


def register_rules(rule_index: RuleIndex, union_membership: UnionMembership) -> PyTasks:
"""Create a native Tasks object loaded with given RuleIndex."""
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/streaming_workunit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def __init__(
pantsd: bool,
max_workunit_verbosity: LogLevel = LogLevel.TRACE,
) -> None:
scheduler = scheduler.isolated_shallow_clone()
scheduler = scheduler.isolated_shallow_clone("streaming_workunit_handler_session")
self.callbacks = callbacks
self.context = StreamingWorkunitContext(
_scheduler=scheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ def run(args: List[str], success: bool = True) -> Tuple[PantsResult, str | None]
with setup_tmpdir({}) as tmpdir:
dest = os.path.join(tmpdir, "dest.log")
pants_run = run_pants(args, config=workunit_logger_config(dest))
log_content = maybe_read_file(dest)
if success:
pants_run.assert_success()
assert log_content
assert FINISHED_SUCCESSFULLY in log_content
confirm_eventual_success(dest)
else:
pants_run.assert_failure()
return pants_run, log_content
return pants_run, maybe_read_file(dest)


def confirm_eventual_success(log_dest: str) -> None:
for _ in attempts("The log should eventually show that the SWH shut down."):
content = maybe_read_file(log_dest)
if content and FINISHED_SUCCESSFULLY in content:
break


def test_list() -> None:
Expand All @@ -61,7 +66,9 @@ def test_ctrl_c() -> None:
os.kill(client_pid, signal.SIGINT)

# Confirm that finish is still called (even though it may be backgrounded in the server).
for _ in attempts("The log should eventually show that the SWH shut down."):
content = maybe_read_file(dest)
if content and FINISHED_SUCCESSFULLY in content:
break
confirm_eventual_success(dest)


def test_restart() -> None:
# Will trigger a restart
run(["--pantsd-max-memory-usage=1", "roots"])
7 changes: 6 additions & 1 deletion src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,14 @@ def run_sync(self):
while self._core.is_valid():
time.sleep(self.JOIN_TIMEOUT_SECONDS)

# We're exiting: join the server to avoid interrupting ongoing runs.
# We're exiting: purge our metadata to prevent new connections, then join the server
# to avoid interrupting ongoing runs.
self.purge_metadata(force=True)
self._logger.info("Waiting for ongoing runs to complete before exiting...")
native_engine.nailgun_server_await_shutdown(self._server)
# Then shutdown the PantsDaemonCore, which will shut down any live Scheduler.
self._logger.info("Waiting for Sessions to complete before exiting...")
self._core.shutdown()
self._logger.info("Exiting pantsd")


Expand Down
9 changes: 9 additions & 0 deletions src/python/pants/pantsd/pants_daemon_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,12 @@ def prepare(
self._initialize(options_fingerprint, options_bootstrapper, env)
assert self._scheduler is not None
return self._scheduler, self._options_initializer

def shutdown(self) -> None:
with self._lifecycle_lock:
if self._services is not None:
self._services.shutdown()
self._services = None
if self._scheduler is not None:
self._scheduler.scheduler.shutdown()
self._scheduler = None
1 change: 1 addition & 0 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,4 @@ def run(self):
# Watcher failed for some reason
self._logger.critical(f"The scheduler was invalidated: {e!r}")
self.terminate()
self._scheduler_session.cancel()
1 change: 1 addition & 0 deletions src/python/pants/pantsd/service/store_gc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,4 @@ def run(self):
except Exception as e:
self._logger.critical(f"GC failed: {e!r}")
self.terminate()
self._scheduler_session.cancel()
13 changes: 13 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,19 @@ impl Core {
pub fn store(&self) -> Store {
self.store.clone()
}

///
/// Shuts down this Core.
///
pub async fn shutdown(&self, timeout: Duration) {
// Shutdown the Sessions, which will prevent new work from starting and then await any ongoing
// work.
if let Err(msg) = self.sessions.shutdown(timeout).await {
log::warn!("During shutdown: {}", msg);
}
// Then clear the Graph to ensure that drop handlers run (particular for running processes).
self.graph.clear();
}
}

pub struct InvalidatableGraph(Graph<NodeKey>);
Expand Down
41 changes: 35 additions & 6 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ py_module_initializer!(native_engine, |py, m| {
m.add(
py,
"session_isolated_shallow_clone",
py_fn!(py, session_isolated_shallow_clone(a: PySession)),
py_fn!(py, session_isolated_shallow_clone(a: PySession, b: String)),
)?;

m.add(py, "all_counter_names", py_fn!(py, all_counter_names()))?;
Expand Down Expand Up @@ -397,6 +397,11 @@ py_module_initializer!(native_engine, |py, m| {
)
),
)?;
m.add(
py,
"scheduler_shutdown",
py_fn!(py, scheduler_shutdown(a: PyScheduler, b: u64)),
)?;

m.add(
py,
Expand Down Expand Up @@ -627,14 +632,19 @@ py_class!(class PySession |py| {
session_values: PyObject,
cancellation_latch: PySessionCancellationLatch,
) -> CPyResult<Self> {
Self::create_instance(py, Session::new(
let session = Session::new(
scheduler.scheduler(py),
should_render_ui,
build_id,
session_values.into(),
cancellation_latch.cancelled(py).clone(),
)
)
).map_err(|err_str| PyErr::new::<exc::Exception, _>(py, (err_str,)))?;
Self::create_instance(py, session)
}

def cancel(&self) -> PyUnitResult {
self.session(py).cancel();
Ok(None)
}
});

Expand Down Expand Up @@ -1188,6 +1198,18 @@ fn scheduler_metrics(
})
}

fn scheduler_shutdown(py: Python, scheduler_ptr: PyScheduler, timeout_secs: u64) -> PyUnitResult {
with_scheduler(py, scheduler_ptr, |scheduler| {
py.allow_threads(|| {
scheduler
.core
.executor
.block_on(scheduler.core.shutdown(Duration::from_secs(timeout_secs)));
})
});
Ok(None)
}

fn all_counter_names(_: Python) -> CPyResult<Vec<String>> {
Ok(Metric::all_metrics())
}
Expand Down Expand Up @@ -1427,9 +1449,16 @@ fn session_record_test_observation(
})
}

fn session_isolated_shallow_clone(py: Python, session_ptr: PySession) -> CPyResult<PySession> {
fn session_isolated_shallow_clone(
py: Python,
session_ptr: PySession,
build_id: String,
) -> CPyResult<PySession> {
with_session(py, session_ptr, |session| {
PySession::create_instance(py, session.isolated_shallow_clone())
let session_clone = session
.isolated_shallow_clone(build_id)
.map_err(|e| PyErr::new::<exc::Exception, _>(py, (e,)))?;
PySession::create_instance(py, session_clone)
})
}

Expand Down
Loading