Commit

[CHORE] Enable test_creation and test_parquet for native executor (Eventual-Inc#2672)

Enables the `dataframe/test_creation.py` and `io/test_parquet.py` test suites for the native executor.

Changes:
- Add `PythonStorageConfig` reading functionality (copying the existing logic in `materialize_scan_task`)
- Enable streaming parquet reads to handle files with differing schemas (see Eventual-Inc#2514)
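
The differing-schemas change is easiest to see with a small example: when one file in a scan lacks a column that other files have, it must still contribute a table with the correct row count, with the missing column filled with nulls, so the per-file results can be concatenated. Below is a minimal, self-contained sketch of that idea in plain Rust; `Column`, `Table`, and `table_for_file` are hypothetical stand-ins for illustration, not Daft's actual types or APIs.

```rust
// Conceptual sketch only: `Column` and `Table` are stand-ins, not Daft's types.
#[derive(Debug, Clone)]
struct Column {
    name: String,
    values: Vec<Option<i64>>, // None models a null
}

#[derive(Debug)]
struct Table {
    num_rows: usize,
    columns: Vec<Column>,
}

/// Build a table for one file against a unified schema: columns the file
/// does not contain are materialized as all-null columns of length `num_rows`.
fn table_for_file(unified_schema: &[&str], file_columns: &[Column], num_rows: usize) -> Table {
    let columns = unified_schema
        .iter()
        .map(|name| {
            file_columns
                .iter()
                .find(|c| c.name == *name)
                .cloned()
                .unwrap_or_else(|| Column {
                    name: (*name).to_string(),
                    values: vec![None; num_rows],
                })
        })
        .collect();
    Table { num_rows, columns }
}

fn main() {
    let schema = ["a", "b"];
    // File 1 has both columns; file 2 only has "a".
    let file1 = vec![
        Column { name: "a".into(), values: vec![Some(1), Some(2)] },
        Column { name: "b".into(), values: vec![Some(10), Some(20)] },
    ];
    let file2 = vec![Column { name: "a".into(), values: vec![Some(3)] }];

    let t1 = table_for_file(&schema, &file1, 2);
    let t2 = table_for_file(&schema, &file2, 1);
    // Both tables now share the schema ["a", "b"] and can be concatenated.
    println!("{t1:?}\n{t2:?}");
}
```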

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored and sagiahrac committed Oct 7, 2024
1 parent 5ba8ad2 commit 6cb4fc0
Showing 7 changed files with 195 additions and 201 deletions.
3 changes: 3 additions & 0 deletions src/daft-local-execution/src/lib.rs
@@ -63,6 +63,9 @@ pub enum Error {
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
#[cfg(feature = "python")]
#[snafu(display("PyIOError: {}", source))]
PyIO { source: PyErr },
#[snafu(display("Error creating pipeline from {}: {}", plan_name, source))]
PipelineCreationError {
source: DaftError,
276 changes: 127 additions & 149 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -137,162 +137,140 @@ async fn stream_scan_task(
}
let source = scan_task.sources.first().unwrap();
let url = source.get_path();
let table_stream = match scan_task.storage_config.as_ref() {
StorageConfig::Native(native_storage_config) => {
let io_config = Arc::new(
native_storage_config
.io_config
.as_ref()
.cloned()
.unwrap_or_default(),
);
let multi_threaded_io = native_storage_config.multithreaded_io;
let io_client = daft_io::get_io_client(multi_threaded_io, io_config)?;

match scan_task.file_format_config.as_ref() {
// ********************
// Native Parquet Reads
// ********************
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
field_id_mapping,
..
}) => {
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

if source.get_iceberg_delete_files().is_some() {
return Err(common_error::DaftError::TypeError(
"Streaming reads not supported for Iceberg delete files".to_string(),
));
}
let (io_config, multi_threaded_io) = match scan_task.storage_config.as_ref() {
StorageConfig::Native(native_storage_config) => (
native_storage_config.io_config.as_ref(),
native_storage_config.multithreaded_io,
),

let row_groups =
if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() {
Some(row_groups.clone())
} else {
None
};
let metadata = scan_task
.sources
.first()
.and_then(|s| s.get_parquet_metadata().cloned());
daft_parquet::read::stream_parquet(
url,
file_column_names.as_deref(),
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
io_client.clone(),
io_stats,
&inference_options,
field_id_mapping.clone(),
metadata,
maintain_order,
)
.await?
}
#[cfg(feature = "python")]
StorageConfig::Python(python_storage_config) => {
(python_storage_config.io_config.as_ref(), true)
}
};
let io_config = Arc::new(io_config.cloned().unwrap_or_default());
let io_client = daft_io::get_io_client(multi_threaded_io, io_config)?;
let table_stream = match scan_task.file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
field_id_mapping,
..
}) => {
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

// ****************
// Native CSV Reads
// ****************
FileFormatConfig::Csv(cfg) => {
let schema_of_file = scan_task.schema.clone();
let col_names = if !cfg.has_headers {
Some(
schema_of_file
.fields
.values()
.map(|f| f.name.as_str())
.collect::<Vec<_>>(),
)
} else {
None
};
let convert_options = CsvConvertOptions::new_internal(
scan_task.pushdowns.limit,
file_column_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
col_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
Some(schema_of_file),
scan_task.pushdowns.filters.clone(),
);
let parse_options = CsvParseOptions::new_with_defaults(
cfg.has_headers,
cfg.delimiter,
cfg.double_quote,
cfg.quote,
cfg.allow_variable_columns,
cfg.escape_char,
cfg.comment,
)?;
let read_options =
CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size);
daft_csv::stream_csv(
url.to_string(),
Some(convert_options),
Some(parse_options),
Some(read_options),
io_client.clone(),
io_stats.clone(),
None,
// maintain_order, TODO: Implement maintain_order for CSV
)
.await?
}
if source.get_iceberg_delete_files().is_some() {
return Err(common_error::DaftError::TypeError(
"Streaming reads not supported for Iceberg delete files".to_string(),
));
}

// ****************
// Native JSON Reads
// ****************
FileFormatConfig::Json(cfg) => {
let schema_of_file = scan_task.schema.clone();
let convert_options = JsonConvertOptions::new_internal(
scan_task.pushdowns.limit,
file_column_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
Some(schema_of_file),
scan_task.pushdowns.filters.clone(),
);
// let
let parse_options = JsonParseOptions::new_internal();
let read_options =
JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size);
let row_groups = if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() {
Some(row_groups.clone())
} else {
None
};
let metadata = scan_task
.sources
.first()
.and_then(|s| s.get_parquet_metadata().cloned());
daft_parquet::read::stream_parquet(
url,
file_column_names.as_deref(),
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
io_client,
io_stats,
&inference_options,
field_id_mapping.clone(),
metadata,
maintain_order,
)
.await?
}
FileFormatConfig::Csv(cfg) => {
let schema_of_file = scan_task.schema.clone();
let col_names = if !cfg.has_headers {
Some(
schema_of_file
.fields
.values()
.map(|f| f.name.as_str())
.collect::<Vec<_>>(),
)
} else {
None
};
let convert_options = CsvConvertOptions::new_internal(
scan_task.pushdowns.limit,
file_column_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
col_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
Some(schema_of_file),
scan_task.pushdowns.filters.clone(),
);
let parse_options = CsvParseOptions::new_with_defaults(
cfg.has_headers,
cfg.delimiter,
cfg.double_quote,
cfg.quote,
cfg.allow_variable_columns,
cfg.escape_char,
cfg.comment,
)?;
let read_options = CsvReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size);
daft_csv::stream_csv(
url.to_string(),
Some(convert_options),
Some(parse_options),
Some(read_options),
io_client,
io_stats.clone(),
None,
// maintain_order, TODO: Implement maintain_order for CSV
)
.await?
}
FileFormatConfig::Json(cfg) => {
let schema_of_file = scan_task.schema.clone();
let convert_options = JsonConvertOptions::new_internal(
scan_task.pushdowns.limit,
file_column_names
.as_ref()
.map(|cols| cols.iter().map(|col| col.to_string()).collect()),
Some(schema_of_file),
scan_task.pushdowns.filters.clone(),
);
let parse_options = JsonParseOptions::new_internal();
let read_options = JsonReadOptions::new_internal(cfg.buffer_size, cfg.chunk_size);

daft_json::read::stream_json(
url.to_string(),
Some(convert_options),
Some(parse_options),
Some(read_options),
io_client,
io_stats,
None,
// maintain_order, TODO: Implement maintain_order for JSON
)
.await?
}
#[cfg(feature = "python")]
FileFormatConfig::Database(_) => {
return Err(common_error::DaftError::TypeError(
"Native reads for Database file format not implemented".to_string(),
));
}
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => {
return Err(common_error::DaftError::TypeError(
"Native reads for PythonFunction file format not implemented".to_string(),
));
}
}
daft_json::read::stream_json(
url.to_string(),
Some(convert_options),
Some(parse_options),
Some(read_options),
io_client,
io_stats,
None,
// maintain_order, TODO: Implement maintain_order for JSON
)
.await?
}
#[cfg(feature = "python")]
FileFormatConfig::Database(_) => {
return Err(common_error::DaftError::TypeError(
"Database file format not implemented".to_string(),
));
}
#[cfg(feature = "python")]
StorageConfig::Python(_) => {
FileFormatConfig::PythonFunction => {
return Err(common_error::DaftError::TypeError(
"Streaming reads not supported for Python storage config".to_string(),
"PythonFunction file format not implemented".to_string(),
));
}
};
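
For orientation, here is a simplified mirror of the restructured `stream_scan_task` flow above, written with hypothetical stand-in enums rather than the real `daft-scan`/`daft-io` types: the I/O configuration and multithreading flag are now resolved from either storage-config variant up front (this is where the new `PythonStorageConfig` handling lands), and only then is the file format dispatched.

```rust
// Simplified stand-ins for illustration; the real types live in the daft-scan and daft-io crates.
#[derive(Default, Clone)]
struct IOConfig;

enum StorageConfig {
    Native { io_config: Option<IOConfig>, multithreaded_io: bool },
    Python { io_config: Option<IOConfig> },
}

enum FileFormatConfig {
    Parquet,
    Csv,
    Json,
    Database,
    PythonFunction,
}

fn stream_scan_task_sketch(
    storage: &StorageConfig,
    format: &FileFormatConfig,
) -> Result<String, String> {
    // 1. Resolve I/O settings from either storage-config variant (the new part:
    //    Python storage configs are handled too, defaulting to multithreaded I/O).
    let (io_config, multithreaded_io) = match storage {
        StorageConfig::Native { io_config, multithreaded_io } => {
            (io_config.as_ref(), *multithreaded_io)
        }
        StorageConfig::Python { io_config } => (io_config.as_ref(), true),
    };
    let _io_config = io_config.cloned().unwrap_or_default();

    // 2. Dispatch on the file format, erroring on formats the native executor cannot stream.
    match format {
        FileFormatConfig::Parquet => Ok(format!("stream_parquet (multithreaded_io={multithreaded_io})")),
        FileFormatConfig::Csv => Ok(format!("stream_csv (multithreaded_io={multithreaded_io})")),
        FileFormatConfig::Json => Ok(format!("stream_json (multithreaded_io={multithreaded_io})")),
        FileFormatConfig::Database => Err("Database file format not implemented".into()),
        FileFormatConfig::PythonFunction => Err("PythonFunction file format not implemented".into()),
    }
}

fn main() {
    let storage = StorageConfig::Python { io_config: None };
    println!("{:?}", stream_scan_task_sketch(&storage, &FileFormatConfig::Parquet));
    println!("{:?}", stream_scan_task_sketch(&storage, &FileFormatConfig::Database));
}
```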
6 changes: 3 additions & 3 deletions src/daft-micropartition/src/python.rs
@@ -749,7 +749,7 @@ impl PyMicroPartition {
}
}

pub(crate) fn read_json_into_py_table(
pub fn read_json_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
@@ -776,7 +776,7 @@ pub(crate) fn read_json_into_py_table(
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_csv_into_py_table(
pub fn read_csv_into_py_table(
py: Python,
uri: &str,
has_header: bool,
@@ -810,7 +810,7 @@ pub(crate) fn read_csv_into_py_table(
.extract()
}

pub(crate) fn read_parquet_into_py_table(
pub fn read_parquet_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
32 changes: 22 additions & 10 deletions src/daft-parquet/src/file.rs
@@ -502,17 +502,29 @@ impl ParquetFileReader {
.into_iter()
.collect::<DaftResult<Vec<_>>>()?;

let table_iter = arrow_column_iters_to_table_iter(
arr_iters,
row_range.start,
daft_schema,
uri,
predicate,
original_columns,
original_num_rows,
);
rayon::spawn(move || {
for table_result in table_iter {
// Even if there are no columns to read, we still need to create an empty table with the correct number of rows
// This is because the columns may be present in other files. See https://github.com/Eventual-Inc/Daft/pull/2514
let table_iter = arrow_column_iters_to_table_iter(
arr_iters,
row_range.start,
daft_schema.clone(),
uri,
predicate,
original_columns,
original_num_rows,
);
if table_iter.is_none() {
let table =
Table::new_with_size(daft_schema, vec![], row_range.num_rows);
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table)
{
panic!("Parquet stream channel should not be full")
}
return;
}
for table_result in table_iter.unwrap() {
let is_err = table_result.is_err();
if let Err(crossbeam_channel::TrySendError::Full(_)) =
sender.try_send(table_result)
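
A rough sketch of the fallback path added in this hunk, substituting `std::sync::mpsc` and `std::thread` for the crossbeam channel and rayon spawn used by the real code, and a stand-in `Table` type: when a file contributes no columns to read, the producer still emits one empty table carrying the correct row count before returning, so downstream operators see the file's rows.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for Daft's Table type: only what the sketch needs.
#[derive(Debug)]
struct Table {
    num_rows: usize,
    columns: Vec<Vec<i64>>,
}

fn spawn_table_producer(
    // `None` models "no columns to read from this file" (the columns exist only in other files).
    column_tables: Option<Vec<Table>>,
    num_rows: usize,
    sender: mpsc::SyncSender<Table>,
) {
    thread::spawn(move || {
        let Some(tables) = column_tables else {
            // No columns to read: still emit an empty table with the right row count.
            let _ = sender.send(Table { num_rows, columns: vec![] });
            return;
        };
        for table in tables {
            if sender.send(table).is_err() {
                // Receiver dropped; stop producing.
                return;
            }
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(4);
    // Simulate a file that contributes no columns but spans 3 rows.
    spawn_table_producer(None, 3, tx);
    for table in rx {
        println!("received table with {} rows", table.num_rows);
    }
}
```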
