Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal] Add debug output for intermittent abort in poll_workunits #14253

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 54 additions & 28 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use pyo3::prelude::{
PyResult as PyO3Result, Python, ToPyObject,
};
use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType};
use pyo3::{create_exception, IntoPy, PyAny};
use pyo3::{create_exception, IntoPy, PyAny, PyRef};
use regex::Regex;
use rule_graph::{self, RuleGraph};
use task_executor::Executor;
Expand Down Expand Up @@ -861,35 +861,61 @@ async fn workunits_to_py_tuple_value(

#[pyfunction]
fn session_poll_workunits(
py: Python,
py_scheduler: &PyScheduler,
py_session: &PySession,
py_scheduler: PyObject,
py_session: PyObject,
max_log_verbosity_level: u64,
) -> PyO3Result<PyObject> {
let py_level: PythonLogLevel = max_log_verbosity_level
.try_into()
.map_err(|e| PyException::new_err(format!("{}", e)))?;
let core = &py_scheduler.0.core;
core.executor.enter(|| {
let mut workunit_store = py_session.0.workunit_store();
let (started, completed) =
py.allow_threads(|| workunit_store.latest_workunits(py_level.into()));

let started_val = core.executor.block_on(workunits_to_py_tuple_value(
py,
&workunit_store,
started,
core,
&py_session.0,
))?;
let completed_val = core.executor.block_on(workunits_to_py_tuple_value(
py,
&workunit_store,
completed,
core,
&py_session.0,
))?;
Ok(externs::store_tuple(py, vec![started_val, completed_val]).into())
// TODO: Black magic. PyObject is not marked UnwindSafe, and contains an UnsafeCell. Since PyO3
// only allows us to receive `pyfunction` arguments as `PyObject` (or references under a held
// GIL), we cannot do what it does to use `catch_unwind` which would be interacting with
// `catch_unwind` while the object is still a raw pointer, and unchecked.
//
// Instead, we wrap the call, and assert that it is safe. It really might not be though. So this
// code should only live long enough to shake out the current issue, and an upstream issue with
// PyO3 will be the long term solution.
//
// see https://github.com/PyO3/pyo3/issues/2102 for more info.
let py_scheduler = std::panic::AssertUnwindSafe(py_scheduler);
let py_session = std::panic::AssertUnwindSafe(py_session);
std::panic::catch_unwind(|| {
let (core, session, py_level) = {
let gil = Python::acquire_gil();
let py = gil.python();

let py_scheduler = py_scheduler.extract::<PyRef<PyScheduler>>(py)?;
let py_session = py_session.extract::<PyRef<PySession>>(py)?;
let py_level: PythonLogLevel = max_log_verbosity_level
.try_into()
.map_err(|e| PyException::new_err(format!("{}", e)))?;
(py_scheduler.0.core.clone(), py_session.0.clone(), py_level)
};
core.executor.enter(|| {
let mut workunit_store = session.workunit_store();
let (started, completed) = workunit_store.latest_workunits(py_level.into());

let gil = Python::acquire_gil();
let py = gil.python();

let started_val = core.executor.block_on(workunits_to_py_tuple_value(
py,
&workunit_store,
started,
&core,
&session,
))?;
let completed_val = core.executor.block_on(workunits_to_py_tuple_value(
py,
&workunit_store,
completed,
&core,
&session,
))?;
Ok(externs::store_tuple(py, vec![started_val, completed_val]).into())
})
})
.unwrap_or_else(|e| {
log::warn!("Panic in `session_poll_workunits`: {:?}", e);
std::panic::resume_unwind(e);
})
}

Expand Down