Skip to content

Commit

Permalink
chore: add a Python integration test
Browse files Browse the repository at this point in the history
This test has highlighted an apparent race condition when handling
structs or lists in how excerpt() is treated by the CDCObserver.
  • Loading branch information
rtyler authored and ion-elgreco committed Jun 4, 2024
1 parent 6f7951b commit 34e27b8
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
5 changes: 4 additions & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ async fn execute(

match tracker.collect().await {
Ok(batches) => {
if !batches.is_empty() {
if batches.is_empty() {
debug!("CDCObserver collected zero batches");
}
else {
debug!(
"Collected {} batches to write as part of this transaction:",
batches.len()
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,9 @@ async fn write_execution_plan_with_predicate(
)?;

if let Some(s) = sendable.as_ref() {
let _ = s.send(arr.clone()).await;
if let Err(e) = s.send(arr.clone()).await {
error!("Failed to send data to observer: {e:#?}");
}
} else {
debug!("write_execution_plan_with_predicate did not send any batches, no sender.");
}
Expand Down
48 changes: 46 additions & 2 deletions python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import random
import threading
from datetime import date, datetime
from decimal import Decimal
from math import inf
from typing import Any, Dict, Iterable, List, Literal
from unittest.mock import Mock
Expand Down Expand Up @@ -1450,7 +1451,6 @@ def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path):
@pytest.mark.parametrize("engine", ["rust", "pyarrow"])
def test_invalid_decimals(tmp_path: pathlib.Path, engine):
import re
from decimal import Decimal

data = pa.table(
{"x": pa.array([Decimal("10000000000000000000000000000000000000.0")])}
Expand Down Expand Up @@ -1558,7 +1558,6 @@ def test_empty(existing_table: DeltaTable):

def test_rust_decimal_cast(tmp_path: pathlib.Path):
import re
from decimal import Decimal

data = pa.table({"x": pa.array([Decimal("100.1")])})

Expand Down Expand Up @@ -1729,3 +1728,48 @@ def test_parse_stats_with_new_schema(tmp_path, engine):
write_deltalake(
tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
)


def test_roundtrip_cdc_evolution(tmp_path: pathlib.Path):
"""
This test is used as a CDC integration test from Python to ensure,
approximately, that CDC files are being written
"""
raw_commit = r"""{"metaData":{"id":"bb0fdeb2-76dd-4f5e-b1ea-845ecec8fa7e","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1713110303902}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":4,"writerFeatures":["changeDataFeed"]}}
"""
# timestampNtz looks like it might be an unnecessary requirement to write from Python
os.mkdir(os.path.join(tmp_path, "_delta_log"))
# This is a stupid hack to make sure we have a CDC capable table from the jump
with open(
os.path.join(tmp_path, "_delta_log", "00000000000000000000.json"), "w+"
) as fd:
fd.write(raw_commit)
assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log")

# Make sure the _change_data doesn't exist
assert not os.path.isdir(os.path.join(tmp_path, "_change_data"))

nrows = 5
sample_data = pa.table(
{
"utf8": pa.array([str(x) for x in range(nrows)]),
"int64": pa.array(list(range(nrows)), pa.int64()),
# See <https://github.com/delta-io/delta-rs/issues/2568>
# "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]),
# "list": pa.array([list(range(x + 1)) for x in range(nrows)]),
}
)

write_deltalake(
tmp_path, sample_data, mode="append", schema_mode="merge", engine="rust"
)
assert ("0" * 19 + "1.json") in os.listdir(tmp_path / "_delta_log")

delta_table = DeltaTable(tmp_path)
delta_table.update(predicate="utf8 = '1'", updates={"utf8": "'hello world'"})

delta_table = DeltaTable(tmp_path)
print(os.listdir(tmp_path))
# This is kind of a weak test to verify that CDFs were written
assert os.path.isdir(os.path.join(tmp_path, "_change_data"))

0 comments on commit 34e27b8

Please sign in to comment.