diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index a00f1755f..975eb2aef 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -10,7 +10,7 @@ use dora_message::uhlc; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use dora_operator_api_types::DoraStatus; -use eyre::{bail, eyre, Context}; +use eyre::{bail, eyre, Context, Result}; use pyo3::{ ffi, pyclass, types::IntoPyDict, @@ -137,7 +137,9 @@ pub fn spawn( }; input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); - let status_enum = Python::with_gil(|py| { + let status = Python::with_gil(|py| -> Result { + let pool = unsafe { py.new_pool() }; + let py = pool.python(); let input_dict = PyDict::new(py); let bytes = PyBytes::new(py, &input.data()); @@ -147,18 +149,15 @@ pub fn spawn( let status_enum = operator .call_method1(py, "on_input", (input_dict, send_output.clone())) - .map_err(traceback); - - unsafe { - ffi::Py_DECREF(bytes.as_ptr()); - ffi::Py_DECREF(input_dict.as_ptr()); - } - status_enum + .map_err(traceback)?; + + let status_val = status_enum + .getattr(py, "value") + .wrap_err("on_input must have enum return value")?; + status_val + .extract(py) + .wrap_err("on_input has invalid return value") })?; - let status_val = Python::with_gil(|py| status_enum.getattr(py, "value")) - .wrap_err("on_input must have enum return value")?; - let status: i32 = Python::with_gil(|py| status_val.extract(py)) - .wrap_err("on_input has invalid return value")?; match status { s if s == DoraStatus::Continue as i32 => {} // ok s if s == DoraStatus::Stop as i32 => break StopReason::ExplicitStop,