Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add enforcement for invariants to commit checker #1882

Closed
wants to merge 12 commits into from
Closed
52 changes: 45 additions & 7 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use serde::{Deserialize, Serialize};
use url::Url;

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType};
use crate::kernel::{Add, DataType as DeltaDataType, Invariant, PrimitiveType, WriterFeatures};
use crate::logstore::LogStoreRef;
use crate::protocol::{ColumnCountStat, ColumnValueStat};
use crate::table::builder::ensure_table_uri;
Expand Down Expand Up @@ -1013,6 +1013,14 @@ pub(crate) fn logical_expr_to_physical_expr(
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

#[inline]
fn get_invariants(snapshot: &DeltaTableState) -> Vec<Invariant> {
snapshot
.current_metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default()
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
Expand All @@ -1022,7 +1030,31 @@ pub struct DeltaDataChecker {

impl DeltaDataChecker {
/// Create a new DeltaDataChecker
pub fn new(invariants: Vec<Invariant>) -> Self {
pub fn new(snapshot: &DeltaTableState) -> Self {
// Only check if invariants are enabled if we are at a high enough writer level
// otherwise always get the invariants to check
let invariants = if snapshot.min_writer_version() >= 7 {
if snapshot
.writer_features()
.map(|wf| wf.contains(&WriterFeatures::Invariants))
.unwrap_or(false)
{
get_invariants(snapshot)
} else {
vec![]
}
} else {
get_invariants(snapshot)
};
Self {
invariants,
ctx: SessionContext::new(),
}
}

/// Directly create a checker with invariants, mostly used for tests, but
/// also in the python interface
pub fn with_invariants(invariants: Vec<Invariant>) -> Self {
Self {
invariants,
ctx: SessionContext::new(),
Expand Down Expand Up @@ -1640,7 +1672,7 @@ mod tests {
.unwrap();
// Empty invariants is okay
let invariants: Vec<Invariant> = vec![];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1650,7 +1682,7 @@ mod tests {
Invariant::new("a", "a is not null"),
Invariant::new("b", "b < 1000"),
];
assert!(DeltaDataChecker::new(invariants)
assert!(DeltaDataChecker::with_invariants(invariants)
.check_batch(&batch)
.await
.is_ok());
Expand All @@ -1660,7 +1692,9 @@ mod tests {
Invariant::new("a", "a is null"),
Invariant::new("b", "b < 100"),
];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
if let Err(DeltaTableError::InvalidData { violations }) = result {
Expand All @@ -1669,7 +1703,9 @@ mod tests {

// Irrelevant invariants return a different error
let invariants = vec![Invariant::new("c", "c > 2000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());

// Nested invariants are unsupported
Expand All @@ -1683,7 +1719,9 @@ mod tests {
let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
let result = DeltaDataChecker::new(invariants).check_batch(&batch).await;
let result = DeltaDataChecker::with_invariants(invariants)
.check_batch(&batch)
.await;
assert!(result.is_err());
assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
}
Expand Down
7 changes: 1 addition & 6 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,14 @@ pub(crate) async fn write_execution_plan(
safe_cast: bool,
overwrite_schema: bool,
) -> DeltaResult<Vec<Add>> {
let invariants = snapshot
.metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();

// Use input schema to prevent wrapping partitions columns into a dictionary.
let schema: ArrowSchemaRef = if overwrite_schema {
plan.schema()
} else {
snapshot.input_schema().unwrap_or(plan.schema())
};

let checker = DeltaDataChecker::new(invariants);
let checker = DeltaDataChecker::new(snapshot);

// Write data to disk
let mut tasks = vec![];
Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ impl PyDeltaDataChecker {
})
.collect();
Self {
inner: DeltaDataChecker::new(invariants),
inner: DeltaDataChecker::with_invariants(invariants),
rt: tokio::runtime::Runtime::new().unwrap(),
}
}
Expand Down
Loading