Skip to content

Commit

Permalink
Ignore the some CDC related integration tests until upstream works
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored and ion-elgreco committed Jun 4, 2024
1 parent 34e27b8 commit 573dd1f
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 4 deletions.
173 changes: 171 additions & 2 deletions crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl CDCTracker {
true,
false,
)?;
debug!("prestream: {batch:?}");
let new_column = Arc::new(StringArray::from(vec![
Some("update_preimage");
batch.num_rows()
Expand All @@ -124,6 +125,7 @@ impl CDCTracker {
true,
false,
)?;
debug!("poststream: {batch:?}");
let new_column = Arc::new(StringArray::from(vec![
Some("update_postimage");
batch.num_rows()
Expand Down Expand Up @@ -302,7 +304,7 @@ mod tests {
use crate::kernel::{Action, PrimitiveType, Protocol};
use crate::operations::DeltaOps;
use crate::{DeltaConfigKey, DeltaTable};
use arrow::array::Int32Array;
use arrow::array::{ArrayRef, Int32Array, StructArray};
use datafusion::assert_batches_sorted_eq;

/// A simple test which validates primitive writer version 1 tables should
Expand Down Expand Up @@ -404,7 +406,7 @@ mod tests {

let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table");
assert!(
result == true,
result,
"A v7 table must not write CDC files unless the writer feature is set"
);
}
Expand Down Expand Up @@ -451,4 +453,171 @@ mod tests {
}
}
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_pure_df() {
let _ = pretty_env_logger::try_init();
let nested_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("lat", DataType::Int32, true),
Field::new("long", DataType::Int32, true),
]));
let schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Int32, true),
Field::new(
"nested",
DataType::Struct(nested_schema.fields.clone()),
true,
),
]));
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("id", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("lat", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("long", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
])),
],
)
.unwrap();

let updated_batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("id", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("lat", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("long", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
])),
],
)
.unwrap();
let _ = arrow::util::pretty::print_batches(&vec![batch.clone()]);
let _ = arrow::util::pretty::print_batches(&vec![updated_batch.clone()]);

let ctx = SessionContext::new();
let before = ctx.read_batch(batch).expect("Failed to make DataFrame");
let after = ctx
.read_batch(updated_batch)
.expect("Failed to make DataFrame");

let diff = before
.except(after)
.expect("Failed to except")
.collect()
.await
.expect("Failed to diff");
assert_eq!(diff.len(), 1);
}

// This cannot be re-enabled until DataFrame.except() works: <https://github.com/apache/datafusion/issues/10749>
#[ignore]
#[tokio::test]
async fn test_sanity_check_with_struct() {
let _ = pretty_env_logger::try_init();
let nested_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("lat", DataType::Int32, true),
Field::new("long", DataType::Int32, true),
]));
let schema = Arc::new(Schema::new(vec![
Field::new("value", DataType::Int32, true),
Field::new(
"nested",
DataType::Struct(nested_schema.fields.clone()),
true,
),
]));

let tracker = CDCTracker::new(schema.clone());

let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("id", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("lat", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("long", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
])),
],
)
.unwrap();

let updated_batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)])),
Arc::new(StructArray::from(vec![
(
Arc::new(Field::new("id", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("lat", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
(
Arc::new(Field::new("long", DataType::Int32, true)),
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
),
])),
],
)
.unwrap();

let _ = tracker.pre_sender().send(batch).await;
let _ = tracker.post_sender().send(updated_batch).await;

match tracker.collect().await {
Ok(batches) => {
let _ = arrow::util::pretty::print_batches(&batches);
assert_eq!(batches.len(), 2);
assert_batches_sorted_eq! {[
"+-------+------------------+",
"| value | _change_type |",
"+-------+------------------+",
"| 2 | update_preimage |",
"| 12 | update_postimage |",
"+-------+------------------+",
], &batches }
}
Err(err) => {
println!("err: {err:#?}");
panic!("Should have never reached this assertion");
}
}
}
}
3 changes: 1 addition & 2 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,7 @@ async fn execute(
Ok(batches) => {
if batches.is_empty() {
debug!("CDCObserver collected zero batches");
}
else {
} else {
debug!(
"Collected {} batches to write as part of this transaction:",
batches.len()
Expand Down

0 comments on commit 573dd1f

Please sign in to comment.