[FEAT] Iceberg MOR for streaming parquet #2975

Merged 7 commits on Oct 14, 2024

Changes from all commits
5 changes: 5 additions & 0 deletions .github/workflows/python-package.yml
@@ -468,6 +468,10 @@ jobs:
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
steps:
- uses: actions/checkout@v4
with:
@@ -513,6 +517,7 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
9 changes: 7 additions & 2 deletions src/daft-local-execution/src/lib.rs
@@ -14,16 +14,21 @@ lazy_static! {
pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
}

pub(crate) type TaskSet<T> = tokio::task::JoinSet<T>;
pub(crate) fn create_task_set<T>() -> TaskSet<T> {
tokio::task::JoinSet::new()
}
Comment on lines +17 to +20
Member

Why wrap tokio::task::JoinSet here?

Contributor Author

I want to keep the operators agnostic of the implementation details, to reduce complexity.


pub struct ExecutionRuntimeHandle {
worker_set: tokio::task::JoinSet<crate::Result<()>>,
worker_set: TaskSet<crate::Result<()>>,
default_morsel_size: usize,
}

impl ExecutionRuntimeHandle {
#[must_use]
pub fn new(default_morsel_size: usize) -> Self {
Self {
worker_set: tokio::task::JoinSet::new(),
worker_set: create_task_set(),
default_morsel_size,
}
}
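The review exchange above is about keeping operator code decoupled from tokio. Below is a minimal sketch of how an operator might use the `TaskSet` alias and `create_task_set` helper; the `run_workers` function is hypothetical and not part of this PR, and it only assumes the alias shown in the diff.

```rust
use common_error::DaftResult;

use crate::{create_task_set, TaskSet};

// Illustrative only: the operator spawns and joins concurrent work through the
// `TaskSet` alias, never naming `tokio::task::JoinSet` directly, so the
// executor is free to swap the backing implementation later.
async fn run_workers(num_workers: usize) -> DaftResult<()> {
    let mut task_set: TaskSet<DaftResult<()>> = create_task_set();
    for worker_id in 0..num_workers {
        task_set.spawn(async move {
            // Per-worker processing would go here.
            println!("worker {worker_id} finished");
            Ok(())
        });
    }
    // Join all workers, surfacing both join errors (panics) and worker errors.
    while let Some(result) = task_set.join_next().await {
        result.expect("worker task panicked")?;
    }
    Ok(())
}
```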
127 changes: 105 additions & 22 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -1,12 +1,16 @@
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use common_error::DaftResult;
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
use daft_core::prelude::{AsArrow, Int64Array, Utf8Array};
use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use daft_io::IOStatsRef;
use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
use daft_micropartition::MicroPartition;
use daft_parquet::read::ParquetSchemaInferenceOptions;
use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions};
use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
use futures::{Stream, StreamExt};
use snafu::ResultExt;
@@ -16,7 +20,7 @@
use crate::{
channel::{create_channel, Sender},
sources::source::{Source, SourceStream},
ExecutionRuntimeHandle,
ExecutionRuntimeHandle, JoinSnafu, TaskSet, NUM_CPUS,
};

pub struct ScanTaskSource {
@@ -38,9 +42,11 @@
sender: Sender<Arc<MicroPartition>>,
maintain_order: bool,
io_stats: IOStatsRef,
delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
) -> DaftResult<()> {
let schema = scan_task.materialized_schema();
let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?;
let mut stream =
stream_scan_task(scan_task, Some(io_stats), delete_map, maintain_order).await?;
let mut has_data = false;
while let Some(partition) = stream.next().await {
let _ = sender.send(partition?).await;
@@ -77,17 +83,28 @@
vec![receiver],
)
};
for (scan_task, sender) in self.scan_tasks.iter().zip(senders) {
runtime_handle.spawn(
Self::process_scan_task_stream(
scan_task.clone(),
sender,
maintain_order,
io_stats.clone(),
),
self.name(),
);
}
let scan_tasks = self.scan_tasks.clone();
runtime_handle.spawn(
async move {
let mut task_set = TaskSet::new();
let delete_map = get_delete_map(&scan_tasks).await?.map(Arc::new);
for (scan_task, sender) in scan_tasks.into_iter().zip(senders) {
task_set.spawn(Self::process_scan_task_stream(
scan_task,
sender,
maintain_order,
io_stats.clone(),
delete_map.clone(),
));
}
while let Some(result) = task_set.join_next().await {
result.context(JoinSnafu)??;
}
Ok(())
},
self.name(),
);

let stream = futures::stream::iter(receivers.into_iter().map(ReceiverStream::new));
Ok(Box::pin(stream.flatten()))
}
@@ -97,9 +114,80 @@
}
}

// Read all iceberg delete files and return a map of file paths to delete positions
async fn get_delete_map(
scan_tasks: &[Arc<ScanTask>],
) -> DaftResult<Option<HashMap<String, Vec<i64>>>> {
let delete_files = scan_tasks
.iter()
.flat_map(|st| {
st.sources
.iter()
.filter_map(|source| source.get_iceberg_delete_files())
.flatten()
.cloned()
})
.collect::<HashSet<_>>();
if delete_files.is_empty() {
return Ok(None);
}

let (runtime, io_client) = scan_tasks
.first()
.unwrap() // Safe to unwrap because we checked that the list is not empty
.storage_config
.get_io_client_and_runtime()?;
let scan_tasks = scan_tasks.to_vec();
runtime.block_on_io_pool(async move {
let mut delete_map = scan_tasks
.iter()
.flat_map(|st| st.sources.iter().map(|s| s.get_path().to_string()))
.map(|path| (path, vec![]))
.collect::<std::collections::HashMap<_, _>>();
let columns_to_read = Some(vec!["file_path".to_string(), "pos".to_string()]);
let result = read_parquet_bulk_async(
delete_files.into_iter().collect(),
columns_to_read,
None,
None,
None,
None,
io_client,
None,
*NUM_CPUS,
ParquetSchemaInferenceOptions::new(None),
None,
None,
None,
None,
)
.await?;

for table_result in result {
let table = table_result?;
// values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file
// https://iceberg.apache.org/spec/#position-delete-files
let file_paths = table.get_column("file_path")?.downcast::<Utf8Array>()?;
let positions = table.get_column("pos")?.downcast::<Int64Array>()?;

for (file, pos) in file_paths
.as_arrow()
.values_iter()
.zip(positions.as_arrow().values_iter())
{
if delete_map.contains_key(file) {
delete_map.get_mut(file).unwrap().push(*pos);
}

Check warning on line 180 in src/daft-local-execution/src/sources/scan_task.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-local-execution/src/sources/scan_task.rs#L178-L180

Added lines #L178 - L180 were not covered by tests
}
}
Ok(Some(delete_map))
})?
}
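`get_delete_map` reads every Iceberg position-delete file referenced by the scan tasks and returns a map from data-file URI to the file-global row positions that should be dropped (merge-on-read). The actual row filtering happens inside `daft_parquet::read::stream_parquet`, which receives the per-file positions as `delete_rows`; the sketch below only illustrates position-delete semantics with a hypothetical `keep_mask` helper, not code from this PR.

```rust
// Illustrative only: how positional deletes from a delete map could be applied
// to a batch of rows.
fn keep_mask(num_rows: usize, row_offset: i64, delete_positions: &[i64]) -> Vec<bool> {
    // `delete_positions` are file-global row indices (per the Iceberg spec);
    // `row_offset` is the file-global index of the first row in this batch.
    let deleted: std::collections::HashSet<i64> = delete_positions.iter().copied().collect();
    (0..num_rows as i64)
        .map(|local| !deleted.contains(&(row_offset + local)))
        .collect()
}

// e.g. a 5-row batch starting at global row 10, with global rows 11 and 13 deleted:
// keep_mask(5, 10, &[11, 13]) == vec![true, false, true, false, true]
```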

async fn stream_scan_task(
scan_task: Arc<ScanTask>,
io_stats: Option<IOStatsRef>,
delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
maintain_order: bool,
) -> DaftResult<impl Stream<Item = DaftResult<Arc<MicroPartition>>> + Send> {
let pushdown_columns = scan_task.pushdowns.columns.as_ref().map(|v| {
@@ -159,12 +247,7 @@
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

if source.get_iceberg_delete_files().is_some() {
return Err(common_error::DaftError::TypeError(
"Streaming reads not supported for Iceberg delete files".to_string(),
));
}

let delete_rows = delete_map.as_ref().and_then(|m| m.get(url).cloned());
let row_groups = if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() {
Some(row_groups.clone())
} else {
@@ -177,7 +260,6 @@
daft_parquet::read::stream_parquet(
url,
file_column_names.as_deref(),
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
@@ -187,6 +269,7 @@
field_id_mapping.clone(),
metadata,
maintain_order,
delete_rows,
)
.await?
}
3 changes: 3 additions & 0 deletions src/daft-parquet/src/file.rs
@@ -398,6 +398,7 @@ impl ParquetFileReader {
predicate: Option<ExprRef>,
original_columns: Option<Vec<String>>,
original_num_rows: Option<usize>,
delete_rows: Option<Vec<i64>>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let daft_schema = Arc::new(daft_core::prelude::Schema::try_from(
self.arrow_schema.as_ref(),
@@ -426,6 +427,7 @@ impl ParquetFileReader {
let ranges = ranges.clone();
let predicate = predicate.clone();
let original_columns = original_columns.clone();
let delete_rows = delete_rows.clone();
let row_range = *row_range;

tokio::task::spawn(async move {
@@ -515,6 +517,7 @@ impl ParquetFileReader {
predicate,
original_columns,
original_num_rows,
delete_rows,
);
if table_iter.is_none() {
let table =
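In `file.rs`, the new `delete_rows` parameter is cloned into each per-row-group read task alongside its `row_range`. Since Iceberg delete positions are file-global while each row group only covers a contiguous slice of the file, one reasonable way to apply them is to translate global positions into row-group-local indices. The helper below is a hypothetical illustration of that translation under that assumption, not code copied from daft-parquet.

```rust
// Illustrative only: translating file-global delete positions into indices
// local to one row group. Assumes the row group covers the half-open range
// [start, start + num_rows) of file-global rows.
fn local_delete_indices(delete_rows: &[i64], start: i64, num_rows: i64) -> Vec<i64> {
    delete_rows
        .iter()
        .copied()
        .filter(|&pos| pos >= start && pos < start + num_rows)
        .map(|pos| pos - start)
        .collect()
}

// e.g. a row group spanning global rows [100, 200) with deletes at 50, 120, 199, 205:
// local_delete_indices(&[50, 120, 199, 205], 100, 100) == vec![20, 99]
```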