From d98143e1cc4cd9889c934c7f21a7ac8d3cd3e375 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 18 Nov 2023 17:17:52 -0500 Subject: [PATCH 1/6] feat: Added enforcement for invariants to commit checker, fixed a typo I noticed --- .../src/kernel/actions/types.rs | 10 +++++----- .../src/operations/transaction/protocol.rs | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index a788315b82..5acbbf49ea 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -264,7 +264,7 @@ pub enum WriterFeatures { /// ID Columns IdentityColumns, /// Deletion vectors for merge, update, delete - DeleteionVecotrs, + DeletionVectors, /// Row tracking on tables RowTracking, /// timestamps without timezone support @@ -291,7 +291,7 @@ impl Into for WriterFeatures { WriterFeatures::ChangeDataFeed | WriterFeatures::GeneratedColumns => 4, WriterFeatures::ColumnMapping => 5, WriterFeatures::IdentityColumns - | WriterFeatures::DeleteionVecotrs + | WriterFeatures::DeletionVectors | WriterFeatures::RowTracking | WriterFeatures::TimestampWithoutTimezone | WriterFeatures::DomainMetadata @@ -311,7 +311,7 @@ impl From for WriterFeatures { "generatedColumns" => WriterFeatures::GeneratedColumns, "columnMapping" => WriterFeatures::ColumnMapping, "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" => WriterFeatures::DeleteionVecotrs, + "deletionVectors" => WriterFeatures::DeletionVectors, "rowTracking" => WriterFeatures::RowTracking, "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, "domainMetadata" => WriterFeatures::DomainMetadata, @@ -332,7 +332,7 @@ impl AsRef for WriterFeatures { WriterFeatures::GeneratedColumns => "generatedColumns", WriterFeatures::ColumnMapping => "columnMapping", WriterFeatures::IdentityColumns => "identityColumns", - WriterFeatures::DeleteionVecotrs => "deletionVectors", + WriterFeatures::DeletionVectors => "deletionVectors", WriterFeatures::RowTracking => "rowTracking", WriterFeatures::TimestampWithoutTimezone => "timestampNtz", WriterFeatures::DomainMetadata => "domainMetadata", @@ -361,7 +361,7 @@ impl From<&parquet::record::Field> for WriterFeatures { "generatedColumns" => WriterFeatures::GeneratedColumns, "columnMapping" => WriterFeatures::ColumnMapping, "identityColumns" => WriterFeatures::IdentityColumns, - "deletionVectors" => WriterFeatures::DeleteionVecotrs, + "deletionVectors" => WriterFeatures::DeletionVectors, "rowTracking" => WriterFeatures::RowTracking, "timestampNtz" => WriterFeatures::TimestampWithoutTimezone, "domainMetadata" => WriterFeatures::DomainMetadata, diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index 47e4d0a41a..ceb1762620 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -142,6 +142,25 @@ impl ProtocolChecker { })?; } + // Does the table have invariants? + let invariants = snapshot + .current_metadata() + .and_then(|meta| meta.schema.get_invariants().ok()) + .unwrap_or_default() + .is_empty(); + + // The table is at least version 7, has actual invariants and has the proper writer feature + // enabled + if snapshot.min_writer_version() >= 7 + && !invariants + && !snapshot + .writer_features() + .unwrap() + .contains(&WriterFeatures::Invariants) + { + return Err(TransactionError::WriterFeaturesRequired); + } + Ok(()) } } From 4bacb48e51219a074b2c39786bdf7ac1fda2f9e5 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 18 Nov 2023 23:13:32 -0500 Subject: [PATCH 2/6] feat: Added enforcement for invariants to commit checker, fixed a typo I noticed --- crates/deltalake-core/src/operations/transaction/protocol.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index ceb1762620..478f0b5792 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -143,7 +143,7 @@ impl ProtocolChecker { } // Does the table have invariants? - let invariants = snapshot + let has_invariants = !snapshot .current_metadata() .and_then(|meta| meta.schema.get_invariants().ok()) .unwrap_or_default() @@ -152,7 +152,7 @@ impl ProtocolChecker { // The table is at least version 7, has actual invariants and has the proper writer feature // enabled if snapshot.min_writer_version() >= 7 - && !invariants + && has_invariants && !snapshot .writer_features() .unwrap() From 476c2e027b3111ab383425fa5eceddfef7458937 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 18 Nov 2023 23:18:02 -0500 Subject: [PATCH 3/6] feat: Added enforcement for invariants to commit checker, fixed a typo I noticed --- crates/deltalake-core/src/kernel/actions/types.rs | 2 +- .../deltalake-core/src/operations/transaction/protocol.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index 802d9fa0c0..1cbc48fa00 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -291,7 +291,7 @@ impl Into for WriterFeatures { | WriterFeatures::V2Checkpoint | WriterFeatures::IcebergCompatV1 => 7, } - } + } } impl From for WriterFeatures { diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index 478f0b5792..341e5bdb68 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -154,9 +154,9 @@ impl ProtocolChecker { if snapshot.min_writer_version() >= 7 && has_invariants && !snapshot - .writer_features() - .unwrap() - .contains(&WriterFeatures::Invariants) + .writer_features() + .unwrap() + .contains(&WriterFeatures::Invariants) { return Err(TransactionError::WriterFeaturesRequired); } From 05be8514a023e6f84fc2393eef38802cac8f9259 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sun, 19 Nov 2023 01:27:17 -0500 Subject: [PATCH 4/6] feat: Added enforcement for invariants to commit checker, fixed a typo I noticed --- .../src/kernel/actions/types.rs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index 1cbc48fa00..aa60823e4a 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -274,26 +274,6 @@ pub enum WriterFeatures { Other(String), } -#[allow(clippy::from_over_into)] -impl Into for WriterFeatures { - fn into(self) -> usize { - match self { - WriterFeatures::Other(_) => 0, - WriterFeatures::AppendOnly | WriterFeatures::Invariants => 2, - WriterFeatures::CheckConstraints => 3, - WriterFeatures::ChangeDataFeed | WriterFeatures::GeneratedColumns => 4, - WriterFeatures::ColumnMapping => 5, - WriterFeatures::IdentityColumns - | WriterFeatures::DeletionVectors - | WriterFeatures::RowTracking - | WriterFeatures::TimestampWithoutTimezone - | WriterFeatures::DomainMetadata - | WriterFeatures::V2Checkpoint - | WriterFeatures::IcebergCompatV1 => 7, - } - } -} - impl From for WriterFeatures { fn from(value: String) -> Self { value.as_str().into() From 4d06c7215401a0c469c0578c11e659dbcf04359d Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 25 Nov 2023 11:49:38 -0500 Subject: [PATCH 5/6] feat: Added enforcement for invariants to data checker, fixed a typo I noticed --- .../src/delta_datafusion/mod.rs | 46 ++++++++++++++++--- .../src/operations/transaction/protocol.rs | 19 -------- crates/deltalake-core/src/operations/write.rs | 7 +-- python/src/lib.rs | 2 +- 4 files changed, 41 insertions(+), 33 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 8dea811383..19ed72c09d 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -71,7 +71,7 @@ use serde::{Deserialize, Serialize}; use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType}; +use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType, WriterFeatures}; use crate::logstore::LogStoreRef; use crate::protocol::{ColumnCountStat, ColumnValueStat}; use crate::table::builder::ensure_table_uri; @@ -1013,6 +1013,14 @@ pub(crate) fn logical_expr_to_physical_expr( create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap() } +#[inline] +fn get_invariants(snapshot: &DeltaTableState) -> Vec { + snapshot + .current_metadata() + .and_then(|meta| meta.schema.get_invariants().ok()) + .unwrap_or_default() +} + /// Responsible for checking batches of data conform to table's invariants. #[derive(Clone)] pub struct DeltaDataChecker { @@ -1022,7 +1030,31 @@ pub struct DeltaDataChecker { impl DeltaDataChecker { /// Create a new DeltaDataChecker - pub fn new(invariants: Vec) -> Self { + pub fn new(snapshot: &DeltaTableState) -> Self { + // Only check if invariants are enabled if we are at a high enough writer level + // otherwise always get the invariants to check + let invariants = if snapshot.min_writer_version() >= 7 { + if snapshot + .writer_features() + .map(|wf| wf.contains(&WriterFeatures::Invariants)) + .unwrap_or(false) + { + get_invariants(snapshot) + } else { + vec![] + } + } else { + get_invariants(snapshot) + }; + Self { + invariants, + ctx: SessionContext::new(), + } + } + + /// Directly create a checker with invariants, mostly used for tests, but + /// also in the python interface + pub fn with_invariants(invariants: Vec) -> Self { Self { invariants, ctx: SessionContext::new(), @@ -1642,7 +1674,7 @@ mod tests { .unwrap(); // Empty invariants is okay let invariants: Vec = vec![]; - assert!(DeltaDataChecker::new(invariants) + assert!(DeltaDataChecker::with_invariants(invariants) .check_batch(&batch) .await .is_ok()); @@ -1652,7 +1684,7 @@ mod tests { Invariant::new("a", "a is not null"), Invariant::new("b", "b < 1000"), ]; - assert!(DeltaDataChecker::new(invariants) + assert!(DeltaDataChecker::with_invariants(invariants) .check_batch(&batch) .await .is_ok()); @@ -1662,7 +1694,7 @@ mod tests { Invariant::new("a", "a is null"), Invariant::new("b", "b < 100"), ]; - let result = DeltaDataChecker::new(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; assert!(result.is_err()); assert!(matches!(result, Err(DeltaTableError::InvalidData { .. }))); if let Err(DeltaTableError::InvalidData { violations }) = result { @@ -1671,7 +1703,7 @@ mod tests { // Irrelevant invariants return a different error let invariants = vec![Invariant::new("c", "c > 2000")]; - let result = DeltaDataChecker::new(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; assert!(result.is_err()); // Nested invariants are unsupported @@ -1685,7 +1717,7 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![inner]).unwrap(); let invariants = vec![Invariant::new("x.b", "x.b < 1000")]; - let result = DeltaDataChecker::new(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; assert!(result.is_err()); assert!(matches!(result, Err(DeltaTableError::Generic { .. }))); } diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index 341e5bdb68..47e4d0a41a 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -142,25 +142,6 @@ impl ProtocolChecker { })?; } - // Does the table have invariants? - let has_invariants = !snapshot - .current_metadata() - .and_then(|meta| meta.schema.get_invariants().ok()) - .unwrap_or_default() - .is_empty(); - - // The table is at least version 7, has actual invariants and has the proper writer feature - // enabled - if snapshot.min_writer_version() >= 7 - && has_invariants - && !snapshot - .writer_features() - .unwrap() - .contains(&WriterFeatures::Invariants) - { - return Err(TransactionError::WriterFeaturesRequired); - } - Ok(()) } } diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index cb68b72bb2..ff502b070b 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -252,15 +252,10 @@ pub(crate) async fn write_execution_plan( writer_properties: Option, safe_cast: bool, ) -> DeltaResult> { - let invariants = snapshot - .current_metadata() - .and_then(|meta| meta.schema.get_invariants().ok()) - .unwrap_or_default(); - // Use input schema to prevent wrapping partitions columns into a dictionary. let schema = snapshot.input_schema().unwrap_or(plan.schema()); - let checker = DeltaDataChecker::new(invariants); + let checker = DeltaDataChecker::new(snapshot); // Write data to disk let mut tasks = vec![]; diff --git a/python/src/lib.rs b/python/src/lib.rs index 69195e866d..f4fca8a5cb 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1242,7 +1242,7 @@ impl PyDeltaDataChecker { }) .collect(); Self { - inner: DeltaDataChecker::new(invariants), + inner: DeltaDataChecker::with_invariants(invariants), rt: tokio::runtime::Runtime::new().unwrap(), } } From 8ac5ebf17522658004ce57424c23e4a26e86bd07 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Sat, 25 Nov 2023 19:05:21 -0500 Subject: [PATCH 6/6] chore: cargo fmt --- crates/deltalake-core/src/delta_datafusion/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 19ed72c09d..ae76dc4fe4 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -1694,7 +1694,9 @@ mod tests { Invariant::new("a", "a is null"), Invariant::new("b", "b < 100"), ]; - let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants) + .check_batch(&batch) + .await; assert!(result.is_err()); assert!(matches!(result, Err(DeltaTableError::InvalidData { .. }))); if let Err(DeltaTableError::InvalidData { violations }) = result { @@ -1703,7 +1705,9 @@ mod tests { // Irrelevant invariants return a different error let invariants = vec![Invariant::new("c", "c > 2000")]; - let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants) + .check_batch(&batch) + .await; assert!(result.is_err()); // Nested invariants are unsupported @@ -1717,7 +1721,9 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![inner]).unwrap(); let invariants = vec![Invariant::new("x.b", "x.b < 1000")]; - let result = DeltaDataChecker::with_invariants(invariants).check_batch(&batch).await; + let result = DeltaDataChecker::with_invariants(invariants) + .check_batch(&batch) + .await; assert!(result.is_err()); assert!(matches!(result, Err(DeltaTableError::Generic { .. }))); }