Skip to content

Commit

Permalink
Added validations but now failing roundtrip tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Nov 23, 2023
1 parent c5284ef commit 1d49106
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 18 deletions.
19 changes: 15 additions & 4 deletions src/daft-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,21 @@ impl Schema {
}
}

/// Checks if [`self`] is a strict subset of `other`
/// The types and names of each Field have to match exactly.
pub fn is_subset(&self, _other: &Schema) -> bool {
todo!("Implement Schema::is_subset");
/// Checks if [`self`] is a strict subset of `other` based on their fields' equality
pub fn is_subset(&self, other: &Schema) -> bool {
for (field_name, self_field) in self.fields.iter() {
match other.fields.get(field_name) {
None => {
return false;
}
Some(other_field) => {
if self_field != other_field {
return false;
}
}
}
}
true
}

pub fn to_arrow(&self) -> DaftResult<arrow2::datatypes::Schema> {
Expand Down
50 changes: 36 additions & 14 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::PyIOSnafu;
use crate::{DaftCSVSnafu, DaftCoreComputeSnafu};

use daft_io::{IOConfig, IOStatsRef};
use daft_stats::TableMetadata;
use daft_stats::TableStatistics;
use daft_stats::{ColumnRangeStatistics, TableMetadata};

pub(crate) enum TableState {
Unloaded(Arc<ScanTask>),
Expand Down Expand Up @@ -274,22 +274,47 @@ fn materialize_scan_task(
Ok((casted_table_values, cast_to_schema))
}

/// Helper that validates [`TableStatistics`] against a MicroPartition's schema
///
/// If any columns in `statistics` are "Loaded", and there is a match with a field in the MicroPartition's schema,
/// we need to assert that the types are compatible.
fn _validate_statistics(statistics: &TableStatistics, schema: &Schema) {
for (stats_col_name, stats) in statistics.columns.iter() {
match (stats, schema.fields.get(stats_col_name)) {
// Missing stats: statistics are trivially redundant
(ColumnRangeStatistics::Missing, _) => (),
// No column name matched for provided statistic: the statistic is redundant
(_, None) => (),
// Both statistics and MicroPartition field exist: we assert type compatibility
(ColumnRangeStatistics::Loaded(l, r), Some(mp_schema_field)) => {
for stats_data in [l, r] {
assert!(
// TODO: Is this too strict of a bound/is there a different criteria for compatibility?
stats_data.data_type() == &mp_schema_field.dtype,
"Unloaded MicroPartition's schema must be be compatible with statistics' types: expected field {} but received type {}",
mp_schema_field,
stats_data.data_type(),
);
}
}
}
}
}

impl MicroPartition {
/// Create a new "unloaded" MicroPartition using an associated [`ScanTask`]
///
/// Schema invariants:
/// 1. `schema` must be a strict subset of the `scan_task` schema (applying "column pruning")
/// 2. `schema` must be a strict subset of the `statistics` schema
/// 2. Check that each Loaded column in the statistics is compatible with the MicroPartition's schema
pub fn new_unloaded(
schema: SchemaRef,
scan_task: Arc<ScanTask>,
metadata: TableMetadata,
statistics: TableStatistics,
) -> Self {
assert!(
schema.is_subset(&statistics.schema()),
"Unloaded MicroPartition's schema must be a subset of its statistics' schema"
);
// Check and validate invariants with asserts
_validate_statistics(&statistics, schema.as_ref());
assert!(
schema.is_subset(&scan_task.schema),
"Unloaded MicroPartition's schema must be a subset of its ScanTask's schema"
Expand All @@ -316,19 +341,16 @@ impl MicroPartition {
///
/// Schema invariants:
/// 1. `schema` must match each Table's schema exactly
/// 2. `schema` must be a strict subset of the `statistics` schema exactly, if provided
/// 2. If `statistics` is provided, we check that each Loaded column in the statistics is compatible with the MicroPartition's schema
pub fn new_loaded(
schema: SchemaRef,
tables: Arc<Vec<Table>>,
statistics: Option<TableStatistics>,
) -> Self {
assert!(
statistics
.as_ref()
.map(|stats| schema.is_subset(&stats.schema()))
.unwrap_or(true),
"Loaded MicroPartition's statistics' schema must be a subset of its statistics' schema"
);
// Check and validate invariants with asserts
if let Some(statistics) = &statistics {
_validate_statistics(statistics, schema.as_ref());
}
for table in tables.iter() {
assert!(
table.schema == schema,
Expand Down

0 comments on commit 1d49106

Please sign in to comment.