From 9a848051367d5e1c8ce1aea457f1ac69362ea307 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 30 May 2024 13:46:57 +0000 Subject: [PATCH] chore: add a Python integration test This test has highlighted an apparent race condition when handling structs or lists in how excerpt() is treated by the CDCObserver. --- crates/core/src/operations/update.rs | 5 ++- crates/core/src/operations/write.rs | 4 ++- python/tests/test_writer.py | 50 ++++++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 6ca79c6244..9440942e2c 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -445,7 +445,10 @@ async fn execute( match tracker.collect().await { Ok(batches) => { - if !batches.is_empty() { + if batches.is_empty() { + debug!("CDCObserver collected zero batches"); + } + else { debug!( "Collected {} batches to write as part of this transaction:", batches.len() diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 84705c415d..c435a3df08 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -431,7 +431,9 @@ async fn write_execution_plan_with_predicate( )?; if let Some(s) = sendable.as_ref() { - let _ = s.send(arr.clone()).await; + if let Err(e) = s.send(arr.clone()).await { + error!("Failed to send data to observer: {e:#?}"); + } } else { debug!("write_execution_plan_with_predicate did not send any batches, no sender."); } diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 0a7e766cac..52f726a196 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -5,6 +5,7 @@ import random import threading from datetime import date, datetime +from decimal import Decimal from math import inf from typing import Any, Dict, Iterable, List, Literal from unittest.mock import Mock @@ -1450,7 +1451,6 @@ def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path): @pytest.mark.parametrize("engine", ["rust", "pyarrow"]) def test_invalid_decimals(tmp_path: pathlib.Path, engine): import re - from decimal import Decimal data = pa.table( {"x": pa.array([Decimal("10000000000000000000000000000000000000.0")])} @@ -1558,7 +1558,6 @@ def test_empty(existing_table: DeltaTable): def test_rust_decimal_cast(tmp_path: pathlib.Path): import re - from decimal import Decimal data = pa.table({"x": pa.array([Decimal("100.1")])}) @@ -1729,3 +1728,50 @@ def test_parse_stats_with_new_schema(tmp_path, engine): write_deltalake( tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine ) + + +def test_roundtrip_cdc_evolution(tmp_path: pathlib.Path): + """ + This test is used as a CDC integration test from Python to ensure, + approximately, that CDC files are being written + """ + raw_commit = r"""{"metaData":{"id":"bb0fdeb2-76dd-4f5e-b1ea-845ecec8fa7e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1713110303902}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":4,"writerFeatures":["changeDataFeed"]}} +""" + # timestampNtz looks like it might be an unnecessary requirement to write from Python + os.mkdir(os.path.join(tmp_path, "_delta_log")) + # This is a stupid hack to make sure we have a CDC capable table from the jump + with open( + os.path.join(tmp_path, "_delta_log", "00000000000000000000.json"), "w+" + ) as fd: + fd.write(raw_commit) + assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") + + # Make sure the _change_data doesn't exist + assert not os.path.isdir(os.path.join(tmp_path, "_change_data")) + + nrows = 5 + sample_data = pa.table( + { + "utf8": pa.array([str(x) for x in range(nrows)]), + "int64": pa.array(list(range(nrows)), pa.int64()), + "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]), + "list": pa.array([list(range(x + 1)) for x in range(nrows)]), + } + ) + + write_deltalake( + tmp_path, sample_data, mode="append", schema_mode="merge", engine="rust" + ) + assert ("0" * 19 + "1.json") in os.listdir(tmp_path / "_delta_log") + + delta_table = DeltaTable(tmp_path) + print(delta_table.to_pandas()) + delta_table.update(predicate="utf8 = '1'", updates={"utf8": "'hello world'"}) + + delta_table = DeltaTable(tmp_path) + print(delta_table.to_pandas()) + + print(os.listdir(tmp_path)) + # This is kind of a weak test to verify that CDFs were written + assert os.path.isdir(os.path.join(tmp_path, "_change_data"))