diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs
index 365f4a649a..b6165d01d6 100644
--- a/crates/core/src/operations/cdc.rs
+++ b/crates/core/src/operations/cdc.rs
@@ -105,6 +105,7 @@ impl CDCTracker {
                 true,
                 false,
             )?;
+            debug!("prestream: {batch:?}");
             let new_column = Arc::new(StringArray::from(vec![
                 Some("update_preimage");
                 batch.num_rows()
@@ -124,6 +125,7 @@ impl CDCTracker {
                 true,
                 false,
             )?;
+            debug!("poststream: {batch:?}");
             let new_column = Arc::new(StringArray::from(vec![
                 Some("update_postimage");
                 batch.num_rows()
@@ -302,7 +304,7 @@ mod tests {
     use crate::kernel::{Action, PrimitiveType, Protocol};
     use crate::operations::DeltaOps;
     use crate::{DeltaConfigKey, DeltaTable};
-    use arrow::array::Int32Array;
+    use arrow::array::{ArrayRef, Int32Array, StructArray};
     use datafusion::assert_batches_sorted_eq;
 
     /// A simple test which validates primitive writer version 1 tables should
@@ -404,7 +406,7 @@ mod tests {
         let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
         assert!(
-            result == true,
+            result,
             "A v7 table must not write CDC files unless the writer feature is set"
         );
     }
 
@@ -451,4 +453,171 @@ mod tests {
             }
         }
     }
+
+    // This cannot be re-enabled until DataFrame.except() works:
+    #[ignore]
+    #[tokio::test]
+    async fn test_sanity_check_with_pure_df() {
+        let _ = pretty_env_logger::try_init();
+        let nested_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("lat", DataType::Int32, true),
+            Field::new("long", DataType::Int32, true),
+        ]));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("value", DataType::Int32, true),
+            Field::new(
+                "nested",
+                DataType::Struct(nested_schema.fields.clone()),
+                true,
+            ),
+        ]));
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
+                Arc::new(StructArray::from(vec![
+                    (
+                        Arc::new(Field::new("id", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("lat", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("long", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let updated_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
+                Arc::new(StructArray::from(vec![
+                    (
+                        Arc::new(Field::new("id", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("lat", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("long", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                ])),
+            ],
+        )
+        .unwrap();
+        let _ = arrow::util::pretty::print_batches(&vec![batch.clone()]);
+        let _ = arrow::util::pretty::print_batches(&vec![updated_batch.clone()]);
+
+        let ctx = SessionContext::new();
+        let before = ctx.read_batch(batch).expect("Failed to make DataFrame");
+        let after = ctx
+            .read_batch(updated_batch)
+            .expect("Failed to make DataFrame");
+
+        let diff = before
+            .except(after)
+            .expect("Failed to except")
+            .collect()
+            .await
+            .expect("Failed to diff");
+        assert_eq!(diff.len(), 1);
+    }
+
+    // This cannot be re-enabled until DataFrame.except() works:
+    #[ignore]
+    #[tokio::test]
+    async fn test_sanity_check_with_struct() {
+        let _ = pretty_env_logger::try_init();
+        let nested_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("lat", DataType::Int32, true),
+            Field::new("long", DataType::Int32, true),
+        ]));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("value", DataType::Int32, true),
+            Field::new(
+                "nested",
+                DataType::Struct(nested_schema.fields.clone()),
+                true,
+            ),
+        ]));
+
+        let tracker = CDCTracker::new(schema.clone());
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
+                Arc::new(StructArray::from(vec![
+                    (
+                        Arc::new(Field::new("id", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("lat", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("long", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let updated_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
+                Arc::new(StructArray::from(vec![
+                    (
+                        Arc::new(Field::new("id", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("lat", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(Field::new("long", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
+                    ),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let _ = tracker.pre_sender().send(batch).await;
+        let _ = tracker.post_sender().send(updated_batch).await;
+
+        match tracker.collect().await {
+            Ok(batches) => {
+                let _ = arrow::util::pretty::print_batches(&batches);
+                assert_eq!(batches.len(), 2);
+                assert_batches_sorted_eq! {[
+                    "+-------+------------------+",
+                    "| value | _change_type     |",
+                    "+-------+------------------+",
+                    "| 2     | update_preimage  |",
+                    "| 12    | update_postimage |",
+                    "+-------+------------------+",
+                ], &batches }
+            }
+            Err(err) => {
+                println!("err: {err:#?}");
+                panic!("Should have never reached this assertion");
+            }
+        }
+    }
 }
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 9440942e2c..9ec8519b9b 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -447,8 +447,7 @@ async fn execute(
             Ok(batches) => {
                 if batches.is_empty() {
                     debug!("CDCObserver collected zero batches");
-                }
-                else {
+                } else {
                     debug!(
                         "Collected {} batches to write as part of this transaction:",
                         batches.len()