[FEAT] Native Parquet Reader into pyarrow directly (#1366)
* Implements `daft.table.read_parquet_into_pyarrow`, which reads Parquet
  directly into pyarrow without concatenating or casting.
* This reduces memory pressure by 50%.
* It also gives a speed-up of around 45%.
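
A minimal usage sketch of the new API (the file path and column names below are hypothetical):

    import daft.table

    # Hypothetical file and column selection; returns a pyarrow.Table directly,
    # without building a Daft Table (and its concat/cast step) in between.
    arrow_table = daft.table.read_parquet_into_pyarrow(
        "data/example.parquet",
        columns=["id", "value"],
    )
    print(arrow_table.schema, arrow_table.num_rows)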

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
samster25 and Jay Chia authored Sep 12, 2023
1 parent 3f28cf1 commit 601260b
Showing 10 changed files with 451 additions and 17 deletions.
6 changes: 6 additions & 0 deletions benchmarking/parquet/conftest.py
@@ -45,14 +45,20 @@ def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table:
    return tbl.to_arrow()


def daft_native_read_to_arrow(path: str, columns: list[str] | None = None) -> pa.Table:
    return daft.table.read_parquet_into_pyarrow(path, columns=columns)


@pytest.fixture(
    params=[
        daft_native_read,
        daft_native_read_to_arrow,
        pyarrow_read,
        boto3_get_object_read,
    ],
    ids=[
        "daft_native_read",
        "daft_native_read_to_arrow",
        "pyarrow",
        "boto3_get_object",
    ],
4 changes: 2 additions & 2 deletions daft/table/__init__.py
@@ -1,5 +1,5 @@
from __future__ import annotations

from .table import Table
from .table import Table, read_parquet_into_pyarrow

__all__ = ["Table"]
__all__ = ["Table", "read_parquet_into_pyarrow"]
27 changes: 27 additions & 0 deletions daft/table/table.py
@@ -11,6 +11,7 @@
from daft.daft import PyTable as _PyTable
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
from daft.daft import read_parquet_into_pyarrow as _read_parquet_into_pyarrow
from daft.daft import read_parquet_statistics as _read_parquet_statistics
from daft.datatype import DataType, TimeUnit
from daft.expressions import Expression, ExpressionsProjection
@@ -451,3 +452,29 @@ def _trim_pyarrow_large_arrays(arr: pa.ChunkedArray) -> pa.ChunkedArray:
        return pa.chunked_array(all_chunks, type=target_type)
    else:
        return arr


def read_parquet_into_pyarrow(
    path: str,
    columns: list[str] | None = None,
    start_offset: int | None = None,
    num_rows: int | None = None,
    row_groups: list[int] | None = None,
    io_config: IOConfig | None = None,
    multithreaded_io: bool | None = None,
    coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
) -> pa.Table:

    fields, metadata, columns = _read_parquet_into_pyarrow(
        uri=path,
        columns=columns,
        start_offset=start_offset,
        num_rows=num_rows,
        row_groups=row_groups,
        io_config=io_config,
        multithreaded_io=multithreaded_io,
        coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
    )
    schema = pa.schema(fields, metadata=metadata)
    columns = [pa.chunked_array(c) for c in columns]  # type: ignore
    return pa.table(columns, schema=schema)
16 changes: 16 additions & 0 deletions src/daft-core/src/ffi.rs
@@ -51,6 +51,22 @@ pub fn to_py_array(array: ArrayRef, py: Python, pyarrow: &PyModule) -> PyResult<
    Ok(array.to_object(py))
}

pub fn field_to_py(
    field: &arrow2::datatypes::Field,
    py: Python,
    pyarrow: &PyModule,
) -> PyResult<PyObject> {
    let schema = Box::new(ffi::export_field_to_c(field));
    let schema_ptr: *const ffi::ArrowSchema = &*schema;

    let field = pyarrow.getattr(pyo3::intern!(py, "Field"))?.call_method1(
        pyo3::intern!(py, "_import_from_c"),
        (schema_ptr as Py_uintptr_t,),
    )?;

    Ok(field.to_object(py))
}

pub fn to_py_schema(
    dtype: &arrow2::datatypes::DataType,
    py: Python,
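For context, the pyarrow side of this Arrow C Data Interface handoff can be sketched in pure Python. The field below is just an illustrative stand-in; `pyarrow.cffi` and the private `Field._export_to_c` / `Field._import_from_c` hooks are the pyarrow facilities that `field_to_py` targets from Rust:

    import pyarrow as pa
    from pyarrow.cffi import ffi  # cffi definitions for the Arrow C Data Interface

    # Allocate an ArrowSchema struct, export an illustrative field into it, then
    # import it back -- mirroring export_field_to_c + Field._import_from_c above.
    c_schema = ffi.new("struct ArrowSchema*")
    ptr = int(ffi.cast("uintptr_t", c_schema))

    field = pa.field("scores", pa.list_(pa.float32()))
    field._export_to_c(ptr)
    assert pa.Field._import_from_c(ptr).equals(field)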
159 changes: 154 additions & 5 deletions src/daft-parquet/src/file.rs
@@ -236,7 +236,7 @@ struct RowGroupRange {
pub(crate) struct ParquetFileReader {
    uri: String,
    metadata: Arc<parquet2::metadata::FileMetaData>,
    arrow_schema: arrow2::datatypes::Schema,
    arrow_schema: arrow2::datatypes::SchemaRef,
    row_ranges: Arc<Vec<RowGroupRange>>,
}

@@ -250,12 +250,12 @@ impl ParquetFileReader {
        Ok(ParquetFileReader {
            uri,
            metadata: Arc::new(metadata),
            arrow_schema,
            arrow_schema: arrow_schema.into(),
            row_ranges: Arc::new(row_ranges),
        })
    }

    pub fn arrow_schema(&self) -> &arrow2::datatypes::Schema {
    pub fn arrow_schema(&self) -> &Arc<arrow2::datatypes::Schema> {
        &self.arrow_schema
    }

@@ -308,7 +308,10 @@ impl ParquetFileReader {
        read_planner.collect(io_client)
    }

    pub async fn read_from_ranges(self, ranges: Arc<RangesContainer>) -> DaftResult<Table> {
    pub async fn read_from_ranges_into_table(
        self,
        ranges: Arc<RangesContainer>,
    ) -> DaftResult<Table> {
        let metadata = self.metadata;
        let all_handles = self
            .arrow_schema
@@ -466,8 +469,154 @@ impl ParquetFileReader {
            })?
            .into_iter()
            .collect::<DaftResult<Vec<_>>>()?;
        let daft_schema = daft_core::schema::Schema::try_from(&self.arrow_schema)?;
        let daft_schema = daft_core::schema::Schema::try_from(self.arrow_schema.as_ref())?;

        Table::new(daft_schema, all_series)
    }

    pub async fn read_from_ranges_into_arrow_arrays(
        self,
        ranges: Arc<RangesContainer>,
    ) -> DaftResult<Vec<Vec<Box<dyn arrow2::array::Array>>>> {
        let metadata = self.metadata;
        let all_handles = self
            .arrow_schema
            .fields
            .iter()
            .map(|field| {
                let owned_row_ranges = self.row_ranges.clone();

                let field_handles = owned_row_ranges
                    .iter()
                    .map(|row_range| {
                        let row_range = *row_range;
                        let rt_handle = tokio::runtime::Handle::current();
                        let field = field.clone();
                        let owned_uri = self.uri.clone();
                        let rg = metadata
                            .row_groups
                            .get(row_range.row_group_index)
                            .expect("Row Group index should be in bounds");
                        let num_rows = rg.num_rows().min(row_range.start + row_range.num_rows);
                        let columns = rg.columns();
                        let field_name = &field.name;
                        let filtered_cols_idx = columns
                            .iter()
                            .enumerate()
                            .filter(|(_, x)| &x.descriptor().path_in_schema[0] == field_name)
                            .map(|(i, _)| i)
                            .collect::<Vec<_>>();

                        let range_readers = filtered_cols_idx
                            .iter()
                            .map(|i| {
                                let c = columns.get(*i).unwrap();
                                let (start, len) = c.byte_range();
                                let end: u64 = start + len;
                                let range_reader = ranges
                                    .get_range_reader(start as usize..end as usize)
                                    .unwrap();

                                Box::pin(range_reader)
                            })
                            .collect::<Vec<_>>();
                        let metadata = metadata.clone();
                        let handle = tokio::task::spawn(async move {
                            let mut decompressed_iters =
                                Vec::with_capacity(filtered_cols_idx.len());
                            let mut ptypes = Vec::with_capacity(filtered_cols_idx.len());

                            for (col_idx, range_reader) in
                                filtered_cols_idx.into_iter().zip(range_readers)
                            {
                                let col = metadata
                                    .row_groups
                                    .get(row_range.row_group_index)
                                    .expect("Row Group index should be in bounds")
                                    .columns()
                                    .get(col_idx)
                                    .expect("Column index should be in bounds");
                                ptypes.push(col.descriptor().descriptor.primitive_type.clone());

                                let compressed_page_stream =
                                    get_owned_page_stream_from_column_start(
                                        col,
                                        range_reader,
                                        vec![],
                                        Arc::new(|_, _| true),
                                        4 * 1024 * 1024,
                                    )
                                    .await
                                    .with_context(|_| {
                                        UnableToCreateParquetPageStreamSnafu::<String> {
                                            path: owned_uri.clone(),
                                        }
                                    })?;
                                let page_stream = streaming_decompression(compressed_page_stream);
                                let pinned_stream = Box::pin(page_stream);
                                decompressed_iters
                                    .push(StreamIterator::new(pinned_stream, rt_handle.clone()))
                            }

                            let (send, recv) = tokio::sync::oneshot::channel();
                            rayon::spawn(move || {
                                let arr_iter = column_iter_to_arrays(
                                    decompressed_iters,
                                    ptypes.iter().collect(),
                                    field.clone(),
                                    Some(128 * 1024),
                                    num_rows,
                                );

                                let ser = (|| {
                                    let mut all_arrays = vec![];
                                    let mut curr_index = 0;

                                    for arr in arr_iter? {
                                        let arr = arr?;
                                        if (curr_index + arr.len()) < row_range.start {
                                            // throw arrays less than what we need
                                            curr_index += arr.len();
                                            continue;
                                        } else if curr_index < row_range.start {
                                            let offset = row_range.start.saturating_sub(curr_index);
                                            all_arrays.push(arr.sliced(offset, arr.len() - offset));
                                            curr_index += arr.len();
                                        } else {
                                            curr_index += arr.len();
                                            all_arrays.push(arr);
                                        }
                                    }
                                    Ok(all_arrays)
                                })();

                                let _ = send.send(ser);
                            });
                            recv.await.context(OneShotRecvSnafu {})?
                        });
                        Ok(handle)
                    })
                    .collect::<DaftResult<Vec<_>>>()?;
                let owned_uri = self.uri.clone();
                let array_handle = tokio::task::spawn(async move {
                    let all_arrays = try_join_all(field_handles).await.context(JoinSnafu {
                        path: owned_uri.to_string(),
                    })?;
                    let all_arrays = all_arrays.into_iter().collect::<DaftResult<Vec<_>>>()?;
                    let concated = all_arrays.concat();
                    Ok(concated)
                });
                Ok(array_handle)
            })
            .collect::<DaftResult<Vec<_>>>()?;

        let all_field_arrays = try_join_all(all_handles)
            .await
            .context(JoinSnafu {
                path: self.uri.to_string(),
            })?
            .into_iter()
            .collect::<DaftResult<Vec<_>>>()?;
        Ok(all_field_arrays)
    }
}
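
The chunk-trimming step above (drop decoded chunks that end before `row_range.start`, slice the chunk that straddles it, keep the rest) can be illustrated in Python. This helper is illustrative only and not part of the change:

    import pyarrow as pa

    # Illustrative only: trim a list of decoded chunks so they begin at `start`,
    # mirroring the curr_index bookkeeping in read_from_ranges_into_arrow_arrays.
    def trim_chunks_to_start(chunks: list[pa.Array], start: int) -> list[pa.Array]:
        kept, curr_index = [], 0
        for arr in chunks:
            if curr_index + len(arr) < start:
                # chunk ends before the requested range: drop it
                curr_index += len(arr)
                continue
            if curr_index < start:
                # chunk straddles the start: keep only the tail
                offset = start - curr_index
                kept.append(arr.slice(offset))
            else:
                kept.append(arr)
            curr_index += len(arr)
        return kept

    chunks = [pa.array([0, 1, 2]), pa.array([3, 4, 5]), pa.array([6, 7])]
    assert pa.concat_arrays(trim_chunks_to_start(chunks, 4)).equals(pa.array([4, 5, 6, 7]))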
6 changes: 6 additions & 0 deletions src/daft-parquet/src/lib.rs
@@ -101,6 +101,12 @@ pub enum Error {
        read_rows: usize,
    },

    #[snafu(display(
        "Parquet file: {} has multiple columns with different number of rows",
        path,
    ))]
    ParquetColumnsDontHaveEqualRows { path: String },

    #[snafu(display(
        "Parquet file: {} metadata listed {} columns but only read: {} ",
        path,
66 changes: 62 additions & 4 deletions src/daft-parquet/src/python.rs
@@ -1,15 +1,17 @@
use pyo3::prelude::*;

pub mod pylib {
    use std::sync::Arc;

    use daft_core::python::{datatype::PyTimeUnit, schema::PySchema, PySeries};
    use daft_core::{
        ffi::field_to_py,
        python::{datatype::PyTimeUnit, schema::PySchema, PySeries},
    };
    use daft_io::{get_io_client, python::IOConfig};
    use daft_table::python::PyTable;
    use pyo3::{pyfunction, PyResult, Python};
    use std::{collections::BTreeMap, sync::Arc};

    use crate::read::ParquetSchemaInferenceOptions;

    use daft_core::ffi::to_py_array;
    #[allow(clippy::too_many_arguments)]
    #[pyfunction]
    pub fn read_parquet(
@@ -44,6 +46,60 @@ pub mod pylib {
            .into())
        })
    }
    type PyArrowChunks = Vec<Vec<pyo3::PyObject>>;
    type PyArrowFields = Vec<pyo3::PyObject>;

    #[allow(clippy::too_many_arguments)]
    #[pyfunction]
    pub fn read_parquet_into_pyarrow(
        py: Python,
        uri: &str,
        columns: Option<Vec<&str>>,
        start_offset: Option<usize>,
        num_rows: Option<usize>,
        row_groups: Option<Vec<i64>>,
        io_config: Option<IOConfig>,
        multithreaded_io: Option<bool>,
        coerce_int96_timestamp_unit: Option<PyTimeUnit>,
    ) -> PyResult<(PyArrowFields, BTreeMap<String, String>, PyArrowChunks)> {
        let read_parquet_result = py.allow_threads(|| {
            let io_client = get_io_client(
                multithreaded_io.unwrap_or(true),
                io_config.unwrap_or_default().config.into(),
            )?;
            let schema_infer_options = ParquetSchemaInferenceOptions::new(
                coerce_int96_timestamp_unit.map(|tu| tu.timeunit),
            );
            crate::read::read_parquet_into_pyarrow(
                uri,
                columns.as_deref(),
                start_offset,
                num_rows,
                row_groups.as_deref(),
                io_client,
                multithreaded_io.unwrap_or(true),
                &schema_infer_options,
            )
        })?;
        let (schema, all_arrays) = read_parquet_result;
        let pyarrow = py.import("pyarrow")?;
        let converted_arrays = all_arrays
            .into_iter()
            .map(|v| {
                v.into_iter()
                    .map(|a| to_py_array(a, py, pyarrow))
                    .collect::<PyResult<Vec<_>>>()
            })
            .collect::<PyResult<Vec<_>>>()?;
        let fields = schema
            .fields
            .iter()
            .map(|f| field_to_py(f, py, pyarrow))
            .collect::<Result<Vec<_>, _>>()?;
        let metadata = &schema.metadata;

        Ok((fields, metadata.clone(), converted_arrays))
    }

    #[allow(clippy::too_many_arguments)]
    #[pyfunction]
@@ -126,6 +182,8 @@ pub mod pylib {
}
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
    parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet))?;
    parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_into_pyarrow))?;

    parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?;
    parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?;
    parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?;
