Skip to content

Commit

Permalink
Use py.new_pool() to bound pyo3 variable
Browse files Browse the repository at this point in the history
To alievate the unbounded memory growth, we're replacing variable dereferencing
with scoped `GILPool` as described in the pyo3 documentation recently updated. See:
PyO3/pyo3#2864
  • Loading branch information
haixuanTao committed Jan 10, 2023
1 parent e7611e1 commit 14b29b7
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dora_message::uhlc;
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context};
use eyre::{bail, eyre, Context, Result};
use pyo3::{
ffi, pyclass,
types::IntoPyDict,
Expand Down Expand Up @@ -137,7 +137,9 @@ pub fn spawn(
};
input.metadata.parameters.open_telemetry_context = Cow::Owned(string_cx);

let status_enum = Python::with_gil(|py| {
let status = Python::with_gil(|py| -> Result<i32> {
let pool = unsafe { py.new_pool() };
let py = pool.python();
let input_dict = PyDict::new(py);
let bytes = PyBytes::new(py, &input.data());

Expand All @@ -147,18 +149,15 @@ pub fn spawn(

let status_enum = operator
.call_method1(py, "on_input", (input_dict, send_output.clone()))
.map_err(traceback);

unsafe {
ffi::Py_DECREF(bytes.as_ptr());
ffi::Py_DECREF(input_dict.as_ptr());
}
status_enum
.map_err(traceback)?;

let status_val = status_enum
.getattr(py, "value")
.wrap_err("on_input must have enum return value")?;
status_val
.extract(py)
.wrap_err("on_input has invalid return value")
})?;
let status_val = Python::with_gil(|py| status_enum.getattr(py, "value"))
.wrap_err("on_input must have enum return value")?;
let status: i32 = Python::with_gil(|py| status_val.extract(py))
.wrap_err("on_input has invalid return value")?;
match status {
s if s == DoraStatus::Continue as i32 => {} // ok
s if s == DoraStatus::Stop as i32 => break StopReason::ExplicitStop,
Expand Down

0 comments on commit 14b29b7

Please sign in to comment.