[BUG] Fix for Iceberg schema projection (#1815)
Correctly "projects" the schema retrieved from Iceberg onto Parquet files
after they are read.

This PR also introduces `ScanTask::materialized_schema()`, which returns
the schema of a ScanTask after pushdowns (column pruning) are applied.

1. When an unloaded MicroPartition is created from a ScanTask, it inherits
the ScanTask's `materialized_schema`.
2. When an unloaded MicroPartition is materialized, it casts all resulting
tables to `self.schema`, which might have been modified by operations such
as `.select` or `.cast`.
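
For context, a minimal sketch of the user-facing flow these two behaviors support, assuming a configured PyIceberg catalog named `default` and a hypothetical table `default.some_evolved_table` whose schema gained a column after some Parquet files were already written (names and setup are illustrative, not part of this PR):

```python
# Minimal sketch (catalog and table names are hypothetical).
import daft
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("default.some_evolved_table")
df = daft.read_iceberg(tbl)

# (1) Column pruning is pushed down into the ScanTask, so the unloaded
#     MicroPartition inherits the pruned ("materialized") schema.
pruned = df.select("idx", "name")

# (2) Schema-modifying operations change the MicroPartition's schema; when the
#     partition is materialized, every resulting Table is cast to that schema.
casted = pruned.with_column("idx", pruned["idx"].cast(daft.DataType.int64()))
casted.collect()
```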

---------

Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
3 people authored Jan 24, 2024
1 parent 854037e commit 5814922
Showing 5 changed files with 57 additions and 70 deletions.
79 changes: 13 additions & 66 deletions src/daft-micropartition/src/micropartition.rs
@@ -77,19 +77,13 @@ fn materialize_scan_task(
cast_to_schema: Option<SchemaRef>,
io_stats: Option<IOStatsRef>,
) -> crate::Result<(Vec<Table>, SchemaRef)> {
log::debug!("Materializing ScanTask: {scan_task:?}");

let column_names = scan_task
.pushdowns
.columns
.as_ref()
.map(|v| v.iter().map(|s| s.as_ref()).collect::<Vec<&str>>());
let urls = scan_task.sources.iter().map(|s| s.get_path());

// Schema to cast resultant tables into, ensuring that all Tables have the same schema.
// Note that we need to apply column pruning here if specified by the ScanTask
let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.schema.clone());

let table_values = match scan_task.storage_config.as_ref() {
StorageConfig::Native(native_storage_config) => {
let runtime_handle =
@@ -135,9 +129,10 @@ fn materialize_scan_task(
// Native CSV Reads
// ****************
FileFormatConfig::Csv(cfg) => {
let schema_of_file = scan_task.schema.clone();
let col_names = if !cfg.has_headers {
Some(
cast_to_schema
schema_of_file
.fields
.values()
.map(|f| f.name.as_str())
@@ -226,7 +221,7 @@
crate::python::read_parquet_into_py_table(
py,
url,
cast_to_schema.clone().into(),
scan_task.schema.clone().into(),
(*coerce_int96_timestamp_unit).into(),
scan_task.storage_config.clone().into(),
scan_task
@@ -254,7 +249,7 @@
*has_headers,
*delimiter,
*double_quote,
cast_to_schema.clone().into(),
scan_task.schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task
.pushdowns
@@ -273,7 +268,7 @@
crate::python::read_json_into_py_table(
py,
url,
cast_to_schema.clone().into(),
scan_task.schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task
.pushdowns
@@ -290,8 +285,10 @@
}
}
};
let cast_to_schema = prune_fields_from_schema_ref(cast_to_schema, column_names.as_deref())
.context(DaftCoreComputeSnafu)?;

// Schema to cast resultant tables into, ensuring that all Tables have the same schema.
// Note that we need to apply column pruning here if specified by the ScanTask
let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.materialized_schema());

let casted_table_values = table_values
.iter()
@@ -305,23 +302,13 @@ impl MicroPartition {
/// Create a new "unloaded" MicroPartition using an associated [`ScanTask`]
///
/// Schema invariants:
/// 1. All columns in `schema` must be exist in the `scan_task` schema
/// 2. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema
/// 1. Each Loaded column statistic in `statistics` must be castable to the corresponding column in the MicroPartition's schema
pub fn new_unloaded(
schema: SchemaRef,
scan_task: Arc<ScanTask>,
metadata: TableMetadata,
statistics: TableStatistics,
) -> Self {
assert!(
schema
.fields
.keys()
.collect::<HashSet<_>>()
.is_subset(&scan_task.schema.fields.keys().collect::<HashSet<_>>()),
"Unloaded MicroPartition's schema names must be a subset of its ScanTask's schema"
);

MicroPartition {
schema: schema.clone(),
state: Mutex::new(TableState::Unloaded(scan_task)),
@@ -370,7 +357,7 @@ impl MicroPartition {
}

pub fn from_scan_task(scan_task: Arc<ScanTask>, io_stats: IOStatsRef) -> crate::Result<Self> {
let schema = scan_task.schema.clone();
let schema = scan_task.materialized_schema();
match (
&scan_task.metadata,
&scan_task.statistics,
@@ -428,13 +415,7 @@ impl MicroPartition {
)
.context(DaftCoreComputeSnafu)?;

let applied_schema = Arc::new(
mp.schema
.apply_hints(&schema)
.context(DaftCoreComputeSnafu)?,
);
mp.cast_to_schema(applied_schema)
.context(DaftCoreComputeSnafu)
mp.cast_to_schema(schema).context(DaftCoreComputeSnafu)
}

// CASE: Last resort fallback option
@@ -555,40 +536,6 @@ fn prune_fields_from_schema(schema: Schema, columns: Option<&[&str]>) -> DaftRes
}
}

fn prune_fields_from_schema_ref(
schema: SchemaRef,
columns: Option<&[&str]>,
) -> DaftResult<SchemaRef> {
if let Some(columns) = columns {
let avail_names = schema
.fields
.keys()
.map(|f| f.as_str())
.collect::<HashSet<_>>();
let mut names_to_keep = HashSet::new();
for col_name in columns.iter() {
if avail_names.contains(col_name) {
names_to_keep.insert(*col_name);
} else {
return Err(super::Error::FieldNotFound {
field: col_name.to_string(),
available_fields: avail_names.iter().map(|v| v.to_string()).collect(),
}
.into());
}
}
let filtered_columns = schema
.fields
.values()
.filter(|field| names_to_keep.contains(field.name.as_str()))
.cloned()
.collect::<Vec<_>>();
Ok(Schema::new(filtered_columns)?.into())
} else {
Ok(schema)
}
}

fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
let row_groups = sources
.iter()
Expand Down Expand Up @@ -882,7 +829,7 @@ pub(crate) fn read_parquet_into_micropartition(
let stats = stats.eval_expression_list(exprs.as_slice(), daft_schema.as_ref())?;

Ok(MicroPartition::new_unloaded(
scan_task.schema.clone(),
scan_task.materialized_schema(),
Arc::new(scan_task),
TableMetadata { length: total_rows },
stats,
27 changes: 25 additions & 2 deletions src/daft-scan/src/lib.rs
@@ -7,7 +7,10 @@ use std::{
};

use common_error::{DaftError, DaftResult};
use daft_core::{datatypes::Field, schema::SchemaRef};
use daft_core::{
datatypes::Field,
schema::{Schema, SchemaRef},
};
use daft_dsl::ExprRef;
use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
use file_format::FileFormatConfig;
@@ -165,8 +168,13 @@ impl DataFileSource {
#[derive(Debug, Serialize, Deserialize)]
pub struct ScanTask {
pub sources: Vec<DataFileSource>,
pub file_format_config: Arc<FileFormatConfig>,

/// Schema to use when reading the DataFileSources.
/// Note that this is different than the schema of the data after pushdowns have been applied,
/// which can be obtained with [`ScanTask::materialized_schema`] instead.
pub schema: SchemaRef,

pub file_format_config: Arc<FileFormatConfig>,
pub storage_config: Arc<StorageConfig>,
pub pushdowns: Pushdowns,
pub size_bytes_on_disk: Option<u64>,
@@ -269,6 +277,21 @@ impl ScanTask {
))
}

pub fn materialized_schema(&self) -> SchemaRef {
match &self.pushdowns.columns {
None => self.schema.clone(),
Some(columns) => Arc::new(Schema {
fields: self
.schema
.fields
.clone()
.into_iter()
.filter(|(name, _)| columns.contains(name))
.collect(),
}),
}
}

pub fn num_rows(&self) -> Option<usize> {
if self.pushdowns.filters.is_some() {
None
4 changes: 2 additions & 2 deletions tests/integration/iceberg/docker-compose/Dockerfile
@@ -38,9 +38,9 @@ WORKDIR ${SPARK_HOME}

ENV SPARK_VERSION=3.4.2
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
ENV ICEBERG_VERSION=1.4.0
ENV ICEBERG_VERSION=1.4.3
ENV AWS_SDK_VERSION=2.20.18
ENV PYICEBERG_VERSION=0.4.0
ENV PYICEBERG_VERSION=0.5.1

RUN curl --retry 3 -s -C - https://daft-public-data.s3.us-west-2.amazonaws.com/distribution/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
&& tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
16 changes: 16 additions & 0 deletions tests/integration/iceberg/docker-compose/provision.py
@@ -322,3 +322,19 @@
('123')
"""
)

spark.sql(
"""
CREATE OR REPLACE TABLE default.add_new_column
USING iceberg
AS SELECT
1 AS idx
UNION ALL SELECT
2 AS idx
UNION ALL SELECT
3 AS idx
"""
)

spark.sql("ALTER TABLE default.add_new_column ADD COLUMN name STRING")
spark.sql("INSERT INTO default.add_new_column VALUES (3, 'abc'), (4, 'def')")
1 change: 1 addition & 0 deletions tests/integration/iceberg/test_table_load.py
@@ -37,6 +37,7 @@ def test_daft_iceberg_table_open(local_iceberg_tables):
# "test_table_sanitized_character", # Bug in scan().to_arrow().to_arrow()
"test_table_version", # we have bugs when loading no files
"test_uuid_and_fixed_unpartitioned",
"add_new_column",
]


