[FEAT] Parameter to set num_parallel_tasks for bulk readers (#1399)
Adds a `num_parallel_tasks` argument (defaults to 128) that caps how many bulk read tasks are in flight at once.
samster25 authored Sep 21, 2023
1 parent 83b73b2 commit ad81ae2
Showing 4 changed files with 55 additions and 27 deletions.
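
The core of the change: the bulk readers previously launched every per-file read at once via `try_join_all`; they now push the reads through `buffer_unordered(num_parallel_tasks)`, which keeps at most that many reads in flight. A minimal standalone sketch of the pattern (not Daft code; `fetch` is a hypothetical stand-in for a single-file read):

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for a single-file read such as read_parquet_single.
async fn fetch(i: usize) -> Result<usize, std::io::Error> {
    Ok(i * 2)
}

fn main() -> Result<(), std::io::Error> {
    let results: Vec<usize> = futures::executor::block_on(
        // try_join_all would poll all 1000 futures concurrently;
        // buffer_unordered caps concurrency at 128, the new default.
        stream::iter((0..1000usize).map(fetch))
            .buffer_unordered(128)
            .try_collect(),
    )?;
    assert_eq!(results.len(), 1000);
    Ok(())
}
```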
2 changes: 2 additions & 0 deletions daft/daft.pyi
@@ -375,6 +375,7 @@ def read_parquet_bulk(
     num_rows: int | None = None,
     row_groups: list[list[int]] | None = None,
     io_config: IOConfig | None = None,
+    num_parallel_tasks: int | None = 128,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: PyTimeUnit | None = None,
 ): ...
@@ -400,6 +401,7 @@ def read_parquet_into_pyarrow_bulk(
     num_rows: int | None = None,
     row_groups: list[list[int]] | None = None,
     io_config: IOConfig | None = None,
+    num_parallel_tasks: int | None = 128,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: PyTimeUnit | None = None,
 ): ...
4 changes: 4 additions & 0 deletions daft/table/table.py
@@ -396,6 +396,7 @@ def read_parquet_bulk(
     num_rows: int | None = None,
     row_groups_per_path: list[list[int]] | None = None,
     io_config: IOConfig | None = None,
+    num_parallel_tasks: int | None = 128,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
 ) -> list[Table]:
@@ -406,6 +407,7 @@ def read_parquet_bulk(
         num_rows=num_rows,
         row_groups=row_groups_per_path,
         io_config=io_config,
+        num_parallel_tasks=num_parallel_tasks,
         multithreaded_io=multithreaded_io,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
     )
@@ -486,6 +488,7 @@ def read_parquet_into_pyarrow_bulk(
     num_rows: int | None = None,
     row_groups_per_path: list[list[int]] | None = None,
     io_config: IOConfig | None = None,
+    num_parallel_tasks: int | None = 128,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
 ) -> list[pa.Table]:
@@ -496,6 +499,7 @@ def read_parquet_into_pyarrow_bulk(
         num_rows=num_rows,
         row_groups=row_groups_per_path,
         io_config=io_config,
+        num_parallel_tasks=num_parallel_tasks,
         multithreaded_io=multithreaded_io,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
     )
4 changes: 4 additions & 0 deletions src/daft-parquet/src/python.rs
@@ -118,6 +118,7 @@ pub mod pylib {
         num_rows: Option<usize>,
         row_groups: Option<Vec<Vec<i64>>>,
         io_config: Option<IOConfig>,
+        num_parallel_tasks: Option<i64>,
         multithreaded_io: Option<bool>,
         coerce_int96_timestamp_unit: Option<PyTimeUnit>,
     ) -> PyResult<Vec<PyTable>> {
@@ -137,6 +138,7 @@ pub mod pylib {
                 num_rows,
                 row_groups,
                 io_client,
+                num_parallel_tasks.unwrap_or(128) as usize,
                 multithreaded_io.unwrap_or(true),
                 &schema_infer_options,
             )?
@@ -156,6 +158,7 @@ pub mod pylib {
         num_rows: Option<usize>,
         row_groups: Option<Vec<Vec<i64>>>,
         io_config: Option<IOConfig>,
+        num_parallel_tasks: Option<i64>,
         multithreaded_io: Option<bool>,
         coerce_int96_timestamp_unit: Option<PyTimeUnit>,
     ) -> PyResult<Vec<PyArrowParquetType>> {
@@ -175,6 +178,7 @@ pub mod pylib {
                 num_rows,
                 row_groups,
                 io_client,
+                num_parallel_tasks.unwrap_or(128) as usize,
                 multithreaded_io.unwrap_or(true),
                 &schema_infer_options,
             )
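
Note that on the Rust binding side the new argument arrives as `Option<i64>` and is defaulted with `unwrap_or(128)`, so a `None` from Python resolves to the same 128 that the `.pyi` stubs advertise as the default.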
72 changes: 45 additions & 27 deletions src/daft-parquet/src/read.rs
@@ -9,7 +9,7 @@ use daft_core::{
 };
 use daft_io::{get_runtime, IOClient};
 use daft_table::Table;
-use futures::future::{join_all, try_join_all};
+use futures::{future::join_all, StreamExt, TryStreamExt};
 use snafu::ResultExt;
 
 use crate::{file::ParquetReaderBuilder, JoinSnafu};
@@ -321,6 +321,7 @@ pub fn read_parquet_bulk(
     num_rows: Option<usize>,
     row_groups: Option<Vec<Vec<i64>>>,
     io_client: Arc<IOClient>,
+    num_parallel_tasks: usize,
     multithreaded_io: bool,
     schema_infer_options: &ParquetSchemaInferenceOptions,
 ) -> DaftResult<Vec<Table>> {
@@ -338,7 +339,7 @@ pub fn read_parquet_bulk(
     }
     let tables = runtime_handle
         .block_on(async move {
-            try_join_all(uris.iter().enumerate().map(|(i, uri)| {
+            let task_stream = futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| {
                 let uri = uri.to_string();
                 let owned_columns = owned_columns.clone();
                 let owned_row_group = match &row_groups {
@@ -352,22 +353,31 @@ pub fn read_parquet_bulk(
                 let columns = owned_columns
                     .as_ref()
                     .map(|s| s.iter().map(AsRef::as_ref).collect::<Vec<_>>());
-                read_parquet_single(
-                    &uri,
-                    columns.as_deref(),
-                    start_offset,
-                    num_rows,
-                    owned_row_group.as_deref(),
-                    io_client,
-                    &schema_infer_options,
-                )
-                .await
+                Ok((
+                    i,
+                    read_parquet_single(
+                        &uri,
+                        columns.as_deref(),
+                        start_offset,
+                        num_rows,
+                        owned_row_group.as_deref(),
+                        io_client,
+                        &schema_infer_options,
+                    )
+                    .await?,
+                ))
                 })
-            }))
-            .await
+            }));
+            task_stream
+                .buffer_unordered(num_parallel_tasks)
+                .try_collect::<Vec<_>>()
+                .await
         })
         .context(JoinSnafu { path: "UNKNOWN" })?;
-    tables.into_iter().collect::<DaftResult<Vec<_>>>()
+
+    let mut collected = tables.into_iter().collect::<DaftResult<Vec<_>>>()?;
+    collected.sort_by_key(|(idx, _)| *idx);
+    Ok(collected.into_iter().map(|(_, v)| v).collect())
 }

#[allow(clippy::too_many_arguments)]
@@ -378,6 +388,7 @@ pub fn read_parquet_into_pyarrow_bulk(
     num_rows: Option<usize>,
     row_groups: Option<Vec<Vec<i64>>>,
     io_client: Arc<IOClient>,
+    num_parallel_tasks: usize,
     multithreaded_io: bool,
     schema_infer_options: &ParquetSchemaInferenceOptions,
 ) -> DaftResult<Vec<ParquetPyarrowChunk>> {
@@ -395,7 +406,7 @@ pub fn read_parquet_into_pyarrow_bulk(
     }
     let tables = runtime_handle
         .block_on(async move {
-            try_join_all(uris.iter().enumerate().map(|(i, uri)| {
+            futures::stream::iter(uris.iter().enumerate().map(|(i, uri)| {
                 let uri = uri.to_string();
                 let owned_columns = owned_columns.clone();
                 let owned_row_group = match &row_groups {
@@ -409,22 +420,29 @@ pub fn read_parquet_into_pyarrow_bulk(
                 let columns = owned_columns
                     .as_ref()
                     .map(|s| s.iter().map(AsRef::as_ref).collect::<Vec<_>>());
-                read_parquet_single_into_arrow(
-                    &uri,
-                    columns.as_deref(),
-                    start_offset,
-                    num_rows,
-                    owned_row_group.as_deref(),
-                    io_client,
-                    &schema_infer_options,
-                )
-                .await
+                Ok((
+                    i,
+                    read_parquet_single_into_arrow(
+                        &uri,
+                        columns.as_deref(),
+                        start_offset,
+                        num_rows,
+                        owned_row_group.as_deref(),
+                        io_client,
+                        &schema_infer_options,
+                    )
+                    .await?,
+                ))
                 })
             }))
+            .buffer_unordered(num_parallel_tasks)
+            .try_collect::<Vec<_>>()
            .await
         })
         .context(JoinSnafu { path: "UNKNOWN" })?;
-    tables.into_iter().collect::<DaftResult<Vec<_>>>()
+    let mut collected = tables.into_iter().collect::<DaftResult<Vec<_>>>()?;
+    collected.sort_by_key(|(idx, _)| *idx);
+    Ok(collected.into_iter().map(|(_, v)| v).collect())
 }

pub fn read_parquet_schema(
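
One subtlety in the new `read.rs` code: `buffer_unordered` yields results in completion order, not submission order, which is why each task now returns an `(i, table)` pair and the collected vector is re-sorted by index before the tables are returned. A standalone sketch of that tag-and-sort step (hypothetical names, not Daft code):

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for read_parquet_single: tags the result with its
// input index so the original order can be restored after collection.
async fn read_one(i: usize, uri: &str) -> Result<(usize, String), std::io::Error> {
    Ok((i, format!("table from {uri}")))
}

fn main() -> Result<(), std::io::Error> {
    let uris = ["b.parquet", "a.parquet"];
    let mut tagged: Vec<(usize, String)> = futures::executor::block_on(
        stream::iter(uris.iter().enumerate().map(|(i, uri)| read_one(i, uri)))
            .buffer_unordered(128) // yields in completion order
            .try_collect(),
    )?;
    tagged.sort_by_key(|(i, _)| *i); // restore input order
    let tables: Vec<String> = tagged.into_iter().map(|(_, t)| t).collect();
    assert_eq!(tables, vec!["table from b.parquet", "table from a.parquet"]);
    Ok(())
}
```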
