From 9a6db67963dc2887e4c15c3cf33052c806f73d79 Mon Sep 17 00:00:00 2001 From: miguelbirdie Date: Tue, 16 Jan 2024 12:23:41 +0100 Subject: [PATCH] fix: Outbox must skip messages from dbz_signal --- .../com/birdie/kafka/connect/smt/Outbox.java | 6 ++++ .../birdie/kafka/connect/smt/OutboxTest.java | 30 ++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/birdie/kafka/connect/smt/Outbox.java b/src/main/java/com/birdie/kafka/connect/smt/Outbox.java index e2de2c4..1ab1459 100644 --- a/src/main/java/com/birdie/kafka/connect/smt/Outbox.java +++ b/src/main/java/com/birdie/kafka/connect/smt/Outbox.java @@ -70,6 +70,12 @@ public void configure(Map props) { public SourceRecord apply(SourceRecord sourceRecord) { LOGGER.debug("Received source record: {}", sourceRecord); + /* Skipping messages from incremental snapshots */ + if (sourceRecord.topic().toLowerCase().contains("dbz_signal")){ + LOGGER.debug("Skipping dbz_signal record: {}", sourceRecord); + return sourceRecord; + } + if (sourceRecord.value() == null) { LOGGER.debug("Dropping debezium-generated tombstones with null partition_key: {}", sourceRecord); return null; diff --git a/src/test/java/com/birdie/kafka/connect/smt/OutboxTest.java b/src/test/java/com/birdie/kafka/connect/smt/OutboxTest.java index de129d5..2df8ecc 100644 --- a/src/test/java/com/birdie/kafka/connect/smt/OutboxTest.java +++ b/src/test/java/com/birdie/kafka/connect/smt/OutboxTest.java @@ -8,7 +8,6 @@ import org.junit.Before; import org.junit.Test; import java.util.HashMap; -import java.util.Map; import static org.junit.Assert.*; public class OutboxTest { @@ -34,6 +33,13 @@ public void after(){ .field("payload", SchemaBuilder.string().name("io.debezium.data.Json").optional().build()) .build(); + private final Schema schemaSignal = SchemaBuilder.struct() + .name("Value") + .field("key", SchemaBuilder.STRING_SCHEMA) + .field("type", SchemaBuilder.STRING_SCHEMA) + .field("data", SchemaBuilder.STRING_SCHEMA) + .build(); + private final Schema schemaWithPartitionKey = SchemaBuilder.struct() .name("Value") .field("key", SchemaBuilder.STRING_SCHEMA) @@ -95,6 +101,28 @@ public void sendsAMessageToCorrectPartitionNumber() { assertEquals(Schema.Type.STRING, transformedRecord.valueSchema().type()); } + @Test + public void sendsSignalMessage() { + Struct value = new Struct(schemaSignal); + value.put("key", "ad-hoc"); + value.put("type", "execute-snapshot"); + value.put("data", "data-collections"); + + SourceRecord record = new SourceRecord( + null, + null, + "public.dbz_signal", + null, + SchemaBuilder.bytes().optional().build(), + "ad-hoc".getBytes(), + schemaSignal, + value + ); + + final SourceRecord transformedRecord = transformer.apply(record); + assertEquals("public.dbz_signal", transformedRecord.topic()); + } + @Test public void sendsAMessageWithStructHeaders() { Struct headers = new Struct(headersStructSchema);