diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 125faf746c8..7b8af154339 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -36,7 +36,7 @@ use pyo3::prelude::{ PyResult as PyO3Result, Python, ToPyObject, }; use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType}; -use pyo3::{create_exception, IntoPy, PyAny}; +use pyo3::{create_exception, IntoPy, PyAny, PyRef}; use regex::Regex; use rule_graph::{self, RuleGraph}; use task_executor::Executor; @@ -861,35 +861,61 @@ async fn workunits_to_py_tuple_value( #[pyfunction] fn session_poll_workunits( - py: Python, - py_scheduler: &PyScheduler, - py_session: &PySession, + py_scheduler: PyObject, + py_session: PyObject, max_log_verbosity_level: u64, ) -> PyO3Result { - let py_level: PythonLogLevel = max_log_verbosity_level - .try_into() - .map_err(|e| PyException::new_err(format!("{}", e)))?; - let core = &py_scheduler.0.core; - core.executor.enter(|| { - let mut workunit_store = py_session.0.workunit_store(); - let (started, completed) = - py.allow_threads(|| workunit_store.latest_workunits(py_level.into())); - - let started_val = core.executor.block_on(workunits_to_py_tuple_value( - py, - &workunit_store, - started, - core, - &py_session.0, - ))?; - let completed_val = core.executor.block_on(workunits_to_py_tuple_value( - py, - &workunit_store, - completed, - core, - &py_session.0, - ))?; - Ok(externs::store_tuple(py, vec![started_val, completed_val]).into()) + // TODO: Black magic. PyObject is not marked UnwindSafe, and contains an UnsafeCell. Since PyO3 + // only allows us to receive `pyfunction` arguments as `PyObject` (or references under a held + // GIL), we cannot do what it does to use `catch_unwind` which would be interacting with + // `catch_unwind` while the object is still a raw pointer, and unchecked. + // + // Instead, we wrap the call, and assert that it is safe. It really might not be though. So this + // code should only live long enough to shake out the current issue, and an upstream issue with + // PyO3 will be the long term solution. + // + // see https://github.com/PyO3/pyo3/issues/2102 for more info. + let py_scheduler = std::panic::AssertUnwindSafe(py_scheduler); + let py_session = std::panic::AssertUnwindSafe(py_session); + std::panic::catch_unwind(|| { + let (core, session, py_level) = { + let gil = Python::acquire_gil(); + let py = gil.python(); + + let py_scheduler = py_scheduler.extract::>(py)?; + let py_session = py_session.extract::>(py)?; + let py_level: PythonLogLevel = max_log_verbosity_level + .try_into() + .map_err(|e| PyException::new_err(format!("{}", e)))?; + (py_scheduler.0.core.clone(), py_session.0.clone(), py_level) + }; + core.executor.enter(|| { + let mut workunit_store = session.workunit_store(); + let (started, completed) = workunit_store.latest_workunits(py_level.into()); + + let gil = Python::acquire_gil(); + let py = gil.python(); + + let started_val = core.executor.block_on(workunits_to_py_tuple_value( + py, + &workunit_store, + started, + &core, + &session, + ))?; + let completed_val = core.executor.block_on(workunits_to_py_tuple_value( + py, + &workunit_store, + completed, + &core, + &session, + ))?; + Ok(externs::store_tuple(py, vec![started_val, completed_val]).into()) + }) + }) + .unwrap_or_else(|e| { + log::warn!("Panic in `session_poll_workunits`: {:?}", e); + std::panic::resume_unwind(e); }) }