diff --git a/Cargo.lock b/Cargo.lock index 500501d5d0..541d8a3ad6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1302,8 +1302,8 @@ dependencies = [ name = "common-daft-config" version = "0.3.0-dev0" dependencies = [ - "bincode", "common-io-config", + "common-py-serde", "pyo3", "serde", ] @@ -1313,7 +1313,7 @@ name = "common-display" version = "0.3.0-dev0" dependencies = [ "comfy-table 7.1.1", - "indexmap 2.3.0", + "indexmap 2.5.0", "pyo3", "terminal_size", "textwrap", @@ -1735,7 +1735,7 @@ dependencies = [ "fnv", "html-escape", "hyperloglog", - "indexmap 2.3.0", + "indexmap 2.5.0", "itertools 0.11.0", "jaq-core", "jaq-interpret", @@ -1809,7 +1809,7 @@ dependencies = [ "daft-core", "daft-sketch", "derive_more", - "indexmap 2.3.0", + "indexmap 2.5.0", "itertools 0.11.0", "log", "pyo3", @@ -1915,7 +1915,7 @@ dependencies = [ "daft-io", "daft-table", "futures", - "indexmap 2.3.0", + "indexmap 2.5.0", "memchr", "memmap2", "num-traits", @@ -2011,7 +2011,7 @@ dependencies = [ "daft-stats", "daft-table", "futures", - "indexmap 2.3.0", + "indexmap 2.5.0", "itertools 0.11.0", "log", "parquet2", @@ -2057,7 +2057,7 @@ dependencies = [ "daft-scan", "daft-schema", "daft-table", - "indexmap 2.3.0", + "indexmap 2.5.0", "itertools 0.11.0", "log", "pretty_assertions", @@ -2126,7 +2126,7 @@ dependencies = [ "common-version", "derive_more", "html-escape", - "indexmap 2.3.0", + "indexmap 2.5.0", "num-derive", "num-traits", "pyo3", @@ -2171,7 +2171,7 @@ dependencies = [ "daft-core", "daft-dsl", "daft-table", - "indexmap 2.3.0", + "indexmap 2.5.0", "serde", "snafu", ] @@ -2189,6 +2189,7 @@ dependencies = [ "daft-dsl", "daft-image", "html-escape", + "indexmap 2.5.0", "num-traits", "pyo3", "rand 0.8.5", @@ -2752,7 +2753,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.3.0", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -3034,9 +3035,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.3.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -3045,9 +3046,9 @@ dependencies = [ [[package]] name = "indoc" -version = "1.0.9" +version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa799dd5ed20a7e349f3b4639aa80d74549c81716d9ec4f994c9b5815598306" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" [[package]] name = "infer" @@ -3132,7 +3133,7 @@ dependencies = [ "ahash", "dyn-clone", "hifijson", - "indexmap 2.3.0", + "indexmap 2.5.0", "jaq-syn", "once_cell", "serde_json", @@ -3685,9 +3686,9 @@ dependencies = [ [[package]] name = "numpy" -version = "0.19.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "437213adf41bbccf4aeae535fbfcdad0f6fed241e1ae182ebe97fa1f3ce19389" +checksum = "ec170733ca37175f5d75a5bea5911d6ff45d2cd52849ce98b685394e4f2f37f4" dependencies = [ "libc", "ndarray", @@ -3875,7 +3876,7 @@ dependencies = [ "criterion", "flate2", "futures", - "indexmap 2.3.0", + "indexmap 2.5.0", "lz4", "lz4_flex", "parquet-format-safe", @@ -4066,6 +4067,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "portable-atomic" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" + [[package]] name = "powerfmt" 
version = "0.2.0" @@ -4164,16 +4171,18 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.19.2" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" +checksum = "a5e00b96a521718e08e03b1a622f01c8a8deb50719335de3f60b3b3950f069d8" dependencies = [ "cfg-if", + "indexmap 2.5.0", "indoc", "inventory", "libc", "memoffset", "parking_lot", + "portable-atomic", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -4182,9 +4191,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.19.2" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" +checksum = "7883df5835fafdad87c0d888b266c8ec0f4c9ca48a5bed6bbb592e8dedee1b50" dependencies = [ "once_cell", "target-lexicon", @@ -4192,9 +4201,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.19.2" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" +checksum = "01be5843dc60b916ab4dad1dca6d20b9b4e6ddc8e15f50c47fe6d85f1fb97403" dependencies = [ "libc", "pyo3-build-config", @@ -4202,9 +4211,9 @@ dependencies = [ [[package]] name = "pyo3-log" -version = "0.8.4" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c09c2b349b6538d8a73d436ca606dab6ce0aaab4dad9e6b7bdd57a4f556c3bc3" +checksum = "3ac84e6eec1159bc2a575c9ae6723baa6ee9d45873e9bebad1e3ad7e8d28a443" dependencies = [ "arc-swap", "log", @@ -4213,25 +4222,27 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.19.2" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" +checksum = "77b34069fc0682e11b31dbd10321cbf94808394c56fd996796ce45217dfac53c" dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 1.0.109", + "syn 2.0.74", ] [[package]] name = "pyo3-macros-backend" -version = "0.19.2" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" +checksum = "08260721f32db5e1a5beae69a55553f56b99bd0e1c3e6e0a5e8851a9d0f5a85c" dependencies = [ + "heck 0.4.1", "proc-macro2", + "pyo3-build-config", "quote", - "syn 1.0.109", + "syn 2.0.74", ] [[package]] @@ -4781,7 +4792,7 @@ version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d" dependencies = [ - "indexmap 2.3.0", + "indexmap 2.5.0", "itoa", "memchr", "ryu", @@ -5699,9 +5710,9 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" [[package]] name = "unindent" -version = "0.1.11" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" +checksum = "c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" [[package]] name = "untrusted" diff --git a/Cargo.toml b/Cargo.toml index 9390c5c4db..5ad3f1ab49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -209,11 +209,11 @@ features = ['async'] path = "src/parquet2" [workspace.dependencies.pyo3] -features = ["extension-module", "multiple-pymethods", "abi3-py38"] -version = "0.19.2" +features = ["extension-module", 
"multiple-pymethods", "abi3-py38", "indexmap"] +version = "0.21.0" [workspace.dependencies.pyo3-log] -version = "0.8.3" +version = "0.11.0" [workspace.dependencies.serde] features = ["derive", "rc"] diff --git a/src/common/arrow-ffi/src/lib.rs b/src/common/arrow-ffi/src/lib.rs index 24d57cfacd..9176e0eeb9 100644 --- a/src/common/arrow-ffi/src/lib.rs +++ b/src/common/arrow-ffi/src/lib.rs @@ -10,7 +10,7 @@ use pyo3::prelude::*; pub type ArrayRef = Box; #[cfg(feature = "python")] -pub fn array_to_rust(arrow_array: &PyAny) -> PyResult { +pub fn array_to_rust(py: Python, arrow_array: Bound) -> PyResult { // prepare a pointer to receive the Array struct let array = Box::new(ffi::ArrowArray::empty()); let schema = Box::new(ffi::ArrowSchema::empty()); @@ -21,7 +21,7 @@ pub fn array_to_rust(arrow_array: &PyAny) -> PyResult { // make the conversion through PyArrow's private API // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds arrow_array.call_method1( - pyo3::intern!(arrow_array.py(), "_export_to_c"), + pyo3::intern!(py, "_export_to_c"), (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), )?; @@ -31,8 +31,13 @@ pub fn array_to_rust(arrow_array: &PyAny) -> PyResult { Ok(array) } } + #[cfg(feature = "python")] -pub fn to_py_array(array: ArrayRef, py: Python, pyarrow: &PyModule) -> PyResult { +pub fn to_py_array<'py>( + py: Python<'py>, + array: ArrayRef, + pyarrow: &Bound<'py, PyModule>, +) -> PyResult> { let schema = Box::new(ffi::export_field_to_c(&Field::new( "", array.data_type().clone(), @@ -49,18 +54,18 @@ pub fn to_py_array(array: ArrayRef, py: Python, pyarrow: &PyModule) -> PyResult< (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), )?; - let array = PyModule::import(py, pyo3::intern!(py, "daft.arrow_utils"))? + let array = PyModule::import_bound(py, pyo3::intern!(py, "daft.arrow_utils"))? .getattr(pyo3::intern!(py, "remove_empty_struct_placeholders"))? 
.call1((array,))?; - Ok(array.to_object(py)) + Ok(array) } #[cfg(feature = "python")] pub fn field_to_py( - field: &arrow2::datatypes::Field, py: Python, - pyarrow: &PyModule, + field: &arrow2::datatypes::Field, + pyarrow: &Bound, ) -> PyResult { let schema = Box::new(ffi::export_field_to_c(field)); let schema_ptr: *const ffi::ArrowSchema = &*schema; @@ -70,15 +75,15 @@ pub fn field_to_py( (schema_ptr as Py_uintptr_t,), )?; - Ok(field.to_object(py)) + Ok(field.into()) } #[cfg(feature = "python")] -pub fn dtype_to_py( +pub fn dtype_to_py<'py>( + py: Python<'py>, dtype: &arrow2::datatypes::DataType, - py: Python, - pyarrow: &PyModule, -) -> PyResult { + pyarrow: Bound<'py, PyModule>, +) -> PyResult> { let schema = Box::new(ffi::export_field_to_c(&Field::new("", dtype.clone(), true))); let schema_ptr: *const ffi::ArrowSchema = &*schema; @@ -86,9 +91,8 @@ pub fn dtype_to_py( pyo3::intern!(py, "_import_from_c"), (schema_ptr as Py_uintptr_t,), )?; - let dtype = field.getattr(pyo3::intern!(py, "type"))?.to_object(py); - Ok(dtype.to_object(py)) + field.getattr(pyo3::intern!(py, "type")) } fn fix_child_array_slice_offsets(array: ArrayRef) -> ArrayRef { diff --git a/src/common/daft-config/Cargo.toml b/src/common/daft-config/Cargo.toml index 3837d5a500..6a208fdcae 100644 --- a/src/common/daft-config/Cargo.toml +++ b/src/common/daft-config/Cargo.toml @@ -1,6 +1,6 @@ [dependencies] -bincode = {workspace = true} common-io-config = {path = "../io-config", default-features = false} +common-py-serde = {path = "../py-serde", default-features = false} pyo3 = {workspace = true, optional = true} serde = {workspace = true} diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index c25ff4ca3f..edef9aea9d 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -115,7 +115,7 @@ pub use python::PyDaftPlanningConfig; use pyo3::prelude::*; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs index 5ce219d1aa..0b187d6a9c 100644 --- a/src/common/daft-config/src/python.rs +++ b/src/common/daft-config/src/python.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use pyo3::{prelude::*, PyTypeInfo}; +use common_py_serde::impl_bincode_py_state_serialization; +use pyo3::prelude::*; use serde::{Deserialize, Serialize}; use crate::{DaftExecutionConfig, DaftPlanningConfig}; @@ -52,28 +53,10 @@ impl PyDaftPlanningConfig { fn enable_actor_pool_projections(&self) -> PyResult { Ok(self.config.enable_actor_pool_projections) } - - fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec,))> { - let bin_data = bincode::serialize(self.config.as_ref()) - .expect("DaftPlanningConfig should be serializable to bytes"); - Ok(( - Self::type_object(py) - .getattr("_from_serialized")? 
- .to_object(py), - (bin_data,), - )) - } - - #[staticmethod] - fn _from_serialized(bin_data: Vec) -> PyResult { - let daft_planning_config: DaftPlanningConfig = bincode::deserialize(bin_data.as_slice()) - .expect("DaftExecutionConfig should be deserializable from bytes"); - Ok(PyDaftPlanningConfig { - config: daft_planning_config.into(), - }) - } } +impl_bincode_py_state_serialization!(PyDaftPlanningConfig); + #[derive(Clone, Default, Serialize, Deserialize)] #[pyclass(module = "daft.daft")] pub struct PyDaftExecutionConfig { @@ -264,24 +247,6 @@ impl PyDaftExecutionConfig { fn default_morsel_size(&self) -> PyResult { Ok(self.config.default_morsel_size) } - - fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec,))> { - let bin_data = bincode::serialize(self.config.as_ref()) - .expect("DaftExecutionConfig should be serializable to bytes"); - Ok(( - Self::type_object(py) - .getattr("_from_serialized")? - .to_object(py), - (bin_data,), - )) - } - - #[staticmethod] - fn _from_serialized(bin_data: Vec) -> PyResult { - let daft_execution_config: DaftExecutionConfig = bincode::deserialize(bin_data.as_slice()) - .expect("DaftExecutionConfig should be deserializable from bytes"); - Ok(PyDaftExecutionConfig { - config: daft_execution_config.into(), - }) - } } + +impl_bincode_py_state_serialization!(PyDaftExecutionConfig); diff --git a/src/common/file-formats/src/file_format_config.rs b/src/common/file-formats/src/file_format_config.rs index 2900882b72..05d3c2ce9a 100644 --- a/src/common/file-formats/src/file_format_config.rs +++ b/src/common/file-formats/src/file_format_config.rs @@ -10,7 +10,7 @@ use common_py_serde::impl_bincode_py_state_serialization; use { common_py_serde::{deserialize_py_object, serialize_py_object}, daft_schema::python::{datatype::PyTimeUnit, field::PyField}, - pyo3::{pyclass, pymethods, PyObject, PyResult, Python}, + pyo3::{pyclass, pymethods, types::PyAnyMethods, PyObject, PyResult, Python}, }; /// Configuration for parsing a particular file format. 
@@ -303,7 +303,7 @@ pub struct DatabaseSourceConfig { impl PartialEq for DatabaseSourceConfig { fn eq(&self, other: &Self) -> bool { self.sql == other.sql - && Python::with_gil(|py| self.conn.as_ref(py).eq(other.conn.as_ref(py)).unwrap()) + && Python::with_gil(|py| self.conn.bind(py).eq(other.conn.bind(py)).unwrap()) } } @@ -314,7 +314,7 @@ impl Eq for DatabaseSourceConfig {} impl Hash for DatabaseSourceConfig { fn hash(&self, state: &mut H) { self.sql.hash(state); - let py_obj_hash = Python::with_gil(|py| self.conn.as_ref(py).hash()); + let py_obj_hash = Python::with_gil(|py| self.conn.bind(py).hash()); match py_obj_hash { Ok(hash) => hash.hash(state), Err(_) => serde_json::to_vec(self).unwrap().hash(state), diff --git a/src/common/file-formats/src/python.rs b/src/common/file-formats/src/python.rs index 982baa8b86..86e1230b73 100644 --- a/src/common/file-formats/src/python.rs +++ b/src/common/file-formats/src/python.rs @@ -84,7 +84,7 @@ impl From> for PyFileFormatConfig { } } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } diff --git a/src/common/io-config/src/python.rs b/src/common/io-config/src/python.rs index a6d32d3cb0..01ce95f689 100644 --- a/src/common/io-config/src/python.rs +++ b/src/common/io-config/src/python.rs @@ -228,12 +228,12 @@ impl IOConfig { } pub fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (String,))> { - let io_config_module = py.import("daft.io.config")?; + let io_config_module = py.import_bound(pyo3::intern!(py, "daft.io.config"))?; let json_string = serde_json::to_string(&self.config).map_err(DaftError::from)?; Ok(( io_config_module - .getattr("_io_config_from_json")? - .to_object(py), + .getattr(pyo3::intern!(py, "_io_config_from_json"))? + .into(), (json_string,), )) } @@ -253,13 +253,12 @@ impl S3Config { #[allow(clippy::too_many_arguments)] #[new] pub fn new( - py: Python, region_name: Option, endpoint_url: Option, key_id: Option, session_token: Option, access_key: Option, - credentials_provider: Option<&PyAny>, + credentials_provider: Option>, buffer_time: Option, max_connections: Option, retry_initial_backoff_ms: Option, @@ -285,7 +284,7 @@ impl S3Config { access_key: access_key.map(|v| v.into()).or(def.access_key), credentials_provider: credentials_provider .map(|p| { - Ok::<_, PyErr>(Box::new(PyS3CredentialsProvider::new(py, p)?) + Ok::<_, PyErr>(Box::new(PyS3CredentialsProvider::new(p)?) as Box) }) .transpose()? @@ -314,13 +313,12 @@ impl S3Config { #[allow(clippy::too_many_arguments)] pub fn replace( &self, - py: Python, region_name: Option, endpoint_url: Option, key_id: Option, session_token: Option, access_key: Option, - credentials_provider: Option<&PyAny>, + credentials_provider: Option>, buffer_time: Option, max_connections: Option, retry_initial_backoff_ms: Option, @@ -349,7 +347,7 @@ impl S3Config { .or_else(|| self.config.access_key.clone()), credentials_provider: credentials_provider .map(|p| { - Ok::<_, PyErr>(Box::new(PyS3CredentialsProvider::new(py, p)?) + Ok::<_, PyErr>(Box::new(PyS3CredentialsProvider::new(p)?) as Box) }) .transpose()? @@ -380,7 +378,7 @@ impl S3Config { #[staticmethod] pub fn from_env(py: Python) -> PyResult { let io_config_from_env_func = py - .import(pyo3::intern!(py, "daft"))? + .import_bound(pyo3::intern!(py, "daft"))? .getattr(pyo3::intern!(py, "daft"))? 
.getattr(pyo3::intern!(py, "s3_config_from_env"))?; io_config_from_env_func.call0().map(|pyany| { @@ -446,7 +444,7 @@ impl S3Config { Ok(self.config.credentials_provider.as_ref().and_then(|p| { p.as_any() .downcast_ref::() - .map(|p| p.provider.as_ref(py).into()) + .map(|p| p.provider.clone_ref(py)) })) } @@ -533,15 +531,16 @@ impl S3Config { impl S3Credentials { #[new] pub fn new( + py: Python, key_id: String, access_key: String, session_token: Option, - expiry: Option<&PyAny>, + expiry: Option>, ) -> PyResult { // TODO(Kevin): Refactor when upgrading to PyO3 0.21 (https://github.com/Eventual-Inc/Daft/issues/2288) let expiry = expiry .map(|e| { - let ts = e.call_method0("timestamp")?.extract()?; + let ts = e.call_method0(pyo3::intern!(py, "timestamp"))?.extract()?; Ok::<_, PyErr>(SystemTime::UNIX_EPOCH + Duration::from_secs_f64(ts)) }) @@ -575,19 +574,21 @@ impl S3Credentials { /// AWS Session Token #[getter] - pub fn expiry<'a>(&self, py: Python<'a>) -> PyResult> { + pub fn expiry<'py>(&self, py: Python<'py>) -> PyResult>> { // TODO(Kevin): Refactor when upgrading to PyO3 0.21 (https://github.com/Eventual-Inc/Daft/issues/2288) self.credentials .expiry .map(|e| { - let datetime = py.import("datetime")?; - - datetime.getattr("datetime")?.call_method1( - "fromtimestamp", - (e.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs_f64(),), - ) + let datetime = py.import_bound(pyo3::intern!(py, "datetime"))?; + + datetime + .getattr(pyo3::intern!(py, "datetime"))? + .call_method1( + pyo3::intern!(py, "fromtimestamp"), + (e.duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs_f64(),), + ) }) .transpose() } @@ -604,10 +605,11 @@ pub struct PyS3CredentialsProvider { } impl PyS3CredentialsProvider { - pub fn new(py: Python, provider: &PyAny) -> PyResult { + pub fn new(provider: Bound) -> PyResult { + let hash = provider.hash()?; Ok(PyS3CredentialsProvider { - provider: provider.to_object(py), - hash: provider.hash()?, + provider: provider.into(), + hash, }) } } @@ -916,7 +918,7 @@ impl HTTPConfig { } } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/common/py-serde/src/python.rs b/src/common/py-serde/src/python.rs index aa505f4187..1dbbd8d97b 100644 --- a/src/common/py-serde/src/python.rs +++ b/src/common/py-serde/src/python.rs @@ -1,7 +1,5 @@ #[cfg(feature = "python")] -pub use pyo3::PyObject; -#[cfg(feature = "python")] -use pyo3::{Python, ToPyObject}; +use pyo3::{types::PyAnyMethods, PyObject, Python}; use serde::{de::Error as DeError, de::Visitor, ser::Error as SerError, Deserializer, Serializer}; use std::fmt; @@ -12,7 +10,7 @@ where S: Serializer, { let bytes = Python::with_gil(|py| { - py.import(pyo3::intern!(py, "daft.pickle")) + py.import_bound(pyo3::intern!(py, "daft.pickle")) .and_then(|m| m.getattr(pyo3::intern!(py, "dumps"))) .and_then(|f| f.call1((obj,))) .and_then(|b| b.extract::>()) @@ -37,9 +35,9 @@ impl<'de> Visitor<'de> for PyObjectVisitor { E: DeError, { Python::with_gil(|py| { - py.import(pyo3::intern!(py, "daft.pickle")) + py.import_bound(pyo3::intern!(py, "daft.pickle")) .and_then(|m| m.getattr(pyo3::intern!(py, "loads"))) - .and_then(|f| Ok(f.call1((v,))?.to_object(py))) + .and_then(|f| Ok(f.call1((v,))?.into())) .map_err(|e| DeError::custom(e.to_string())) }) } @@ -49,9 +47,9 @@ impl<'de> Visitor<'de> for PyObjectVisitor { E: DeError, { Python::with_gil(|py| { - 
py.import(pyo3::intern!(py, "daft.pickle")) + py.import_bound(pyo3::intern!(py, "daft.pickle")) .and_then(|m| m.getattr(pyo3::intern!(py, "loads"))) - .and_then(|f| Ok(f.call1((v,))?.to_object(py))) + .and_then(|f| Ok(f.call1((v,))?.into())) .map_err(|e| DeError::custom(e.to_string())) }) } @@ -71,25 +69,28 @@ macro_rules! impl_bincode_py_state_serialization { #[cfg(feature = "python")] #[pymethods] impl $ty { - pub fn __reduce__(&self, py: Python) -> PyResult<(PyObject, PyObject)> { - use pyo3::types::PyBytes; - use pyo3::PyTypeInfo; - use pyo3::ToPyObject; + pub fn __reduce__<'py>( + &self, + py: Python<'py>, + ) -> PyResult<(PyObject, (pyo3::Bound<'py, pyo3::types::PyBytes>,))> { + use pyo3::{ + types::{PyAnyMethods, PyBytes}, + PyTypeInfo, ToPyObject, + }; Ok(( - Self::type_object(py) - .getattr("_from_serialized")? - .to_object(py), - (PyBytes::new(py, &$crate::bincode::serialize(&self).unwrap()).to_object(py),) - .to_object(py), + Self::type_object_bound(py) + .getattr(pyo3::intern!(py, "_from_serialized"))? + .into(), + (PyBytes::new_bound( + py, + &$crate::bincode::serialize(&self).unwrap(), + ),), )) } #[staticmethod] - pub fn _from_serialized(py: Python, serialized: PyObject) -> PyResult { - use pyo3::types::PyBytes; - serialized - .extract::<&PyBytes>(py) - .map(|s| $crate::bincode::deserialize(s.as_bytes()).unwrap()) + pub fn _from_serialized(serialized: &[u8]) -> Self { + $crate::bincode::deserialize(serialized).unwrap() } } }; diff --git a/src/common/resource-request/src/lib.rs b/src/common/resource-request/src/lib.rs index 34c63446f3..c73bf88381 100644 --- a/src/common/resource-request/src/lib.rs +++ b/src/common/resource-request/src/lib.rs @@ -1,7 +1,13 @@ use common_hashable_float_wrapper::FloatWrapper; use common_py_serde::impl_bincode_py_state_serialization; #[cfg(feature = "python")] -use pyo3::{pyclass, pyclass::CompareOp, pymethods, types::PyModule, PyObject, PyResult, Python}; +use pyo3::{ + pyclass, + pyclass::CompareOp, + pymethods, + types::{PyModule, PyModuleMethods}, + Bound, PyObject, PyResult, Python, +}; use std::hash::{Hash, Hasher}; use std::ops::Add; @@ -236,7 +242,7 @@ impl ResourceRequest { impl_bincode_py_state_serialization!(ResourceRequest); #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } diff --git a/src/common/system-info/src/lib.rs b/src/common/system-info/src/lib.rs index 9946effc74..237e75ba8f 100644 --- a/src/common/system-info/src/lib.rs +++ b/src/common/system-info/src/lib.rs @@ -41,7 +41,7 @@ impl SystemInfo { } #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } diff --git a/src/daft-core/Cargo.toml b/src/daft-core/Cargo.toml index e6f6cb0929..7c462ceb38 100644 --- a/src/daft-core/Cargo.toml +++ b/src/daft-core/Cargo.toml @@ -52,7 +52,7 @@ unicode-normalization = "0.1.23" [dependencies.numpy] optional = true -version = "0.19" +version = "0.21.0" [dependencies.xxhash-rust] features = ["xxh3", "const_xxh3"] diff --git a/src/daft-core/src/array/ops/cast.rs b/src/daft-core/src/array/ops/cast.rs index 83b4ee9298..e48815e490 100644 --- a/src/daft-core/src/array/ops/cast.rs +++ b/src/daft-core/src/array/ops/cast.rs @@ -43,10 +43,9 @@ use { common_arrow_ffi as ffi, ndarray::IntoDimension, num_traits::{NumCast, ToPrimitive}, - numpy::{PyArray3, PyReadonlyArrayDyn}, + 
numpy::{PyArray3, PyReadonlyArrayDyn, PyUntypedArrayMethods}, pyo3::prelude::*, std::iter, - std::ops::Deref, }; fn arrow_logical_cast( @@ -245,7 +244,7 @@ where PySeries::from(Series::try_from((self.name(), self.data.clone()))?); let new_pyseries: PySeries = Python::with_gil(|py| -> PyResult { - PyModule::import(py, pyo3::intern!(py, "daft.series"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? .getattr(pyo3::intern!(py, "Series"))? .getattr(pyo3::intern!(py, "_from_pyseries"))? .call1((old_pyseries,))? @@ -485,20 +484,20 @@ macro_rules! pycast_then_arrowcast { let new_pyseries = Python::with_gil(|py| -> PyResult { let old_daft_series = { - PyModule::import(py, pyo3::intern!(py, "daft.series"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? .getattr(pyo3::intern!(py, "Series"))? .getattr(pyo3::intern!(py, "_from_pyseries"))? .call1((old_pyseries,))? }; let py_type_fn = { - PyModule::import(py, pyo3::intern!(py, "builtins"))? + PyModule::import_bound(py, pyo3::intern!(py, "builtins"))? .getattr(pyo3::intern!(py, $pytype_str))? }; old_daft_series .call_method1( - pyo3::intern!(py, "_pycast_to_pynative"), + (pyo3::intern!(py, "_pycast_to_pynative")), (py_type_fn,), )? .getattr(pyo3::intern!(py, "_series"))? @@ -519,9 +518,10 @@ macro_rules! pycast_then_arrowcast { fn append_values_from_numpy< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - pyarray: &PyAny, + py: Python, + pyarray: Bound, index: usize, - from_numpy_dtype_fn: &PyAny, + from_numpy_dtype_fn: &Bound, enforce_dtype: Option<&DataType>, values_vec: &mut Vec, shapes_vec: &mut Vec, @@ -529,11 +529,11 @@ fn append_values_from_numpy< use daft_schema::python::PyDataType; use std::num::Wrapping; - let np_dtype = pyarray.getattr(pyo3::intern!(pyarray.py(), "dtype"))?; + let np_dtype = pyarray.getattr(pyo3::intern!(py, "dtype"))?; let datatype = from_numpy_dtype_fn .call1((np_dtype,))? - .getattr(pyo3::intern!(pyarray.py(), "_dtype"))? + .getattr(pyo3::intern!(py, "_dtype"))? .extract::()?; let datatype = datatype.dtype; if let Some(enforce_dtype) = enforce_dtype { @@ -593,7 +593,7 @@ type ArrayPayload = ( fn extract_python_to_vec< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - py: Python<'_>, + py: Python, python_objects: &PythonArray, child_dtype: &DataType, enforce_dtype: Option<&DataType>, @@ -617,22 +617,23 @@ fn extract_python_to_vec< } let from_numpy_dtype = { - PyModule::import(py, pyo3::intern!(py, "daft.datatype"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.datatype"))? .getattr(pyo3::intern!(py, "DataType"))? .getattr(pyo3::intern!(py, "from_numpy_dtype"))? }; - let pytype = match child_dtype { - dtype if dtype.is_integer() => Ok("int"), - dtype if dtype.is_floating() => Ok("float"), + let builtins = PyModule::import_bound(py, pyo3::intern!(py, "builtins"))?; + + let py_type_fn = match child_dtype { + dtype if dtype.is_integer() => Ok(builtins.getattr(pyo3::intern!(py, "int"))?), + dtype if dtype.is_floating() => Ok(builtins.getattr(pyo3::intern!(py, "float"))?), dtype => Err(DaftError::ValueError(format!( "We only support numeric types when converting to List or FixedSizeList, got {dtype}" ))), }?; - let py_type_fn = { PyModule::import(py, pyo3::intern!(py, "builtins"))?.getattr(pytype)? }; let py_memory_view = py - .import("builtins")? + .import_bound(pyo3::intern!(py, "builtins"))? 
.getattr(pyo3::intern!(py, "memoryview"))?; // TODO: use this to extract our the image mode @@ -642,8 +643,7 @@ fn extract_python_to_vec< for (i, object) in python_objects.as_arrow().iter().enumerate() { if let Some(object) = object { - let object = object.into_py(py); - let object = object.as_ref(py); + let object = object.bind(py); let supports_buffer_protocol = py_memory_view.call1((object,)).is_ok(); let supports_array_interface_protocol = @@ -655,12 +655,15 @@ fn extract_python_to_vec< || supports_array_protocol { // Path if object supports buffer/array protocols. - let np_as_array_fn = py.import("numpy")?.getattr(pyo3::intern!(py, "asarray"))?; + let np_as_array_fn = py + .import_bound(pyo3::intern!(py, "numpy"))? + .getattr(pyo3::intern!(py, "asarray"))?; let pyarray = np_as_array_fn.call1((object,))?; let (num_values, shape_size) = append_values_from_numpy( + py, pyarray, i, - from_numpy_dtype, + &from_numpy_dtype, enforce_dtype, &mut values_vec, &mut shapes_vec, @@ -773,7 +776,7 @@ fn extract_python_to_vec< fn extract_python_like_to_fixed_size_list< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - py: Python<'_>, + py: Python, python_objects: &PythonArray, child_dtype: &DataType, list_size: usize, @@ -807,7 +810,7 @@ fn extract_python_like_to_fixed_size_list< fn extract_python_like_to_list< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - py: Python<'_>, + py: Python, python_objects: &PythonArray, child_dtype: &DataType, ) -> DaftResult { @@ -844,7 +847,7 @@ fn extract_python_like_to_list< fn extract_python_like_to_image_array< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - py: Python<'_>, + py: Python, python_objects: &PythonArray, dtype: &DataType, child_dtype: &DataType, @@ -939,7 +942,7 @@ fn extract_python_like_to_image_array< fn extract_python_like_to_tensor_array< Tgt: numpy::Element + NumCast + ToPrimitive + arrow2::types::NativeType, >( - py: Python<'_>, + py: Python, python_objects: &PythonArray, dtype: &DataType, child_dtype: &DataType, @@ -1139,19 +1142,18 @@ impl EmbeddingArray { (DataType::Python, DataType::Embedding(_, size)) => Python::with_gil(|py| { let physical_arrow = self.physical.flat_child.to_arrow(); let shape = (self.len(), *size); - let pyarrow = py.import("pyarrow")?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; // Only go through FFI layer once instead of for every embedding. // We create an ndarray view on the entire embeddings array // buffer sans the validity mask, and then create a subndarray view // for each embedding ndarray in the PythonArray. - let py_array = ffi::to_py_array(physical_arrow.with_validity(None), py, pyarrow)? - .call_method1(py, pyo3::intern!(py, "to_numpy"), (false,))? - .call_method1(py, pyo3::intern!(py, "reshape"), (shape,))?; + let py_array = ffi::to_py_array(py, physical_arrow.with_validity(None), &pyarrow)? + .call_method1(pyo3::intern!(py, "to_numpy"), (false,))? + .call_method1(pyo3::intern!(py, "reshape"), (shape,))?; let ndarrays = py_array - .as_ref(py) .iter()? 
- .map(|a| a.unwrap().to_object(py)) - .collect::>(); + .map(|a| a.unwrap().unbind()) + .collect::>(); let values_array = PseudoArrowArray::new(ndarrays.into(), self.physical.validity().cloned()); Ok(PythonArray::new( @@ -1185,7 +1187,7 @@ impl ImageArray { let ca = self.channel_array(); let ha = self.height_array(); let wa = self.width_array(); - let pyarrow = py.import("pyarrow")?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; for i in 0..da.len() { let element = da.get(i); let shape = ( @@ -1194,14 +1196,13 @@ impl ImageArray { ca.value(i) as usize, ); let py_array = match element { - Some(element) => ffi::to_py_array(element.to_arrow(), py, pyarrow)? - .call_method1(py, pyo3::intern!(py, "to_numpy"), (false,))? - .call_method1(py, pyo3::intern!(py, "reshape"), (shape,))?, - None => PyArray3::::zeros(py, shape.into_dimension(), false) - .deref() - .to_object(py), + Some(element) => ffi::to_py_array(py, element.to_arrow(), &pyarrow)? + .call_method1(pyo3::intern!(py, "to_numpy"), (false,))? + .call_method1(pyo3::intern!(py, "reshape"), (shape,))?, + None => PyArray3::::zeros_bound(py, shape.into_dimension(), false) + .into_any(), }; - ndarrays.push(py_array); + ndarrays.push(py_array.unbind()); } let values_array = PseudoArrowArray::new(ndarrays.into(), self.physical.validity().cloned()); @@ -1299,20 +1300,19 @@ impl FixedShapeImageArray { *width as usize, mode.num_channels() as usize, ); - let pyarrow = py.import("pyarrow")?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; // Only go through FFI layer once instead of for every image. // We create an (N, H, W, C) ndarray view on the entire image array // buffer sans the validity mask, and then create a subndarray view // for each image ndarray in the PythonArray. let py_array = - ffi::to_py_array(physical_arrow.with_validity(None), py, pyarrow)? - .call_method1(py, pyo3::intern!(py, "to_numpy"), (false,))? - .call_method1(py, pyo3::intern!(py, "reshape"), (shape,))?; + ffi::to_py_array(py, physical_arrow.with_validity(None), &pyarrow)? + .call_method1(pyo3::intern!(py, "to_numpy"), (false,))? + .call_method1(pyo3::intern!(py, "reshape"), (shape,))?; let ndarrays = py_array - .as_ref(py) .iter()? - .map(|a| a.unwrap().to_object(py)) - .collect::>(); + .map(|a| a.unwrap().unbind()) + .collect::>(); let values_array = PseudoArrowArray::new(ndarrays.into(), self.physical.validity().cloned()); Ok(PythonArray::new( @@ -1364,15 +1364,15 @@ impl TensorArray { let mut ndarrays = Vec::with_capacity(self.len()); let da = self.data_array(); let sa = self.shape_array(); - let pyarrow = py.import("pyarrow")?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; for (arrow_array, shape_array) in (0..self.len()).map(|i| (da.get(i), sa.get(i))) { if let (Some(arrow_array), Some(shape_array)) = (arrow_array, shape_array) { let shape_array = shape_array.u64().unwrap().as_arrow(); let shape = shape_array.values().to_vec(); - let py_array = ffi::to_py_array(arrow_array.to_arrow(), py, pyarrow)? - .call_method1(py, pyo3::intern!(py, "to_numpy"), (false,))? - .call_method1(py, pyo3::intern!(py, "reshape"), (shape,))?; - ndarrays.push(py_array); + let py_array = ffi::to_py_array(py, arrow_array.to_arrow(), &pyarrow)? + .call_method1(pyo3::intern!(py, "to_numpy"), (false,))? 
+ .call_method1(pyo3::intern!(py, "reshape"), (shape,))?; + ndarrays.push(py_array.unbind()); } else { ndarrays.push(py.None()) } @@ -1541,7 +1541,7 @@ impl FixedShapeTensorArray { (DataType::Python, DataType::FixedShapeTensor(_, shape)) => { let physical_arrow = self.physical.flat_child.to_arrow(); pyo3::Python::with_gil(|py| { - let pyarrow = py.import("pyarrow")?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; let mut np_shape: Vec = vec![self.len() as u64]; np_shape.extend(shape); // Only go through FFI layer once instead of for every tensor element. @@ -1549,14 +1549,13 @@ impl FixedShapeTensorArray { // sans the validity mask, and then create a subndarray view for each ndarray // element in the PythonArray. let py_array = - ffi::to_py_array(physical_arrow.with_validity(None), py, pyarrow)? - .call_method1(py, pyo3::intern!(py, "to_numpy"), (false,))? - .call_method1(py, pyo3::intern!(py, "reshape"), (np_shape,))?; + ffi::to_py_array(py, physical_arrow.with_validity(None), &pyarrow)? + .call_method1(pyo3::intern!(py, "to_numpy"), (false,))? + .call_method1(pyo3::intern!(py, "reshape"), (np_shape,))?; let ndarrays = py_array - .as_ref(py) .iter()? - .map(|a| a.unwrap().to_object(py)) - .collect::>(); + .map(|a| a.unwrap().unbind()) + .collect::>(); let values_array = PseudoArrowArray::new(ndarrays.into(), self.physical.validity().cloned()); Ok(PythonArray::new( @@ -1853,10 +1852,10 @@ where Python::with_gil(|py| { let arrow_dtype = array.data_type().to_arrow()?; let arrow_array = array.as_arrow().to_type(arrow_dtype).with_validity(None); - let pyarrow = py.import("pyarrow")?; - let py_array: Vec = ffi::to_py_array(arrow_array.to_boxed(), py, pyarrow)? - .call_method0(py, pyo3::intern!(py, "to_pylist"))? - .extract(py)?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; + let py_array: Vec = ffi::to_py_array(py, arrow_array.to_boxed(), &pyarrow)? + .call_method0(pyo3::intern!(py, "to_pylist"))? 
+ .extract()?; let values_array = PseudoArrowArray::new(py_array.into(), array.as_arrow().validity().cloned()); Ok(PythonArray::new( diff --git a/src/daft-core/src/array/ops/concat_agg.rs b/src/daft-core/src/array/ops/concat_agg.rs index 5450bf093e..ca81098a9b 100644 --- a/src/daft-core/src/array/ops/concat_agg.rs +++ b/src/daft-core/src/array/ops/concat_agg.rs @@ -18,7 +18,7 @@ impl DaftConcatAggable for crate::datatypes::PythonArray { let pyobj_vec = self.as_arrow().to_pyobj_vec(); let pylist: Py = Python::with_gil(|py| -> PyResult> { - let pylist: Py = PyList::empty(py).into(); + let pylist: Py = PyList::empty_bound(py).into(); for pyobj in pyobj_vec { if !pyobj.is_none(py) { pylist.call_method1(py, pyo3::intern!(py, "extend"), (pyobj,))?; @@ -40,7 +40,7 @@ impl DaftConcatAggable for crate::datatypes::PythonArray { for group in groups { let indices_as_array = crate::datatypes::UInt64Array::from(("", group.clone())); let group_pyobjs = self.take(&indices_as_array)?.as_arrow().to_pyobj_vec(); - let pylist: Py = PyList::empty(py).into(); + let pylist: Py = PyList::empty_bound(py).into(); for pyobj in group_pyobjs { if !pyobj.is_none(py) { pylist.call_method1(py, pyo3::intern!(py, "extend"), (pyobj,))?; diff --git a/src/daft-core/src/array/ops/len.rs b/src/daft-core/src/array/ops/len.rs index 87aa87a835..6808736c90 100644 --- a/src/daft-core/src/array/ops/len.rs +++ b/src/daft-core/src/array/ops/len.rs @@ -28,11 +28,11 @@ impl PythonArray { let vector = self.as_arrow().values().to_vec(); Python::with_gil(|py| { - let daft_utils = PyModule::import(py, pyo3::intern!(py, "daft.utils"))?; + let daft_utils = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))?; let estimate_size_bytes_pylist = daft_utils.getattr(pyo3::intern!(py, "estimate_size_bytes_pylist"))?; let size_bytes: usize = estimate_size_bytes_pylist - .call1((PyList::new(py, vector),))? + .call1((PyList::new_bound(py, vector),))? .extract()?; Ok(size_bytes) }) diff --git a/src/daft-core/src/array/ops/list_agg.rs b/src/daft-core/src/array/ops/list_agg.rs index 057319e709..28ea5884a4 100644 --- a/src/daft-core/src/array/ops/list_agg.rs +++ b/src/daft-core/src/array/ops/list_agg.rs @@ -69,7 +69,7 @@ impl DaftListAggable for crate::datatypes::PythonArray { let pyobj_vec = self.as_arrow().to_pyobj_vec(); - let pylist: Py = Python::with_gil(|py| PyList::new(py, pyobj_vec).into()); + let pylist: Py = Python::with_gil(|py| PyList::new_bound(py, pyobj_vec).into()); let arrow_array = PseudoArrowArray::::from_pyobj_vec(vec![pylist.into()]); Self::new(self.field().clone().into(), Box::new(arrow_array)) @@ -86,7 +86,7 @@ impl DaftListAggable for crate::datatypes::PythonArray { for group in groups { let indices_as_array = crate::datatypes::UInt64Array::from(("", group.clone())); let group_pyobjs = self.take(&indices_as_array)?.as_arrow().to_pyobj_vec(); - result_pylists.push(PyList::new(py, group_pyobjs).into()); + result_pylists.push(PyList::new_bound(py, group_pyobjs).into()); } Ok(()) })?; diff --git a/src/daft-core/src/array/ops/repr.rs b/src/daft-core/src/array/ops/repr.rs index 03df8366b8..d01e5d72f4 100644 --- a/src/daft-core/src/array/ops/repr.rs +++ b/src/daft-core/src/array/ops/repr.rs @@ -381,10 +381,10 @@ impl crate::datatypes::PythonArray { let custom_viz_hook_result: Option = Python::with_gil(|py| { // Find visualization hooks for this object's class - let pyany = val.into_ref(py); + let pyany = val.bind(py); let get_viz_hook = py - .import("daft.viz.html_viz_hooks")? 
- .getattr("get_viz_hook")?; + .import_bound(pyo3::intern!(py, "daft.viz.html_viz_hooks"))? + .getattr(pyo3::intern!(py, "get_viz_hook"))?; let hook = get_viz_hook.call1((pyany,))?; if hook.is_none() { diff --git a/src/daft-core/src/lib.rs b/src/daft-core/src/lib.rs index 50ad0063e4..322a0db3ec 100644 --- a/src/daft-core/src/lib.rs +++ b/src/daft-core/src/lib.rs @@ -18,7 +18,7 @@ use pyo3::prelude::*; pub mod prelude; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-core/src/python/mod.rs b/src/daft-core/src/python/mod.rs index 5b4c4f57f7..8134d7c41c 100644 --- a/src/daft-core/src/python/mod.rs +++ b/src/daft-core/src/python/mod.rs @@ -5,8 +5,8 @@ pub use series::PySeries; pub use daft_schema::python::{field::PyField, schema::PySchema, PyDataType, PyTimeUnit}; -pub fn register_modules(py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; - daft_schema::python::register_modules(py, parent)?; + daft_schema::python::register_modules(parent)?; Ok(()) } diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index ccc072984a..bc265d3c67 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -34,8 +34,8 @@ pub struct PySeries { #[pymethods] impl PySeries { #[staticmethod] - pub fn from_arrow(name: &str, pyarrow_array: &PyAny) -> PyResult { - let arrow_array = ffi::array_to_rust(pyarrow_array)?; + pub fn from_arrow(py: Python, name: &str, pyarrow_array: Bound) -> PyResult { + let arrow_array = ffi::array_to_rust(py, pyarrow_array)?; let arrow_array = cast_array_for_daft_if_needed(arrow_array.to_boxed()); let series = series::Series::try_from((name, arrow_array))?; Ok(series.into()) @@ -43,7 +43,7 @@ impl PySeries { // This ingests a Python list[object] directly into a Rust PythonArray. 
#[staticmethod] - pub fn from_pylist(name: &str, pylist: &PyAny, pyobj: &str) -> PyResult { + pub fn from_pylist(name: &str, pylist: Bound, pyobj: &str) -> PyResult { let vec_pyobj: Vec = pylist.extract()?; let py = pylist.py(); let dtype = match pyobj { @@ -66,15 +66,15 @@ impl PySeries { pub fn to_pylist(&self) -> PyResult { let pseudo_arrow_array = self.series.python()?.as_arrow(); let pyobj_vec = pseudo_arrow_array.to_pyobj_vec(); - Python::with_gil(|py| Ok(PyList::new(py, pyobj_vec).into())) + Python::with_gil(|py| Ok(PyList::new_bound(py, pyobj_vec).into())) } pub fn to_arrow(&self) -> PyResult { let arrow_array = self.series.to_arrow(); let arrow_array = cast_array_from_daft_if_needed(arrow_array); Python::with_gil(|py| { - let pyarrow = py.import("pyarrow")?; - ffi::to_py_array(arrow_array, py, pyarrow) + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; + Ok(ffi::to_py_array(py, arrow_array, &pyarrow)?.unbind()) }) } @@ -695,12 +695,12 @@ impl PySeries { pub fn _debug_bincode_serialize(&self, py: Python) -> PyResult { let values = bincode::serialize(&self.series).unwrap(); - Ok(PyBytes::new(py, &values).to_object(py)) + Ok(PyBytes::new_bound(py, &values).into()) } #[staticmethod] - pub fn _debug_bincode_deserialize(bytes: &PyBytes) -> PyResult { - let values = bincode::deserialize::(bytes.as_bytes()).unwrap(); + pub fn _debug_bincode_deserialize(bytes: &[u8]) -> PyResult { + let values = bincode::deserialize::(bytes).unwrap(); Ok(Self { series: values }) } @@ -727,23 +727,23 @@ fn infer_daft_dtype_for_sequence( _name: &str, ) -> PyResult> { let py_pil_image_type = py - .import(pyo3::intern!(py, "PIL.Image")) + .import_bound(pyo3::intern!(py, "PIL.Image")) .and_then(|m| m.getattr(pyo3::intern!(py, "Image"))); let np_ndarray_type = py - .import(pyo3::intern!(py, "numpy")) + .import_bound(pyo3::intern!(py, "numpy")) .and_then(|m| m.getattr(pyo3::intern!(py, "ndarray"))); let np_generic_type = py - .import(pyo3::intern!(py, "numpy")) + .import_bound(pyo3::intern!(py, "numpy")) .and_then(|m| m.getattr(pyo3::intern!(py, "generic"))); let from_numpy_dtype = { - py.import(pyo3::intern!(py, "daft.datatype"))? + py.import_bound(pyo3::intern!(py, "daft.datatype"))? .getattr(pyo3::intern!(py, "DataType"))? .getattr(pyo3::intern!(py, "from_numpy_dtype"))? }; let mut dtype: Option = None; for obj in vec_pyobj.iter() { - let obj = obj.as_ref(py); - if let Ok(pil_image_type) = py_pil_image_type + let obj = obj.bind(py); + if let Ok(pil_image_type) = &py_pil_image_type && obj.is_instance(pil_image_type)? { let mode_str = obj @@ -769,8 +769,8 @@ fn infer_daft_dtype_for_sequence( break; } } - } else if let Ok(np_ndarray_type) = np_ndarray_type - && let Ok(np_generic_type) = np_generic_type + } else if let Ok(np_ndarray_type) = &np_ndarray_type + && let Ok(np_generic_type) = &np_generic_type && (obj.is_instance(np_ndarray_type)? || obj.is_instance(np_generic_type)?) { let np_dtype = obj.getattr(pyo3::intern!(py, "dtype"))?; diff --git a/src/daft-core/src/series/ops/mod.rs b/src/daft-core/src/series/ops/mod.rs index 66f7b9b91e..84587dbe62 100644 --- a/src/daft-core/src/series/ops/mod.rs +++ b/src/daft-core/src/series/ops/mod.rs @@ -76,14 +76,14 @@ macro_rules! py_binary_op_utilfn { let right_pylist = PySeries::from(rhs.clone()).to_pylist()?; let result_series: Series = Python::with_gil(|py| -> PyResult { - let py_operator = PyModule::import(py, pyo3::intern!(py, "operator"))? + let py_operator = PyModule::import_bound(py, pyo3::intern!(py, "operator"))? 
.getattr(pyo3::intern!(py, $pyoperator))?; - let result_pylist = PyModule::import(py, pyo3::intern!(py, "daft.utils"))? + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? .getattr(pyo3::intern!(py, $utilfn))? .call1((py_operator, left_pylist, right_pylist))?; - PyModule::import(py, pyo3::intern!(py, "daft.series"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? .getattr(pyo3::intern!(py, "Series"))? .getattr(pyo3::intern!(py, "from_pylist"))? .call1((result_pylist, lhs.name(), pyo3::intern!(py, "disallow")))? @@ -111,11 +111,11 @@ pub(super) fn py_membership_op_utilfn(lhs: &Series, rhs: &Series) -> DaftResult< let right_pylist = PySeries::from(rhs_casted.clone()).to_pylist()?; let result_series: Series = Python::with_gil(|py| -> PyResult { - let result_pylist = PyModule::import(py, pyo3::intern!(py, "daft.utils"))? + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? .getattr(pyo3::intern!(py, "python_list_membership_check"))? .call1((left_pylist, right_pylist))?; - PyModule::import(py, pyo3::intern!(py, "daft.series"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? .getattr(pyo3::intern!(py, "Series"))? .getattr(pyo3::intern!(py, "from_pylist"))? .call1(( @@ -176,11 +176,11 @@ pub(super) fn py_between_op_utilfn( let upper_pylist = PySeries::from(upper_casted.clone()).to_pylist()?; let result_series: Series = Python::with_gil(|py| -> PyResult { - let result_pylist = PyModule::import(py, pyo3::intern!(py, "daft.utils"))? + let result_pylist = PyModule::import_bound(py, pyo3::intern!(py, "daft.utils"))? .getattr(pyo3::intern!(py, "python_list_between_check"))? .call1((value_pylist, lower_pylist, upper_pylist))?; - PyModule::import(py, pyo3::intern!(py, "daft.series"))? + PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))? .getattr(pyo3::intern!(py, "Series"))? .getattr(pyo3::intern!(py, "from_pylist"))? 
.call1(( diff --git a/src/daft-csv/src/lib.rs b/src/daft-csv/src/lib.rs index 59a62ad2d1..17d254d520 100644 --- a/src/daft-csv/src/lib.rs +++ b/src/daft-csv/src/lib.rs @@ -68,11 +68,14 @@ impl From for pyo3::PyErr { type Result = std::result::Result; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; - parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv))?; - parent.add_wrapped(wrap_pyfunction!(python::pylib::read_csv_schema))?; + parent.add_function(wrap_pyfunction_bound!(python::pylib::read_csv, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + python::pylib::read_csv_schema, + parent + )?)?; Ok(()) } diff --git a/src/daft-dsl/src/functions/python/udf.rs b/src/daft-dsl/src/functions/python/udf.rs index 5cb99967d2..e40efe0fce 100644 --- a/src/daft-dsl/src/functions/python/udf.rs +++ b/src/daft-dsl/src/functions/python/udf.rs @@ -1,7 +1,10 @@ use daft_core::datatypes::DataType; #[cfg(feature = "python")] -use pyo3::{types::PyModule, PyAny, PyResult}; +use pyo3::{ + types::{PyAnyMethods, PyModule}, + Bound, PyAny, PyResult, +}; use daft_core::prelude::*; @@ -63,10 +66,10 @@ fn run_udf( use daft_core::python::PyDataType; use daft_core::python::PySeries; - // Convert input Rust &[Series] to wrapped Python Vec<&PyAny> - let py_series_module = PyModule::import(py, pyo3::intern!(py, "daft.series"))?; + // Convert input Rust &[Series] to wrapped Python Vec> + let py_series_module = PyModule::import_bound(py, pyo3::intern!(py, "daft.series"))?; let py_series_class = py_series_module.getattr(pyo3::intern!(py, "Series"))?; - let pyseries: PyResult> = inputs + let pyseries: PyResult>> = inputs .iter() .map(|s| { py_series_class.call_method( @@ -78,8 +81,8 @@ fn run_udf( .collect(); let pyseries = pyseries?; - // Run the function on the converted Vec<&PyAny> - let py_udf_module = PyModule::import(py, pyo3::intern!(py, "daft.udf"))?; + // Run the function on the converted Vec> + let py_udf_module = PyModule::import_bound(py, pyo3::intern!(py, "daft.udf"))?; let run_udf_func = py_udf_module.getattr(pyo3::intern!(py, "run_udf"))?; let result = run_udf_func.call1(( func, // Function to run @@ -220,20 +223,22 @@ impl FunctionEvaluator for StatefulPythonUDF { Some(init_args) => { let init_args = init_args .as_ref() - .as_ref(py) + .bind(py) .downcast::() .expect("init_args should be a Python tuple"); let (args, kwargs) = ( init_args .get_item(0)? .downcast::() - .expect("init_args[0] should be a tuple of *args"), + .expect("init_args[0] should be a tuple of *args") + .clone(), init_args .get_item(1)? .downcast::() - .expect("init_args[1] should be a dict of **kwargs"), + .expect("init_args[1] should be a dict of **kwargs") + .clone(), ); - func.call(py, args, Some(kwargs))? + func.call_bound(py, args, Some(&kwargs))? 
} }; diff --git a/src/daft-dsl/src/lib.rs b/src/daft-dsl/src/lib.rs index 9e068d5d77..754578eb6d 100644 --- a/src/daft-dsl/src/lib.rs +++ b/src/daft-dsl/src/lib.rs @@ -27,22 +27,28 @@ pub use resolve_expr::{ }; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; - parent.add_wrapped(wrap_pyfunction!(python::col))?; - parent.add_wrapped(wrap_pyfunction!(python::lit))?; - parent.add_wrapped(wrap_pyfunction!(python::date_lit))?; - parent.add_wrapped(wrap_pyfunction!(python::time_lit))?; - parent.add_wrapped(wrap_pyfunction!(python::timestamp_lit))?; - parent.add_wrapped(wrap_pyfunction!(python::decimal_lit))?; - parent.add_wrapped(wrap_pyfunction!(python::series_lit))?; - parent.add_wrapped(wrap_pyfunction!(python::stateless_udf))?; - parent.add_wrapped(wrap_pyfunction!(python::stateful_udf))?; - parent.add_wrapped(wrap_pyfunction!(python::extract_partial_stateful_udf_py))?; - parent.add_wrapped(wrap_pyfunction!(python::bind_stateful_udfs))?; - parent.add_wrapped(wrap_pyfunction!(python::eq))?; - parent.add_wrapped(wrap_pyfunction!(python::check_column_name_validity))?; + parent.add_function(wrap_pyfunction_bound!(python::col, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::date_lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::time_lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::timestamp_lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::decimal_lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::series_lit, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::stateless_udf, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::stateful_udf, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + python::extract_partial_stateful_udf_py, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!(python::bind_stateful_udfs, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::eq, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + python::check_column_name_validity, + parent + )?)?; Ok(()) } diff --git a/src/daft-dsl/src/pyobj_serde.rs b/src/daft-dsl/src/pyobj_serde.rs index 49eec53057..e5ec52d70b 100644 --- a/src/daft-dsl/src/pyobj_serde.rs +++ b/src/daft-dsl/src/pyobj_serde.rs @@ -4,7 +4,7 @@ use std::{ }; use common_py_serde::{deserialize_py_object, serialize_py_object}; -use pyo3::{PyObject, Python}; +use pyo3::{types::PyAnyMethods, PyObject, Python}; use serde::{Deserialize, Serialize}; // This is a Rust wrapper on top of a Python PartialStatelessUDF or PartialStatefulUDF to make it serde-able and hashable @@ -19,7 +19,7 @@ pub struct PyObjectWrapper( impl PartialEq for PyObjectWrapper { fn eq(&self, other: &Self) -> bool { - Python::with_gil(|py| self.0.as_ref(py).eq(other.0.as_ref(py)).unwrap()) + Python::with_gil(|py| self.0.bind(py).eq(other.0.bind(py)).unwrap()) } } @@ -27,7 +27,7 @@ impl Eq for PyObjectWrapper {} impl Hash for PyObjectWrapper { fn hash(&self, state: &mut H) { - let py_obj_hash = Python::with_gil(|py| self.0.as_ref(py).hash()); + let py_obj_hash = Python::with_gil(|py| self.0.bind(py).hash()); match py_obj_hash { // If Python object is hashable, hash the Python-side hash. 
Ok(py_obj_hash) => py_obj_hash.hash(state), diff --git a/src/daft-dsl/src/python.rs b/src/daft-dsl/src/python.rs index d3bb2cfdde..c770d27ee1 100644 --- a/src/daft-dsl/src/python.rs +++ b/src/daft-dsl/src/python.rs @@ -101,7 +101,7 @@ pub fn series_lit(series: PySeries) -> PyResult { } #[pyfunction] -pub fn lit(item: &PyAny) -> PyResult { +pub fn lit(item: Bound) -> PyResult { if item.is_instance_of::() { let val = item.extract::()?; Ok(crate::lit(val).into()) @@ -125,7 +125,7 @@ pub fn lit(item: &PyAny) -> PyResult { } else if let Ok(pystr) = item.downcast::() { Ok(crate::lit( pystr - .to_str() + .extract::() .expect("could not transform Python string to Rust Unicode"), ) .into()) @@ -145,9 +145,8 @@ pub fn lit(item: &PyAny) -> PyResult { // * `return_dtype` - returned column's DataType #[pyfunction] pub fn stateless_udf( - py: Python, name: &str, - partial_stateless_udf: &PyAny, + partial_stateless_udf: PyObject, expressions: Vec, return_dtype: PyDataType, resource_request: Option, @@ -163,9 +162,6 @@ pub fn stateless_udf( } } - // Convert &PyAny values to a GIL-independent reference to Python objects (PyObject) so that we can store them in our Rust Expr enums - // See: https://pyo3.rs/v0.18.2/types#pyt-and-pyobject - let partial_stateless_udf = partial_stateless_udf.to_object(py); let expressions_map: Vec = expressions.into_iter().map(|pyexpr| pyexpr.expr).collect(); Ok(PyExpr { expr: stateless_udf( @@ -187,13 +183,12 @@ pub fn stateless_udf( #[pyfunction] #[allow(clippy::too_many_arguments)] pub fn stateful_udf( - py: Python, name: &str, - partial_stateful_udf: &PyAny, + partial_stateful_udf: PyObject, expressions: Vec, return_dtype: PyDataType, resource_request: Option, - init_args: Option<&PyAny>, + init_args: Option, batch_size: Option, concurrency: Option, ) -> PyResult { @@ -207,11 +202,8 @@ pub fn stateful_udf( } } - // Convert &PyAny values to a GIL-independent reference to Python objects (PyObject) so that we can store them in our Rust Expr enums - // See: https://pyo3.rs/v0.18.2/types#pyt-and-pyobject - let partial_stateful_udf = partial_stateful_udf.to_object(py); let expressions_map: Vec = expressions.into_iter().map(|pyexpr| pyexpr.expr).collect(); - let init_args = init_args.map(|args| args.to_object(py)); + let init_args = init_args.map(|args| args.into()); Ok(PyExpr { expr: stateful_udf( name, @@ -219,7 +211,7 @@ pub fn stateful_udf( &expressions_map, return_dtype.dtype, resource_request, - init_args.map(|a| a.into()), + init_args, batch_size, concurrency, )? 
diff --git a/src/daft-functions/src/image/mod.rs b/src/daft-functions/src/image/mod.rs index a6a0fa62da..782df08855 100644 --- a/src/daft-functions/src/image/mod.rs +++ b/src/daft-functions/src/image/mod.rs @@ -8,12 +8,12 @@ pub mod to_mode; use pyo3::prelude::*; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_wrapped(wrap_pyfunction!(crop::py_crop))?; - parent.add_wrapped(wrap_pyfunction!(decode::py_decode))?; - parent.add_wrapped(wrap_pyfunction!(encode::py_encode))?; - parent.add_wrapped(wrap_pyfunction!(resize::py_resize))?; - parent.add_wrapped(wrap_pyfunction!(to_mode::py_image_to_mode))?; +pub fn register_modules(parent: &Bound) -> PyResult<()> { + parent.add_function(wrap_pyfunction_bound!(crop::py_crop, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(decode::py_decode, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(encode::py_encode, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(resize::py_resize, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(to_mode::py_image_to_mode, parent)?)?; Ok(()) } diff --git a/src/daft-functions/src/lib.rs b/src/daft-functions/src/lib.rs index 6100a4563f..94a886eba3 100644 --- a/src/daft-functions/src/lib.rs +++ b/src/daft-functions/src/lib.rs @@ -16,20 +16,38 @@ use pyo3::prelude::*; use snafu::Snafu; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { // keep in sorted order - parent.add_wrapped(wrap_pyfunction!(count_matches::python::utf8_count_matches))?; - parent.add_wrapped(wrap_pyfunction!(distance::cosine::python::cosine_distance))?; - parent.add_wrapped(wrap_pyfunction!(hash::python::hash))?; - parent.add_wrapped(wrap_pyfunction!(list_sort::python::list_sort))?; - parent.add_wrapped(wrap_pyfunction!(minhash::python::minhash))?; - parent.add_wrapped(wrap_pyfunction!(numeric::cbrt::python::cbrt))?; - parent.add_wrapped(wrap_pyfunction!(to_struct::python::to_struct))?; - parent.add_wrapped(wrap_pyfunction!(tokenize::python::tokenize_decode))?; - parent.add_wrapped(wrap_pyfunction!(tokenize::python::tokenize_encode))?; - parent.add_wrapped(wrap_pyfunction!(uri::python::url_download))?; - parent.add_wrapped(wrap_pyfunction!(uri::python::url_upload))?; - image::register_modules(_py, parent)?; + parent.add_function(wrap_pyfunction_bound!( + count_matches::python::utf8_count_matches, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!( + distance::cosine::python::cosine_distance, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!(hash::python::hash, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + list_sort::python::list_sort, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!(minhash::python::minhash, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(numeric::cbrt::python::cbrt, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + to_struct::python::to_struct, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!( + tokenize::python::tokenize_decode, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!( + tokenize::python::tokenize_encode, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!(uri::python::url_download, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(uri::python::url_upload, parent)?)?; + image::register_modules(parent)?; Ok(()) } diff --git a/src/daft-image/src/python.rs b/src/daft-image/src/python.rs index 99b93dd265..6b73b33e54 
--- a/src/daft-image/src/python.rs
+++ b/src/daft-image/src/python.rs
@@ -42,12 +42,12 @@ pub fn to_mode(s: &PySeries, mode: &ImageMode) -> PyResult<PySeries> {
     Ok(s.into())
 }
 
-pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
-    let module = PyModule::new(_py, "image")?;
+pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
+    let module = PyModule::new_bound(parent.py(), "image")?;
     module.add_wrapped(wrap_pyfunction!(decode))?;
     module.add_wrapped(wrap_pyfunction!(encode))?;
     module.add_wrapped(wrap_pyfunction!(resize))?;
     module.add_wrapped(wrap_pyfunction!(to_mode))?;
-    parent.add_submodule(module)?;
+    parent.add_submodule(&module)?;
     Ok(())
 }
diff --git a/src/daft-io/src/python.rs b/src/daft-io/src/python.rs
index 2f45434508..f573a1bc6f 100644
--- a/src/daft-io/src/python.rs
+++ b/src/daft-io/src/python.rs
@@ -5,10 +5,7 @@ mod py {
     use crate::{get_io_client, get_runtime, parse_url, s3_like, stats::IOStatsContext};
     use common_error::DaftResult;
     use futures::TryStreamExt;
-    use pyo3::{
-        prelude::*,
-        types::{PyDict, PyList},
-    };
+    use pyo3::{prelude::*, types::PyDict};
 
     #[pyfunction]
     fn io_glob(
@@ -19,7 +16,7 @@ mod py {
         fanout_limit: Option<usize>,
         page_size: Option<i32>,
         limit: Option<usize>,
-    ) -> PyResult<&PyList> {
+    ) -> PyResult<Vec<Bound<PyDict>>> {
         let multithreaded_io = multithreaded_io.unwrap_or(true);
         let io_stats = IOStatsContext::new(format!("io_glob for {path}"));
         let io_stats_handle = io_stats.clone();
@@ -49,16 +46,15 @@ mod py {
                 Ok(files)
             })
         });
-        let lsr = lsr?;
         let mut to_rtn = vec![];
-        for file in lsr {
-            let dict = PyDict::new(py);
+        for file in lsr? {
+            let dict = PyDict::new_bound(py);
             dict.set_item("type", format!("{:?}", file.filetype))?;
             dict.set_item("path", file.filepath)?;
             dict.set_item("size", file.size)?;
             to_rtn.push(dict);
         }
-        Ok(PyList::new(py, to_rtn))
+        Ok(to_rtn)
     }
 
     /// Creates an S3Config from the current environment, auto-discovering variables such as
@@ -72,10 +68,10 @@ mod py {
         Ok(common_io_config::python::S3Config {
             config: s3_config?
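Reviewer note: the registration hunks above all follow one shape. A minimal sketch with a placeholder `ping` function and `tools` submodule, assuming pyo3 0.21; the Python token now comes from `parent.py()` rather than a separate argument:

```rust
use pyo3::prelude::*;

#[pyfunction]
fn ping() -> &'static str {
    "pong"
}

// Placeholder registration mirroring the diff's pattern:
// `wrap_pyfunction_bound!` + `add_function` replace `add_wrapped`, and
// `add_submodule` now takes a `&Bound<PyModule>` reference.
fn register(parent: &Bound<PyModule>) -> PyResult<()> {
    let module = PyModule::new_bound(parent.py(), "tools")?;
    module.add_function(wrap_pyfunction_bound!(ping, &module)?)?;
    parent.add_submodule(&module)?;
    Ok(())
}
```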
}) } - pub fn register_modules(py: Python, parent: &PyModule) -> PyResult<()> { - common_io_config::python::register_modules(py, parent)?; - parent.add_function(wrap_pyfunction!(io_glob, parent)?)?; - parent.add_function(wrap_pyfunction!(s3_config_from_env, parent)?)?; + pub fn register_modules(parent: &Bound) -> PyResult<()> { + common_io_config::python::register_modules(parent)?; + parent.add_function(wrap_pyfunction_bound!(io_glob, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(s3_config_from_env, parent)?)?; Ok(()) } } diff --git a/src/daft-json/Cargo.toml b/src/daft-json/Cargo.toml index f28c6bbead..241156403c 100644 --- a/src/daft-json/Cargo.toml +++ b/src/daft-json/Cargo.toml @@ -1,5 +1,5 @@ [dependencies] -arrow2 = {workspace = true, features = ["io_json"]} +arrow2 = {workspace = true} chrono = {workspace = true} common-error = {path = "../common/error", default-features = false} common-py-serde = {path = "../common/py-serde", default-features = false} diff --git a/src/daft-json/src/lib.rs b/src/daft-json/src/lib.rs index 5195ee9984..1c56ba7167 100644 --- a/src/daft-json/src/lib.rs +++ b/src/daft-json/src/lib.rs @@ -67,11 +67,14 @@ impl From for Error { type Result = std::result::Result; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; - parent.add_wrapped(wrap_pyfunction!(python::pylib::read_json))?; - parent.add_wrapped(wrap_pyfunction!(python::pylib::read_json_schema))?; + parent.add_function(wrap_pyfunction_bound!(python::pylib::read_json, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + python::pylib::read_json_schema, + parent + )?)?; Ok(()) } diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 92e42e6fe8..5d1fb6d795 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -95,7 +95,7 @@ impl From for DaftError { type Result = std::result::Result; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index ba500c272b..182b298457 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -40,7 +40,7 @@ impl LocalPartitionIterator { fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } - fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult> { + fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> PyResult> { let iter = &mut slf.iter; Ok(py.allow_threads(|| iter.next().transpose())?) } @@ -57,7 +57,7 @@ impl NativeExecutor { #[staticmethod] pub fn from_logical_plan_builder( logical_plan_builder: &PyLogicalPlanBuilder, - py: Python<'_>, + py: Python, ) -> PyResult { py.allow_threads(|| { let logical_plan = logical_plan_builder.builder.build(); diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 57a47dedf2..261124c099 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -382,7 +382,7 @@ fn materialize_scan_task( })? 
} FileFormatConfig::PythonFunction => { - use pyo3::ToPyObject; + use pyo3::{types::PyAnyMethods, PyObject}; let table_iterators = scan_task.sources.iter().map(|source| { // Call Python function to create an Iterator (Grabs the GIL and then releases it) @@ -394,8 +394,13 @@ fn materialize_scan_task( .. } => { Python::with_gil(|py| { - let func = py.import(pyo3::types::PyString::new(py, module)).unwrap_or_else(|_| panic!("Cannot import factory function from module {module}")).getattr(pyo3::types::PyString::new(py, func_name)).unwrap_or_else(|_| panic!("Cannot find function {func_name} in module {module}")); - Ok(func.call(func_args.to_pytuple(py), None).with_context(|_| PyIOSnafu)?.downcast::().expect("Function must return an iterator of tables")).map(|it| it.to_object(py)) + let func = py.import_bound(module.as_str()) + .unwrap_or_else(|_| panic!("Cannot import factory function from module {module}")) + .getattr(func_name.as_str()) + .unwrap_or_else(|_| panic!("Cannot find function {func_name} in module {module}")); + func.call(func_args.to_pytuple(py), None) + .with_context(|_| PyIOSnafu) + .map(Into::::into) }) }, _ => unreachable!("PythonFunction file format must be paired with PythonFactoryFunction data file sources"), @@ -417,8 +422,9 @@ fn materialize_scan_task( // Grab the GIL to call next() on the iterator, and then release it once we have the Table let table = match Python::with_gil(|py| { iterator - .downcast::(py) - .unwrap() + .downcast_bound::(py) + .expect("Function must return an iterator of tables") + .clone() .next() .map(|result| { result @@ -1007,11 +1013,11 @@ fn _read_delete_files( } #[allow(clippy::too_many_arguments)] -fn _read_parquet_into_loaded_micropartition( +fn _read_parquet_into_loaded_micropartition>( io_client: Arc, multithreaded_io: bool, uris: &[&str], - columns: Option<&[&str]>, + columns: Option<&[T]>, start_offset: Option, num_rows: Option, iceberg_delete_files: Option>, @@ -1039,7 +1045,9 @@ fn _read_parquet_into_loaded_micropartition( }) .transpose()?; - let file_column_names = _get_file_column_names(columns, partition_spec); + let columns = columns.map(|cols| cols.iter().map(|c| c.as_ref()).collect::>()); + + let file_column_names = _get_file_column_names(columns.as_deref(), partition_spec); let all_tables = read_parquet_bulk( uris, file_column_names.as_deref(), @@ -1070,7 +1078,7 @@ fn _read_parquet_into_loaded_micropartition( } }; - let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns)?; + let pruned_daft_schema = prune_fields_from_schema(full_daft_schema, columns.as_deref())?; let fill_map = partition_spec.map(|pspec| pspec.to_fill_map()); let all_tables = all_tables @@ -1088,9 +1096,9 @@ fn _read_parquet_into_loaded_micropartition( } #[allow(clippy::too_many_arguments)] -pub(crate) fn read_parquet_into_micropartition( +pub(crate) fn read_parquet_into_micropartition>( uris: &[&str], - columns: Option<&[&str]>, + columns: Option<&[T]>, start_offset: Option, num_rows: Option, iceberg_delete_files: Option>, @@ -1285,8 +1293,13 @@ pub(crate) fn read_parquet_into_micropartition( Pushdowns::new( None, None, - columns - .map(|cols| Arc::new(cols.iter().map(|v| v.to_string()).collect::>())), + columns.map(|cols| { + Arc::new( + cols.iter() + .map(|v| v.as_ref().to_string()) + .collect::>(), + ) + }), num_rows, ), ); diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 86bec613bb..651b6c1556 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ 
-116,12 +116,12 @@ impl PyMicroPartition { #[staticmethod] pub fn from_arrow_record_batches( py: Python, - record_batches: Vec<&PyAny>, + record_batches: Vec>, schema: &PySchema, ) -> PyResult { // TODO: Cleanup and refactor code for sharing with Table let tables = record_batches - .iter() + .into_iter() .map(|rb| daft_table::ffi::record_batches_to_table(py, &[rb], schema.schema.clone())) .collect::>>()?; @@ -570,7 +570,7 @@ impl PyMicroPartition { pub fn read_parquet( py: Python, uri: &str, - columns: Option>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -614,8 +614,8 @@ impl PyMicroPartition { #[staticmethod] pub fn read_parquet_bulk( py: Python, - uris: Vec<&str>, - columns: Option>, + uris: Vec, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>>>, @@ -635,7 +635,7 @@ impl PyMicroPartition { ); crate::micropartition::read_parquet_into_micropartition( - uris.as_ref(), + uris.iter().map(AsRef::as_ref).collect::>().as_ref(), columns.as_deref(), start_offset, num_rows, @@ -659,17 +659,15 @@ impl PyMicroPartition { #[staticmethod] pub fn _from_unloaded_table_state( - schema_bytes: &PyBytes, - loading_scan_task_bytes: &PyBytes, - metadata_bytes: &PyBytes, - statistics_bytes: &PyBytes, + schema_bytes: &[u8], + loading_scan_task_bytes: &[u8], + metadata_bytes: &[u8], + statistics_bytes: &[u8], ) -> PyResult { - let schema = bincode::deserialize::(schema_bytes.as_bytes()).unwrap(); - let scan_task = - bincode::deserialize::(loading_scan_task_bytes.as_bytes()).unwrap(); - let metadata = bincode::deserialize::(metadata_bytes.as_bytes()).unwrap(); - let statistics = - bincode::deserialize::>(statistics_bytes.as_bytes()).unwrap(); + let schema = bincode::deserialize::(schema_bytes).unwrap(); + let scan_task = bincode::deserialize::(loading_scan_task_bytes).unwrap(); + let metadata = bincode::deserialize::(metadata_bytes).unwrap(); + let statistics = bincode::deserialize::>(statistics_bytes).unwrap(); Ok(MicroPartition { schema: Arc::new(schema), @@ -683,15 +681,14 @@ impl PyMicroPartition { #[staticmethod] pub fn _from_loaded_table_state( py: Python, - schema_bytes: &PyBytes, + schema_bytes: &[u8], table_objs: Vec, - metadata_bytes: &PyBytes, - statistics_bytes: &PyBytes, + metadata_bytes: &[u8], + statistics_bytes: &[u8], ) -> PyResult { - let schema = bincode::deserialize::(schema_bytes.as_bytes()).unwrap(); - let metadata = bincode::deserialize::(metadata_bytes.as_bytes()).unwrap(); - let statistics = - bincode::deserialize::>(statistics_bytes.as_bytes()).unwrap(); + let schema = bincode::deserialize::(schema_bytes).unwrap(); + let metadata = bincode::deserialize::(metadata_bytes).unwrap(); + let statistics = bincode::deserialize::>(statistics_bytes).unwrap(); let tables = table_objs .into_iter() @@ -712,16 +709,17 @@ impl PyMicroPartition { } pub fn __reduce__(&self, py: Python) -> PyResult<(PyObject, PyObject)> { - let schema_bytes = PyBytes::new(py, &bincode::serialize(&self.inner.schema).unwrap()); + let schema_bytes = PyBytes::new_bound(py, &bincode::serialize(&self.inner.schema).unwrap()); let py_metadata_bytes = - PyBytes::new(py, &bincode::serialize(&self.inner.metadata).unwrap()); - let py_stats_bytes = PyBytes::new(py, &bincode::serialize(&self.inner.statistics).unwrap()); + PyBytes::new_bound(py, &bincode::serialize(&self.inner.metadata).unwrap()); + let py_stats_bytes = + PyBytes::new_bound(py, &bincode::serialize(&self.inner.statistics).unwrap()); let guard = self.inner.state.lock().unwrap(); if let 
TableState::Loaded(tables) = guard.deref() { let _from_pytable = py - .import(pyo3::intern!(py, "daft.table"))? + .import_bound(pyo3::intern!(py, "daft.table"))? .getattr(pyo3::intern!(py, "Table"))? .getattr(pyo3::intern!(py, "_from_pytable"))?; @@ -730,17 +728,17 @@ impl PyMicroPartition { .map(|pt| _from_pytable.call1((pt,))) .collect::>>()?; Ok(( - Self::type_object(py) + Self::type_object_bound(py) .getattr(pyo3::intern!(py, "_from_loaded_table_state"))? - .to_object(py), + .into(), (schema_bytes, pyobjs, py_metadata_bytes, py_stats_bytes).to_object(py), )) } else if let TableState::Unloaded(params) = guard.deref() { - let py_params_bytes = PyBytes::new(py, &bincode::serialize(params).unwrap()); + let py_params_bytes = PyBytes::new_bound(py, &bincode::serialize(params).unwrap()); Ok(( - Self::type_object(py) + Self::type_object_bound(py) .getattr(pyo3::intern!(py, "_from_unloaded_table_state"))? - .to_object(py), + .into(), ( schema_bytes, py_params_bytes, @@ -764,15 +762,15 @@ pub(crate) fn read_json_into_py_table( num_rows: Option, ) -> PyResult { let read_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableReadOptions"))? .call1((num_rows, include_columns))?; let py_schema = py - .import(pyo3::intern!(py, "daft.logical.schema"))? + .import_bound(pyo3::intern!(py, "daft.logical.schema"))? .getattr(pyo3::intern!(py, "Schema"))? .getattr(pyo3::intern!(py, "_from_pyschema"))? .call1((schema,))?; - py.import(pyo3::intern!(py, "daft.table.table_io"))? + py.import_bound(pyo3::intern!(py, "daft.table.table_io"))? .getattr(pyo3::intern!(py, "read_json"))? .call1((uri, py_schema, storage_config, read_options))? .getattr(pyo3::intern!(py, "to_table"))? @@ -794,20 +792,20 @@ pub(crate) fn read_csv_into_py_table( num_rows: Option, ) -> PyResult { let py_schema = py - .import(pyo3::intern!(py, "daft.logical.schema"))? + .import_bound(pyo3::intern!(py, "daft.logical.schema"))? .getattr(pyo3::intern!(py, "Schema"))? .getattr(pyo3::intern!(py, "_from_pyschema"))? .call1((schema,))?; let read_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableReadOptions"))? .call1((num_rows, include_columns))?; let header_idx = if has_header { Some(0) } else { None }; let parse_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableParseCSVOptions"))? .call1((delimiter, header_idx, double_quote))?; - py.import(pyo3::intern!(py, "daft.table.table_io"))? + py.import_bound(pyo3::intern!(py, "daft.table.table_io"))? .getattr(pyo3::intern!(py, "read_csv"))? .call1((uri, py_schema, storage_config, parse_options, read_options))? .getattr(pyo3::intern!(py, "to_table"))? @@ -826,24 +824,24 @@ pub(crate) fn read_parquet_into_py_table( num_rows: Option, ) -> PyResult { let py_schema = py - .import(pyo3::intern!(py, "daft.logical.schema"))? + .import_bound(pyo3::intern!(py, "daft.logical.schema"))? .getattr(pyo3::intern!(py, "Schema"))? .getattr(pyo3::intern!(py, "_from_pyschema"))? .call1((schema,))?; let read_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableReadOptions"))? 
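Reviewer note: the `__reduce__` and `_from_*_table_state` hunks above are two halves of one pickling protocol. A sketch of both directions under the new API, with a stand-in `State` type parameter (not a Daft type):

```rust
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use serde::{de::DeserializeOwned, Serialize};

// Serialize half: bincode output becomes a `Bound<'py, PyBytes>` via
// `new_bound`, tied to the GIL lifetime instead of the removed GIL-Ref pool.
fn state_to_bytes<'py, State: Serialize>(
    py: Python<'py>,
    state: &State,
) -> PyResult<Bound<'py, PyBytes>> {
    let raw = bincode::serialize(state)
        .map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))?;
    Ok(PyBytes::new_bound(py, &raw))
}

// Deserialize half: a `&[u8]` parameter extracts Python `bytes` directly,
// which is what lets the diff drop `&PyBytes` and `.as_bytes()`.
fn state_from_bytes<State: DeserializeOwned>(raw: &[u8]) -> PyResult<State> {
    bincode::deserialize(raw)
        .map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))
}
```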
.call1((num_rows, include_columns))?; let py_coerce_int96_timestamp_unit = py - .import(pyo3::intern!(py, "daft.datatype"))? + .import_bound(pyo3::intern!(py, "daft.datatype"))? .getattr(pyo3::intern!(py, "TimeUnit"))? .getattr(pyo3::intern!(py, "_from_pytimeunit"))? .call1((coerce_int96_timestamp_unit,))?; let parse_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableParseParquetOptions"))? .call1((py_coerce_int96_timestamp_unit,))?; - py.import(pyo3::intern!(py, "daft.table.table_io"))? + py.import_bound(pyo3::intern!(py, "daft.table.table_io"))? .getattr(pyo3::intern!(py, "read_parquet"))? .call1((uri, py_schema, storage_config, read_options, parse_options))? .getattr(pyo3::intern!(py, "to_table"))? @@ -862,13 +860,13 @@ pub(crate) fn read_sql_into_py_table( num_rows: Option, ) -> PyResult { let py_schema = py - .import(pyo3::intern!(py, "daft.logical.schema"))? + .import_bound(pyo3::intern!(py, "daft.logical.schema"))? .getattr(pyo3::intern!(py, "Schema"))? .getattr(pyo3::intern!(py, "_from_pyschema"))? .call1((schema,))?; let py_predicate = match predicate { Some(p) => Some( - py.import(pyo3::intern!(py, "daft.expressions.expressions"))? + py.import_bound(pyo3::intern!(py, "daft.expressions.expressions"))? .getattr(pyo3::intern!(py, "Expression"))? .getattr(pyo3::intern!(py, "_from_pyexpr"))? .call1((p,))?, @@ -876,10 +874,10 @@ pub(crate) fn read_sql_into_py_table( None => None, }; let read_options = py - .import(pyo3::intern!(py, "daft.runners.partitioning"))? + .import_bound(pyo3::intern!(py, "daft.runners.partitioning"))? .getattr(pyo3::intern!(py, "TableReadOptions"))? .call1((num_rows, include_columns))?; - py.import(pyo3::intern!(py, "daft.table.table_io"))? + py.import_bound(pyo3::intern!(py, "daft.table.table_io"))? .getattr(pyo3::intern!(py, "read_sql"))? .call1((sql, conn, py_schema, read_options, py_predicate))? .getattr(pyo3::intern!(py, "to_table"))? 
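Reviewer note: every `read_*_into_py_table` helper above repeats the same call chain. Isolated as a sketch (module and attribute names as in the diff, argument list abbreviated):

```rust
use pyo3::prelude::*;

// `import_bound` replaces `import`, the chain stays on Bound values end to
// end, and `pyo3::intern!` caches each lookup string so repeated calls skip
// re-allocating the Python string.
fn table_read_options<'py>(
    py: Python<'py>,
    num_rows: Option<usize>,
) -> PyResult<Bound<'py, PyAny>> {
    py.import_bound(pyo3::intern!(py, "daft.runners.partitioning"))?
        .getattr(pyo3::intern!(py, "TableReadOptions"))?
        .call1((num_rows,))
}
```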
@@ -906,7 +904,7 @@ impl From for Arc { } } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; Ok(()) } diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs index 57e7649ed0..d5bb1a38dd 100644 --- a/src/daft-parquet/src/file.rs +++ b/src/daft-parquet/src/file.rs @@ -223,8 +223,8 @@ impl ParquetReaderBuilder { &self.metadata } - pub fn prune_columns>(mut self, columns: &[S]) -> super::Result { - self.selected_columns = Some(HashSet::from_iter(columns.iter().map(|s| s.to_string()))); + pub fn prune_columns(mut self, columns: &[String]) -> super::Result { + self.selected_columns = Some(HashSet::from_iter(columns.iter().cloned())); Ok(self) } diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index c22e6da97d..66c400e9ec 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -7,7 +7,7 @@ pub mod pylib { use daft_dsl::python::PyExpr; use daft_io::{get_io_client, python::IOConfig, IOStatsContext}; use daft_table::python::PyTable; - use pyo3::{pyfunction, types::PyModule, PyResult, Python}; + use pyo3::{pyfunction, types::PyModule, Bound, PyResult, Python}; use std::{collections::BTreeMap, sync::Arc}; use crate::read::{ArrowChunk, ParquetSchemaInferenceOptions}; @@ -16,7 +16,7 @@ pub mod pylib { pub fn read_parquet( py: Python, uri: &str, - columns: Option>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -38,7 +38,7 @@ pub mod pylib { let result = crate::read::read_parquet( uri, - columns.as_deref(), + columns, start_offset, num_rows, row_groups, @@ -66,20 +66,20 @@ pub mod pylib { schema: arrow2::datatypes::SchemaRef, all_arrays: Vec, num_rows: usize, - pyarrow: &PyModule, + pyarrow: &Bound, ) -> PyResult { let converted_arrays = all_arrays .into_iter() .map(|v| { v.into_iter() - .map(|a| to_py_array(a, py, pyarrow)) + .map(|a| to_py_array(py, a, pyarrow).map(|pyarray| pyarray.unbind())) .collect::>>() }) .collect::>>()?; let fields = schema .fields .iter() - .map(|f| field_to_py(f, py, pyarrow)) + .map(|f| field_to_py(py, f, pyarrow)) .collect::, _>>()?; let metadata = &schema.metadata; Ok((fields, metadata.clone(), converted_arrays, num_rows)) @@ -90,7 +90,7 @@ pub mod pylib { pub fn read_parquet_into_pyarrow( py: Python, uri: &str, - columns: Option>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -110,7 +110,7 @@ pub mod pylib { crate::read::read_parquet_into_pyarrow( uri, - columns.as_deref(), + columns, start_offset, num_rows, row_groups, @@ -122,15 +122,15 @@ pub mod pylib { ) })?; let (schema, all_arrays, num_rows) = read_parquet_result; - let pyarrow = py.import(pyo3::intern!(py, "pyarrow"))?; - convert_pyarrow_parquet_read_result_into_py(py, schema, all_arrays, num_rows, pyarrow) + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; + convert_pyarrow_parquet_read_result_into_py(py, schema, all_arrays, num_rows, &pyarrow) } #[allow(clippy::too_many_arguments)] #[pyfunction] pub fn read_parquet_bulk( py: Python, - uris: Vec<&str>, - columns: Option>, + uris: Vec, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>>>, @@ -151,7 +151,7 @@ pub mod pylib { coerce_int96_timestamp_unit.map(|tu| tu.timeunit), ); Ok(crate::read::read_parquet_bulk( - uris.as_ref(), + uris.iter().map(AsRef::as_ref).collect::>().as_ref(), columns.as_deref(), start_offset, num_rows, @@ -177,8 +177,8 @@ pub mod pylib { 
#[pyfunction] pub fn read_parquet_into_pyarrow_bulk( py: Python, - uris: Vec<&str>, - columns: Option>, + uris: Vec, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>>>, @@ -197,7 +197,7 @@ pub mod pylib { ); crate::read::read_parquet_into_pyarrow_bulk( - uris.as_ref(), + uris.iter().map(AsRef::as_ref).collect::>().as_ref(), columns.as_deref(), start_offset, num_rows, @@ -209,11 +209,11 @@ pub mod pylib { schema_infer_options, ) })?; - let pyarrow = py.import(pyo3::intern!(py, "pyarrow"))?; + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; parquet_read_results .into_iter() .map(|(s, all_arrays, num_rows)| { - convert_pyarrow_parquet_read_result_into_py(py, s, all_arrays, num_rows, pyarrow) + convert_pyarrow_parquet_read_result_into_py(py, s, all_arrays, num_rows, &pyarrow) }) .collect::>>() } @@ -276,12 +276,21 @@ pub mod pylib { }) } } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow_bulk))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?; - parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?; +pub fn register_modules(parent: &Bound) -> PyResult<()> { + parent.add_function(wrap_pyfunction_bound!(pylib::read_parquet, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + pylib::read_parquet_into_pyarrow, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!( + pylib::read_parquet_into_pyarrow_bulk, + parent + )?)?; + parent.add_function(wrap_pyfunction_bound!(pylib::read_parquet_bulk, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(pylib::read_parquet_schema, parent)?)?; + parent.add_function(wrap_pyfunction_bound!( + pylib::read_parquet_statistics, + parent + )?)?; Ok(()) } diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs index 6f33a1c51b..e9d16e901c 100644 --- a/src/daft-parquet/src/read.rs +++ b/src/daft-parquet/src/read.rs @@ -99,7 +99,7 @@ fn limit_with_delete_rows( #[allow(clippy::too_many_arguments)] async fn read_parquet_single( uri: &str, - columns: Option<&[&str]>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -113,10 +113,10 @@ async fn read_parquet_single( chunk_size: Option, ) -> DaftResult { let field_id_mapping_provided = field_id_mapping.is_some(); + let mut columns_to_read = columns.clone(); let columns_to_return = columns; let num_rows_to_return = num_rows; let mut num_rows_to_read = num_rows; - let mut columns_to_read = columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()); let requested_columns = columns_to_read.as_ref().map(|v| v.len()); if let Some(ref pred) = predicate { num_rows_to_read = None; @@ -162,8 +162,8 @@ async fn read_parquet_single( .await?; let builder = builder.set_infer_schema_options(schema_infer_options); - let builder = if let Some(columns) = columns_to_read.as_ref() { - builder.prune_columns(columns.as_slice())? + let builder = if let Some(columns) = &columns_to_read { + builder.prune_columns(columns)? 
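Reviewer note: the repeated `uris`/`columns` signature changes above generalize borrowed `&[&str]` slices to generic strings. The core of that pattern as a sketch (`normalize_columns` is a made-up helper name):

```rust
// `Option<&[&str]>` becomes `Option<&[T]> where T: AsRef<str>`, so callers can
// hand over `Vec<String>` coming straight from Python without re-borrowing
// every element, and the reader materializes owned strings once for its
// async tasks instead of juggling lifetimes across `tokio::spawn`.
fn normalize_columns<T: AsRef<str>>(columns: Option<&[T]>) -> Option<Vec<String>> {
    columns.map(|cols| cols.iter().map(|c| c.as_ref().to_string()).collect())
}

fn main() {
    // Both element types satisfy the same bound:
    assert_eq!(
        normalize_columns(Some(&["a", "b"])),
        normalize_columns(Some(&["a".to_string(), "b".to_string()]))
    );
}
```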
} else { builder }; @@ -246,7 +246,7 @@ async fn read_parquet_single( // TODO ideally pipeline this with IO and before concatenating, rather than after table = table.filter(&[predicate])?; if let Some(oc) = columns_to_return { - table = table.get_columns(oc)?; + table = table.get_columns(&oc)?; } if let Some(nr) = num_rows_to_return { table = table.head(nr)?; @@ -375,8 +375,8 @@ async fn stream_parquet_single( .await?; let builder = builder.set_infer_schema_options(schema_infer_options); - let builder = if let Some(columns) = columns_to_read.as_ref() { - builder.prune_columns(columns.as_slice())? + let builder = if let Some(columns) = &columns_to_read { + builder.prune_columns(columns)? } else { builder }; @@ -462,7 +462,7 @@ async fn stream_parquet_single( #[allow(clippy::too_many_arguments)] async fn read_parquet_single_into_arrow( uri: &str, - columns: Option<&[&str]>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -478,7 +478,7 @@ async fn read_parquet_single_into_arrow( let (metadata, schema, all_arrays, num_rows_read) = crate::stream_reader::local_parquet_read_into_arrow_async( fixed_uri.as_ref(), - columns.map(|s| s.iter().map(|s| s.to_string()).collect_vec()), + columns.clone(), start_offset, num_rows, row_groups.clone(), @@ -499,7 +499,7 @@ async fn read_parquet_single_into_arrow( .await?; let builder = builder.set_infer_schema_options(schema_infer_options); - let builder = if let Some(columns) = columns { + let builder = if let Some(columns) = &columns { builder.prune_columns(columns)? } else { builder @@ -586,7 +586,7 @@ async fn read_parquet_single_into_arrow( }?; }; - let expected_num_columns = if let Some(columns) = columns { + let expected_num_columns = if let Some(columns) = &columns { columns.len() } else { metadata_num_columns @@ -610,7 +610,7 @@ async fn read_parquet_single_into_arrow( #[allow(clippy::too_many_arguments)] pub fn read_parquet( uri: &str, - columns: Option<&[&str]>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -650,7 +650,7 @@ pub type ParquetPyarrowChunk = (arrow2::datatypes::SchemaRef, Vec, u #[allow(clippy::too_many_arguments)] pub fn read_parquet_into_pyarrow( uri: &str, - columns: Option<&[&str]>, + columns: Option>, start_offset: Option, num_rows: Option, row_groups: Option>, @@ -690,9 +690,9 @@ pub fn read_parquet_into_pyarrow( } #[allow(clippy::too_many_arguments)] -pub fn read_parquet_bulk( +pub fn read_parquet_bulk>( uris: &[&str], - columns: Option<&[&str]>, + columns: Option<&[T]>, start_offset: Option, num_rows: Option, row_groups: Option>>>, @@ -709,7 +709,7 @@ pub fn read_parquet_bulk( ) -> DaftResult> { let runtime_handle = daft_io::get_runtime(multithreaded_io)?; - let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::>()); + let columns = columns.map(|s| s.iter().map(|v| v.as_ref().to_string()).collect::>()); if let Some(ref row_groups) = row_groups { if row_groups.len() != uris.len() { return Err(common_error::DaftError::ValueError(format!( @@ -723,7 +723,7 @@ pub fn read_parquet_bulk( .block_on_current_thread(async move { let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { let uri = uri.to_string(); - let owned_columns = owned_columns.clone(); + let owned_columns = columns.clone(); let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); let owned_predicate = predicate.clone(); let metadata = metadata.as_ref().map(|mds| mds[i].clone()); @@ -734,12 +734,9 @@ pub fn read_parquet_bulk( let 
owned_field_id_mapping = field_id_mapping.clone(); let delete_rows = delete_map.as_ref().and_then(|m| m.get(&uri).cloned()); tokio::task::spawn(async move { - let columns = owned_columns - .as_ref() - .map(|s| s.iter().map(AsRef::as_ref).collect::>()); read_parquet_single( &uri, - columns.as_deref(), + owned_columns, start_offset, num_rows, owned_row_group, @@ -817,9 +814,9 @@ pub async fn stream_parquet( } #[allow(clippy::too_many_arguments)] -pub fn read_parquet_into_pyarrow_bulk( +pub fn read_parquet_into_pyarrow_bulk>( uris: &[&str], - columns: Option<&[&str]>, + columns: Option<&[T]>, start_offset: Option, num_rows: Option, row_groups: Option>>>, @@ -830,7 +827,7 @@ pub fn read_parquet_into_pyarrow_bulk( schema_infer_options: ParquetSchemaInferenceOptions, ) -> DaftResult> { let runtime_handle = get_runtime(multithreaded_io)?; - let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::>()); + let columns = columns.map(|s| s.iter().map(|v| v.as_ref().to_string()).collect::>()); if let Some(ref row_groups) = row_groups { if row_groups.len() != uris.len() { return Err(common_error::DaftError::ValueError(format!( @@ -844,21 +841,18 @@ pub fn read_parquet_into_pyarrow_bulk( .block_on_current_thread(async move { futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| { let uri = uri.to_string(); - let owned_columns = owned_columns.clone(); + let owned_columns = columns.clone(); let owned_row_group = row_groups.as_ref().and_then(|rgs| rgs[i].clone()); let io_client = io_client.clone(); let io_stats = io_stats.clone(); tokio::task::spawn(async move { - let columns = owned_columns - .as_ref() - .map(|s| s.iter().map(AsRef::as_ref).collect::>()); Ok(( i, read_parquet_single_into_arrow( &uri, - columns.as_deref(), + owned_columns, start_offset, num_rows, owned_row_group, diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 66909097e6..10e6c2d5f6 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -526,7 +526,7 @@ impl PyLogicalPlanBuilder { #[staticmethod] pub fn in_memory_scan( partition_key: &str, - cache_entry: &PyAny, + cache_entry: PyObject, schema: PySchema, num_partitions: usize, size_bytes: usize, @@ -534,7 +534,7 @@ impl PyLogicalPlanBuilder { ) -> PyResult { Ok(LogicalPlanBuilder::in_memory_scan( partition_key, - cache_entry.to_object(cache_entry.py()), + cache_entry, schema.into(), num_partitions, size_bytes, diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index f6ebc4aa0a..c9be5a7aa9 100644 --- a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -45,7 +45,7 @@ use { }; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index cf08fcd309..5383235c67 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -125,7 +125,7 @@ fn run_glob_parallel( impl GlobScanOperator { pub fn try_new( - glob_paths: &[&str], + glob_paths: Vec, file_format_config: Arc, storage_config: Arc, infer_schema: bool, @@ -242,7 +242,7 @@ impl GlobScanOperator { false => schema.expect("Schema must be provided if infer_schema is false"), }; Ok(Self { - glob_paths: glob_paths.iter().map(|s| s.to_string()).collect(), + glob_paths, file_format_config, schema, storage_config, diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 
c87833c1f7..f8c9781be4 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -1017,10 +1017,8 @@ mod test { sources.push(format!("../../tests/assets/parquet-data/mvp.parquet")); } - let glob_paths: Vec<&str> = sources.iter().map(|s| s.as_str()).collect(); - let glob_scan_operator: GlobScanOperator = GlobScanOperator::try_new( - &glob_paths, + sources, Arc::new(file_format_config), Arc::new(StorageConfig::Native(Arc::new( NativeStorageConfig::new_internal(false, None), diff --git a/src/daft-scan/src/python.rs b/src/daft-scan/src/python.rs index 27e5a2997b..99c7e5b04c 100644 --- a/src/daft-scan/src/python.rs +++ b/src/daft-scan/src/python.rs @@ -1,4 +1,4 @@ -use pyo3::{prelude::*, types::PyTuple, AsPyPointer}; +use pyo3::{prelude::*, types::PyTuple}; use serde::{Deserialize, Serialize}; use common_py_serde::{deserialize_py_object, serialize_py_object}; @@ -21,8 +21,8 @@ impl PythonTablesFactoryArgs { Self(args.into_iter().map(PyObjectSerializableWrapper).collect()) } - pub fn to_pytuple<'a>(&self, py: Python<'a>) -> &'a PyTuple { - pyo3::types::PyTuple::new(py, self.0.iter().map(|x| x.0.as_ref(py))) + pub fn to_pytuple<'a>(&self, py: Python<'a>) -> Bound<'a, PyTuple> { + pyo3::types::PyTuple::new_bound(py, self.0.iter().map(|x| x.0.bind(py))) } } @@ -114,7 +114,7 @@ pub mod pylib { #[staticmethod] pub fn glob_scan( py: Python, - glob_path: Vec<&str>, + glob_path: Vec, file_format_config: PyFileFormatConfig, storage_config: PyStorageConfig, infer_schema: bool, @@ -122,7 +122,7 @@ pub mod pylib { ) -> PyResult { py.allow_threads(|| { let operator = Arc::new(GlobScanOperator::try_new( - glob_path.as_slice(), + glob_path, file_format_config.into(), storage_config.into(), infer_schema, @@ -249,7 +249,7 @@ pub mod pylib { let pyiter = self.operator .call_method1(py, pyo3::intern!(py, "to_scan_tasks"), (pypd,))?; - let pyiter = PyIterator::from_object(py, &pyiter)?; + let pyiter = PyIterator::from_bound_object(pyiter.bind(py))?; DaftResult::Ok( pyiter .map(|v| { @@ -402,7 +402,7 @@ pub mod pylib { py: Python, module: String, func_name: String, - func_args: Vec<&PyAny>, + func_args: Vec>, schema: PySchema, num_rows: Option, size_bytes: Option, @@ -580,7 +580,7 @@ pub mod pylib { } } -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-scheduler/src/adaptive.rs b/src/daft-scheduler/src/adaptive.rs index ae5b1267c0..f9ea81c4b2 100644 --- a/src/daft-scheduler/src/adaptive.rs +++ b/src/daft-scheduler/src/adaptive.rs @@ -32,7 +32,7 @@ impl AdaptivePhysicalPlanScheduler { #[staticmethod] pub fn from_logical_plan_builder( logical_plan_builder: &PyLogicalPlanBuilder, - py: Python<'_>, + py: Python, cfg: PyDaftExecutionConfig, ) -> PyResult { py.allow_threads(|| { @@ -59,13 +59,12 @@ impl AdaptivePhysicalPlanScheduler { &mut self, source_id: usize, partition_key: &str, - cache_entry: &PyAny, + cache_entry: PyObject, num_partitions: usize, size_bytes: usize, num_rows: usize, py: Python, ) -> PyResult<()> { - let cache_entry = cache_entry.into(); py.allow_threads(|| { let in_memory_info = InMemoryInfo::new( Schema::empty().into(), // TODO thread in schema from in memory scan diff --git a/src/daft-scheduler/src/lib.rs b/src/daft-scheduler/src/lib.rs index 3b7154bd15..f5d12b1a54 100644 --- a/src/daft-scheduler/src/lib.rs +++ b/src/daft-scheduler/src/lib.rs @@ -8,7 +8,7 @@ pub use scheduler::PhysicalPlanScheduler; use 
pyo3::prelude::*; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs index 0e44569faf..9dfbea2e39 100644 --- a/src/daft-scheduler/src/scheduler.rs +++ b/src/daft-scheduler/src/scheduler.rs @@ -16,7 +16,7 @@ use { daft_dsl::Expr, daft_plan::{OutputFileInfo, PyLogicalPlanBuilder}, daft_scan::python::pylib::PyScanTask, - pyo3::{pyclass, pymethods, PyObject, PyRef, PyRefMut, PyResult, Python}, + pyo3::{pyclass, pymethods, types::PyAnyMethods, PyObject, PyRef, PyRefMut, PyResult, Python}, std::collections::HashMap, }; @@ -51,7 +51,7 @@ impl PhysicalPlanScheduler { #[staticmethod] pub fn from_logical_plan_builder( logical_plan_builder: &PyLogicalPlanBuilder, - py: Python<'_>, + py: Python, cfg: PyDaftExecutionConfig, ) -> PyResult { py.allow_threads(|| { @@ -97,7 +97,7 @@ impl StreamingPartitionIterator { fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { slf } - fn __next__(mut slf: PyRefMut<'_, Self>, py: Python<'_>) -> PyResult> { + fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> PyResult> { let iter = &mut slf.iter; Ok(py.allow_threads(|| iter.next().transpose())?) } @@ -136,7 +136,7 @@ impl PartitionIterator { #[allow(clippy::too_many_arguments)] #[cfg(feature = "python")] fn tabular_write( - py: Python<'_>, + py: Python, upstream_iter: PyObject, file_format: &FileFormat, schema: &SchemaRef, @@ -151,7 +151,7 @@ fn tabular_write( .collect::>() }); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "write_file"))? .call1(( upstream_iter, @@ -172,12 +172,12 @@ fn tabular_write( #[allow(clippy::too_many_arguments)] #[cfg(feature = "python")] fn iceberg_write( - py: Python<'_>, + py: Python, upstream_iter: PyObject, iceberg_info: &IcebergCatalogInfo, ) -> PyResult { let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "write_iceberg"))? .call1(( upstream_iter, @@ -198,12 +198,12 @@ fn iceberg_write( #[allow(clippy::too_many_arguments)] #[cfg(feature = "python")] fn deltalake_write( - py: Python<'_>, + py: Python, upstream_iter: PyObject, delta_lake_info: &DeltaLakeCatalogInfo, ) -> PyResult { let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "write_deltalake"))? .call1(( upstream_iter, @@ -222,12 +222,12 @@ fn deltalake_write( #[allow(clippy::too_many_arguments)] #[cfg(feature = "python")] fn lance_write( - py: Python<'_>, + py: Python, upstream_iter: PyObject, lance_info: &LanceCatalogInfo, ) -> PyResult { let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "write_lance"))? 
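Reviewer note: two orthogonal cleanups recur in the scheduler hunks above, dropping the redundant `Python<'_>` lifetime annotation and releasing the GIL around Rust-side planning work. A sketch with a placeholder `plan_cost` function:

```rust
use pyo3::prelude::*;

// The elided `Python` form is equivalent to `Python<'_>`; `allow_threads`
// releases the GIL so other Python threads can run while Rust computes.
#[pyfunction]
fn plan_cost(py: Python, weights: Vec<u64>) -> PyResult<u64> {
    Ok(py.allow_threads(move || weights.iter().sum()))
}
```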
.call1(( upstream_iter, @@ -247,7 +247,7 @@ fn lance_write( #[cfg(feature = "python")] fn physical_plan_to_partition_tasks( physical_plan: &PhysicalPlan, - py: Python<'_>, + py: Python, psets: &HashMap>, ) -> PyResult { match physical_plan { @@ -260,14 +260,14 @@ fn physical_plan_to_partition_tasks( index: 0usize, }; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "partition_read"))? .call1((partition_iter,))?; Ok(py_iter.into()) } PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => { let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "scan_with_tasks"))? .call1((scan_tasks .iter() @@ -276,7 +276,7 @@ fn physical_plan_to_partition_tasks( Ok(py_iter.into()) } PhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => { - let schema_mod = py.import(pyo3::intern!(py, "daft.logical.schema"))?; + let schema_mod = py.import_bound(pyo3::intern!(py, "daft.logical.schema"))?; let python_schema = schema_mod .getattr(pyo3::intern!(py, "Schema"))? .getattr(pyo3::intern!(py, "_from_pyschema"))? @@ -285,7 +285,7 @@ fn physical_plan_to_partition_tasks( },))?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "empty_scan"))? .call1((python_schema,))?; Ok(py_iter.into()) @@ -302,7 +302,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "project"))? .call1(( upstream_iter, @@ -319,7 +319,7 @@ fn physical_plan_to_partition_tasks( ) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "actor_pool_project"))? .call1(( upstream_iter, @@ -335,7 +335,8 @@ fn physical_plan_to_partition_tasks( PhysicalPlan::Filter(Filter { input, predicate }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; - let expressions_mod = py.import(pyo3::intern!(py, "daft.expressions.expressions"))?; + let expressions_mod = + py.import_bound(pyo3::intern!(py, "daft.expressions.expressions"))?; let py_predicate = expressions_mod .getattr(pyo3::intern!(py, "Expression"))? .getattr(pyo3::intern!(py, "_from_pyexpr"))? @@ -344,7 +345,7 @@ fn physical_plan_to_partition_tasks( .getattr(pyo3::intern!(py, "ExpressionsProjection"))? .call1((vec![py_predicate],))?; let execution_step_mod = - py.import(pyo3::intern!(py, "daft.execution.execution_step"))?; + py.import_bound(pyo3::intern!(py, "daft.execution.execution_step"))?; let filter_step = execution_step_mod .getattr(pyo3::intern!(py, "Filter"))? .call1((expressions_projection,))?; @@ -352,7 +353,7 @@ fn physical_plan_to_partition_tasks( .getattr(pyo3::intern!(py, "ResourceRequest"))? .call0()?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "pipeline_instruction"))? 
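Reviewer note: each physical-plan branch above performs the same four steps. Condensed into a sketch (one representative shim function; the real call sites pass more arguments):

```rust
use pyo3::prelude::*;

// Import the helper module, fetch the interned function name, call it with
// the upstream iterator, and erase the Bound result into a stored `PyObject`,
// which is what the diff's `Ok(py_iter.into())` does.
fn global_limit_step(py: Python, upstream: PyObject, limit: u64) -> PyResult<PyObject> {
    let py_iter = py
        .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
        .getattr(pyo3::intern!(py, "global_limit"))?
        .call1((upstream, limit))?;
    Ok(py_iter.into())
}
```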
.call1((upstream_iter, filter_step, resource_request))?; Ok(py_iter.into()) @@ -364,7 +365,8 @@ fn physical_plan_to_partition_tasks( num_partitions, }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; - let py_physical_plan = py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?; + let py_physical_plan = + py.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?; let global_limit_iter = py_physical_plan .getattr(pyo3::intern!(py, "global_limit"))? .call1((upstream_iter, *limit, *eager, *num_partitions))?; @@ -379,7 +381,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "explode"))? .call1((upstream_iter, explode_pyexprs))?; Ok(py_iter.into()) @@ -400,7 +402,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "unpivot"))? .call1(( upstream_iter, @@ -419,7 +421,7 @@ fn physical_plan_to_partition_tasks( }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "sample"))? .call1((upstream_iter, *fraction, *with_replacement, *seed))?; Ok(py_iter.into()) @@ -430,7 +432,7 @@ fn physical_plan_to_partition_tasks( }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "monotonically_increasing_id"))? .call1((upstream_iter, column_name))?; Ok(py_iter.into()) @@ -447,7 +449,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "sort"))? .call1(( upstream_iter, @@ -464,7 +466,7 @@ fn physical_plan_to_partition_tasks( }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "split"))? .call1((upstream_iter, *input_num_partitions, *output_num_partitions))?; Ok(py_iter.into()) @@ -472,7 +474,7 @@ fn physical_plan_to_partition_tasks( PhysicalPlan::Flatten(Flatten { input }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "flatten_plan"))? .call1((upstream_iter,))?; Ok(py_iter.into()) @@ -483,7 +485,7 @@ fn physical_plan_to_partition_tasks( }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? 
+ .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "fanout_random"))? .call1((upstream_iter, *num_partitions))?; Ok(py_iter.into()) @@ -499,7 +501,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "split_by_hash"))? .call1((upstream_iter, *num_partitions, partition_by_pyexprs))?; Ok(py_iter.into()) @@ -510,7 +512,7 @@ fn physical_plan_to_partition_tasks( PhysicalPlan::ReduceMerge(ReduceMerge { input }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "reduce_merge"))? .call1((upstream_iter,))?; Ok(py_iter.into()) @@ -531,7 +533,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "local_aggregate"))? .call1((upstream_iter, aggs_as_pyexprs, groupbys_as_pyexprs))?; Ok(py_iter.into()) @@ -551,7 +553,7 @@ fn physical_plan_to_partition_tasks( let pivot_column_pyexpr = PyExpr::from(pivot_column.clone()); let value_column_pyexpr = PyExpr::from(value_column.clone()); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "pivot"))? .call1(( upstream_iter, @@ -569,7 +571,7 @@ fn physical_plan_to_partition_tasks( }) => { let upstream_iter = physical_plan_to_partition_tasks(input, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "coalesce"))? .call1((upstream_iter, *num_from, *num_to))?; Ok(py_iter.into()) @@ -578,7 +580,7 @@ fn physical_plan_to_partition_tasks( let upstream_input_iter = physical_plan_to_partition_tasks(input, py, psets)?; let upstream_other_iter = physical_plan_to_partition_tasks(other, py, psets)?; let py_iter = py - .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))? .getattr(pyo3::intern!(py, "concat"))? .call1((upstream_input_iter, upstream_other_iter))?; Ok(py_iter.into()) @@ -602,7 +604,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "hash_join"))? .call1(( upstream_left_iter, @@ -635,7 +637,7 @@ fn physical_plan_to_partition_tasks( .collect(); // TODO(Clark): Elide sorting one side of the join if already range-partitioned, where we'd use that side's boundaries to sort the other side. let py_iter = if *needs_presort { - py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + py.import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "sort_merge_join_aligned_boundaries"))? 
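Reviewer note: the join branches above all marshal expressions the same way before calling into Python. A sketch of that hand-off, assuming the `From<ExprRef>` conversion that the diff's `PyExpr::from(expr.clone())` calls rely on:

```rust
use daft_dsl::{python::PyExpr, ExprRef};

// Wrap Rust-side expression handles as PyExprs; pyo3 then converts the Vec
// into a Python list argument for the shim call.
fn to_py_exprs(exprs: &[ExprRef]) -> Vec<PyExpr> {
    exprs.iter().map(|e| PyExpr::from(e.clone())).collect()
}
```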
.call1(( left_iter, @@ -647,7 +649,7 @@ fn physical_plan_to_partition_tasks( *left_is_larger, ))? } else { - py.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + py.import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "merge_join_sorted"))? .call1(( left_iter, @@ -679,7 +681,7 @@ fn physical_plan_to_partition_tasks( .map(|expr| PyExpr::from(expr.clone())) .collect(); let py_iter = py - .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? + .import_bound(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "broadcast_join"))? .call1(( upstream_left_iter, diff --git a/src/daft-schema/src/python/datatype.rs b/src/daft-schema/src/python/datatype.rs index 42990c6f0b..3baf3d9625 100644 --- a/src/daft-schema/src/python/datatype.rs +++ b/src/daft-schema/src/python/datatype.rs @@ -5,12 +5,8 @@ use crate::image_mode::ImageMode; use common_arrow_ffi as ffi; use common_py_serde::impl_bincode_py_state_serialization; -use pyo3::{ - class::basic::CompareOp, - exceptions::PyValueError, - prelude::*, - types::{PyDict, PyString}, -}; +use indexmap::IndexMap; +use pyo3::{class::basic::CompareOp, exceptions::PyValueError, prelude::*}; use serde::{Deserialize, Serialize}; use crate::time_unit::TimeUnit; @@ -225,19 +221,14 @@ impl PyDataType { } #[staticmethod] - pub fn r#struct(fields: &PyDict) -> PyResult { - Ok(DataType::Struct( + pub fn r#struct(fields: IndexMap) -> Self { + DataType::Struct( fields - .iter() - .map(|(name, dtype)| { - Ok(Field::new( - name.downcast::()?.to_str()?, - dtype.extract::()?.dtype, - )) - }) - .collect::>>()?, + .into_iter() + .map(|(name, dtype)| Field::new(name, dtype.dtype)) + .collect::>(), ) - .into()) + .into() } #[staticmethod] @@ -311,12 +302,12 @@ impl PyDataType { Ok(DataType::Python.into()) } - pub fn to_arrow(&self, py: Python) -> PyResult { - let pyarrow = py.import(pyo3::intern!(py, "pyarrow"))?; + pub fn to_arrow<'py>(&self, py: Python<'py>) -> PyResult> { + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; match &self.dtype { - DataType::FixedShapeTensor(dtype, shape) => Ok( + DataType::FixedShapeTensor(dtype, shape) => { if py - .import(pyo3::intern!(py, "daft.utils"))? + .import_bound(pyo3::intern!(py, "daft.utils"))? .getattr(pyo3::intern!(py, "pyarrow_supports_fixed_shape_tensor"))? .call0()? .extract()? @@ -328,16 +319,15 @@ impl PyDataType { dtype: *dtype.clone(), } .to_arrow(py)?, - pyo3::types::PyTuple::new(py, shape.clone()), - ))? - .to_object(py) + pyo3::types::PyTuple::new_bound(py, shape.clone()), + )) } else { // Fall back to default Daft super extension representation if installed pyarrow doesn't have the // canonical tensor extension type. - ffi::dtype_to_py(&self.dtype.to_arrow()?, py, pyarrow)? 
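Reviewer note: `PyDataType::r#struct` above is the clearest before/after of this PR's dict handling. A reduced sketch (placeholder function, `String` values instead of `PyDataType`), assuming pyo3 is built with its `indexmap` conversion feature:

```rust
use indexmap::IndexMap;
use pyo3::prelude::*;

// A Python dict extracts directly into an insertion-ordered IndexMap,
// replacing the manual downcast-and-extract loop over `&PyDict`, while
// preserving field order, which matters for struct dtypes.
#[pyfunction]
fn field_names(fields: IndexMap<String, String>) -> Vec<String> {
    fields.into_iter().map(|(name, _)| name).collect()
}
```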
- }, - ), - _ => ffi::dtype_to_py(&self.dtype.to_arrow()?, py, pyarrow), + ffi::dtype_to_py(py, &self.dtype.to_arrow()?, pyarrow) + } + } + _ => ffi::dtype_to_py(py, &self.dtype.to_arrow()?, pyarrow), } } @@ -385,7 +375,7 @@ impl PyDataType { Ok(self.dtype.is_temporal()) } - pub fn is_equal(&self, other: &PyAny) -> PyResult { + pub fn is_equal(&self, other: Bound) -> PyResult { if other.is_instance_of::() { let other = other.extract::()?; Ok(self.dtype == other.dtype) diff --git a/src/daft-schema/src/python/mod.rs b/src/daft-schema/src/python/mod.rs index 3241eaa90c..2f9b7dabd9 100644 --- a/src/daft-schema/src/python/mod.rs +++ b/src/daft-schema/src/python/mod.rs @@ -8,7 +8,7 @@ use crate::image_mode::ImageMode; pub use datatype::PyDataType; pub use datatype::PyTimeUnit; -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; diff --git a/src/daft-schema/src/python/schema.rs b/src/daft-schema/src/python/schema.rs index 533867c7f6..e65bfe00c3 100644 --- a/src/daft-schema/src/python/schema.rs +++ b/src/daft-schema/src/python/schema.rs @@ -22,8 +22,8 @@ impl PySchema { Ok(self.schema.get_field(name)?.clone().into()) } - pub fn to_pyarrow_schema<'py>(&self, py: Python<'py>) -> PyResult<&'py PyAny> { - let pyarrow = py.import(pyo3::intern!(py, "pyarrow"))?; + pub fn to_pyarrow_schema<'py>(&self, py: Python<'py>) -> PyResult> { + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; let pyarrow_fields = self .schema .fields diff --git a/src/daft-sql/src/lib.rs b/src/daft-sql/src/lib.rs index c4795ac70a..12009e82ba 100644 --- a/src/daft-sql/src/lib.rs +++ b/src/daft-sql/src/lib.rs @@ -11,10 +11,10 @@ pub mod python; use pyo3::prelude::*; #[cfg(feature = "python")] -pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { +pub fn register_modules(parent: &Bound) -> PyResult<()> { parent.add_class::()?; - parent.add_wrapped(wrap_pyfunction!(python::sql))?; - parent.add_wrapped(wrap_pyfunction!(python::sql_expr))?; + parent.add_function(wrap_pyfunction_bound!(python::sql, parent)?)?; + parent.add_function(wrap_pyfunction_bound!(python::sql_expr, parent)?)?; Ok(()) } diff --git a/src/daft-table/Cargo.toml b/src/daft-table/Cargo.toml index 02326764e3..56931e04d5 100644 --- a/src/daft-table/Cargo.toml +++ b/src/daft-table/Cargo.toml @@ -8,6 +8,7 @@ daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-image = {path = "../daft-image", default-features = false} html-escape = {workspace = true} +indexmap = {workspace = true} num-traits = {workspace = true} pyo3 = {workspace = true, optional = true} rand = {workspace = true} diff --git a/src/daft-table/src/ffi.rs b/src/daft-table/src/ffi.rs index a313cd4b9d..15c2ce96a2 100644 --- a/src/daft-table/src/ffi.rs +++ b/src/daft-table/src/ffi.rs @@ -13,7 +13,7 @@ use daft_core::{ pub fn record_batches_to_table( py: Python, - batches: &[&PyAny], + batches: &[Bound], schema: SchemaRef, ) -> PyResult
{ if batches.is_empty() { @@ -27,11 +27,11 @@ pub fn record_batches_to_table( Vec::with_capacity(num_batches); for rb in batches { - let pycolumns = rb.getattr(pyo3::intern!(rb.py(), "columns"))?; + let pycolumns = rb.getattr(pyo3::intern!(py, "columns"))?; let columns = pycolumns .downcast::()? .into_iter() - .map(common_arrow_ffi::array_to_rust) + .map(|col| common_arrow_ffi::array_to_rust(py, col)) .collect::>>()?; if names.len() != columns.len() { return Err(PyValueError::new_err(format!("Error when converting Arrow Record Batches to Daft Table. Expected: {} columns, got: {}", names.len(), columns.len()))); @@ -56,7 +56,11 @@ pub fn record_batches_to_table( }) } -pub fn table_to_record_batch(table: &Table, py: Python, pyarrow: &PyModule) -> PyResult { +pub fn table_to_record_batch( + py: Python, + table: &Table, + pyarrow: Bound, +) -> PyResult { let mut arrays = Vec::with_capacity(table.num_columns()); let mut names: Vec = Vec::with_capacity(table.num_columns()); @@ -64,7 +68,7 @@ pub fn table_to_record_batch(table: &Table, py: Python, pyarrow: &PyModule) -> P let s = table.get_column_by_index(i)?; let arrow_array = s.to_arrow(); let arrow_array = cast_array_from_daft_if_needed(arrow_array.to_boxed()); - let py_array = common_arrow_ffi::to_py_array(arrow_array, py, pyarrow)?; + let py_array = common_arrow_ffi::to_py_array(py, arrow_array, &pyarrow)?; arrays.push(py_array); names.push(s.name().to_string()); } @@ -73,5 +77,5 @@ pub fn table_to_record_batch(table: &Table, py: Python, pyarrow: &PyModule) -> P .getattr(pyo3::intern!(py, "RecordBatch"))? .call_method1(pyo3::intern!(py, "from_arrays"), (arrays, names.to_vec()))?; - Ok(record.to_object(py)) + Ok(record.into()) } diff --git a/src/daft-table/src/python.rs b/src/daft-table/src/python.rs index 0c39ea2c40..de1d685a60 100644 --- a/src/daft-table/src/python.rs +++ b/src/daft-table/src/python.rs @@ -1,7 +1,7 @@ use daft_core::join::JoinType; +use indexmap::IndexMap; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; -use pyo3::types::PyDict; use crate::ffi; use crate::Table; @@ -404,7 +404,7 @@ impl PyTable { #[staticmethod] pub fn from_arrow_record_batches( py: Python, - record_batches: Vec<&PyAny>, + record_batches: Vec>, schema: &PySchema, ) -> PyResult { let table = @@ -413,16 +413,15 @@ impl PyTable { } #[staticmethod] - pub fn from_pylist_series(dict: &PyDict) -> PyResult { + pub fn from_pylist_series(dict: IndexMap) -> PyResult { let mut fields: Vec = Vec::new(); let mut columns: Vec = Vec::new(); fields.reserve(dict.len()); columns.reserve(dict.len()); - for (k, v) in dict.iter() { - let name = k.extract::()?; - let series = v.extract::()?.series; - fields.push(Field::new(name.clone(), series.data_type().clone())); + for (name, series) in dict.into_iter() { + let series = series.series; + fields.push(Field::new(name.as_str(), series.data_type().clone())); columns.push(series.rename(name)); } @@ -448,8 +447,8 @@ impl PyTable { pub fn to_arrow_record_batch(&self) -> PyResult { Python::with_gil(|py| { - let pyarrow = py.import("pyarrow")?; - ffi::table_to_record_batch(&self.table, py, pyarrow) + let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?; + ffi::table_to_record_batch(py, &self.table, pyarrow) }) } @@ -481,7 +480,7 @@ impl AsRef
for PyTable {
 }
 }
 
-pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
+pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
     parent.add_class::<PyTable>()?;
     Ok(())
 }
diff --git a/src/lib.rs b/src/lib.rs
index 79816562f3..604901b056 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -71,11 +71,11 @@ pub mod pylib {
     #[pyfunction]
     pub fn refresh_logger(py: Python) -> PyResult<()> {
         use log::LevelFilter;
-        let logging = py.import("logging")?;
+        let logging = py.import_bound(pyo3::intern!(py, "logging"))?;
         let python_log_level = logging
-            .getattr("getLogger")?
+            .getattr(pyo3::intern!(py, "getLogger"))?
             .call0()?
-            .getattr("level")?
+            .getattr(pyo3::intern!(py, "level"))?
             .extract::<usize>()
             .unwrap_or(0);
 
@@ -94,34 +94,34 @@
     }
 
     #[pymodule]
-    fn daft(py: Python<'_>, m: &PyModule) -> PyResult<()> {
+    fn daft(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
         refresh_logger(py)?;
         init_tracing(crate::should_enable_chrome_trace());
 
-        common_daft_config::register_modules(py, m)?;
-        common_system_info::register_modules(py, m)?;
-        common_resource_request::register_modules(py, m)?;
-        common_file_formats::python::register_modules(py, m)?;
-        daft_core::register_modules(py, m)?;
-        daft_core::python::register_modules(py, m)?;
-        daft_local_execution::register_modules(py, m)?;
-        daft_dsl::register_modules(py, m)?;
-        daft_table::register_modules(py, m)?;
-        daft_io::register_modules(py, m)?;
-        daft_parquet::register_modules(py, m)?;
-        daft_csv::register_modules(py, m)?;
-        daft_json::register_modules(py, m)?;
-        daft_plan::register_modules(py, m)?;
-        daft_micropartition::register_modules(py, m)?;
-        daft_scan::register_modules(py, m)?;
-        daft_scheduler::register_modules(py, m)?;
-        daft_sql::register_modules(py, m)?;
-        daft_functions::register_modules(py, m)?;
+        common_daft_config::register_modules(m)?;
+        common_system_info::register_modules(m)?;
+        common_resource_request::register_modules(m)?;
+        common_file_formats::python::register_modules(m)?;
+        daft_core::register_modules(m)?;
+        daft_core::python::register_modules(m)?;
+        daft_local_execution::register_modules(m)?;
+        daft_dsl::register_modules(m)?;
+        daft_table::register_modules(m)?;
+        daft_io::register_modules(m)?;
+        daft_parquet::register_modules(m)?;
+        daft_csv::register_modules(m)?;
+        daft_json::register_modules(m)?;
+        daft_plan::register_modules(m)?;
+        daft_micropartition::register_modules(m)?;
+        daft_scan::register_modules(m)?;
+        daft_scheduler::register_modules(m)?;
+        daft_sql::register_modules(m)?;
+        daft_functions::register_modules(m)?;
 
         m.add_wrapped(wrap_pyfunction!(version))?;
         m.add_wrapped(wrap_pyfunction!(build_type))?;
         m.add_wrapped(wrap_pyfunction!(refresh_logger))?;
         m.add_wrapped(wrap_pyfunction!(get_max_log_level))?;
-        daft_image::python::register_modules(py, m)?;
+        daft_image::python::register_modules(m)?;
 
         Ok(())
     }
 }
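Reviewer note: with every crate's `register_modules` migrated, the entry point itself is the last piece. A minimal standalone sketch of the same shape (placeholder `mylib`/`answer` names, not Daft code):

```rust
use pyo3::prelude::*;

#[pyfunction]
fn answer() -> usize {
    42
}

// The #[pymodule] function now receives `&Bound<PyModule>`; child crates
// register against it without a separate `py` token, as in the diff above.
#[pymodule]
fn mylib(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction_bound!(answer, m)?)?;
    Ok(())
}
```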