Skip to content

Commit

Permalink
cdc: add offset store tests (#2793)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Apr 7, 2021
1 parent 201e421 commit e30ab53
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 1,304 deletions.
9 changes: 9 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/io/IOs.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public static Path writeFile(Path path, String fileName, String contents) {
return writeFile(filePath, contents);
}

public static Path writeFile(Path filePath, byte[] contents) {
try {
Files.write(filePath, contents);
return filePath;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public static Path writeFile(Path filePath, String contents) {
try {
Files.writeString(filePath, contents, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public static String readResource(String name) throws IOException {
return Resources.toString(resource, StandardCharsets.UTF_8);
}

public static byte[] readBytes(String name) throws IOException {
URL resource = Resources.getResource(name);
return Resources.toByteArray(resource);
}

/**
* This class is a bit of a hack. Might have unexpected behavior.
*
Expand Down
11 changes: 11 additions & 0 deletions airbyte-commons/src/test/java/io/airbyte/commons/io/IOsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand All @@ -55,6 +56,16 @@ void testReadWrite() throws IOException {
assertEquals("abc", IOs.readFile(path.resolve("file")));
}

@Test
void testWriteBytes() throws IOException {
final Path path = Files.createTempDirectory("tmp");

final Path filePath = IOs.writeFile(path.resolve("file"), "abc".getBytes(StandardCharsets.UTF_8));

assertEquals(path.resolve("file"), filePath);
assertEquals("abc", IOs.readFile(path, "file"));
}

@Test
public void testGetTailDoesNotExist() throws IOException {
List<String> tail = IOs.getTail(100, Path.of(RandomStringUtils.random(100)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
Expand All @@ -42,6 +43,14 @@ void testResourceRead() throws IOException {
assertThrows(IllegalArgumentException.class, () -> MoreResources.readResource("invalid"));
}

@Test
void testReadBytes() throws IOException {
assertEquals("content1\n", new String(MoreResources.readBytes("resource_test"), StandardCharsets.UTF_8));
assertEquals("content2\n", new String(MoreResources.readBytes("subdir/resource_test_sub"), StandardCharsets.UTF_8));

assertThrows(IllegalArgumentException.class, () -> MoreResources.readBytes("invalid"));
}

@Test
void testResourceReadDuplicateName() throws IOException {
assertEquals("content1\n", MoreResources.readResource("resource_test_a"));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-db/src/main/java/io/airbyte/db/PostgresUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class PostgresUtils {

public static PgLsn getLsn(JdbcDatabase database) throws SQLException {
// pg version > 9.
// pg version 10+.
final List<JsonNode> jsonNodes = database
.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery("SELECT pg_current_wal_lsn()"), JdbcUtils::rowToJson);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void run(String[] args) throws Exception {
final AutoCloseableIterator<AirbyteMessage> messageIterator = source.read(config, catalog, stateOptional.orElse(null));
try (messageIterator) {
messageIterator.forEachRemaining(v -> {
LOGGER.info("peeking at message: " + v);
stdoutConsumer.accept(Jsons.serialize(v));
});
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.jdbc.models.CdcState;
Expand All @@ -43,7 +42,6 @@
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -58,18 +56,11 @@
*/
public class AirbyteFileOffsetBackingStore {

public static final Path DEFAULT_OFFSET_STORAGE_PATH = Path.of("/tmp/offset.dat");

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteFileOffsetBackingStore.class);

private final Path offsetFilePath;

public AirbyteFileOffsetBackingStore() {
this(DEFAULT_OFFSET_STORAGE_PATH);
}

@VisibleForTesting
AirbyteFileOffsetBackingStore(final Path offsetFilePath) {
public AirbyteFileOffsetBackingStore(final Path offsetFilePath) {
this.offsetFilePath = offsetFilePath;
}

Expand Down Expand Up @@ -98,7 +89,7 @@ public void persist(CdcState cdcState) {
e -> stringToByteBuffer(e.getKey()),
e -> stringToByteBuffer(e.getValue())));

FileUtils.deleteQuietly(DEFAULT_OFFSET_STORAGE_PATH.toFile());
FileUtils.deleteQuietly(offsetFilePath.toFile());
save(mappedAsStrings);
}

Expand All @@ -113,8 +104,8 @@ private static ByteBuffer stringToByteBuffer(String s) {
}

/**
* See {@link FileOffsetBackingStore#load} - logic is mostly borrowed from here. duplicated because
* this method is not public.
* See FileOffsetBackingStore#load - logic is mostly borrowed from here. duplicated because this
* method is not public.
*/
@SuppressWarnings("unchecked")
private Map<ByteBuffer, ByteBuffer> load() {
Expand All @@ -141,8 +132,8 @@ private Map<ByteBuffer, ByteBuffer> load() {
}

/**
* See {@link FileOffsetBackingStore#save} - logic is mostly borrowed from here. duplicated because
* this method is not public.
* See FileOffsetBackingStore#save - logic is mostly borrowed from here. duplicated because this
* method is not public.
*/
private void save(Map<ByteBuffer, ByteBuffer> data) {
try (ObjectOutputStream os = new ObjectOutputStream(Files.newOutputStream(offsetFilePath))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static AirbyteMessage toAirbyteMessage(ChangeEvent<String, String> event,
final JsonNode source = debeziumRecord.get("source");
final String op = debeziumRecord.get("op").asText();

final JsonNode data = formatDebeziumData(before, after, source, op);
final JsonNode data = formatDebeziumData(before, after, source);

final String streamName = source.get("schema").asText() + "." + source.get("table").asText();

Expand All @@ -56,10 +56,7 @@ public static AirbyteMessage toAirbyteMessage(ChangeEvent<String, String> event,
}

// warning mutates input args.
private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source, final String op) {
if (op.equals("d")) {
System.out.println("before = " + before);
}
private static JsonNode formatDebeziumData(JsonNode before, JsonNode after, JsonNode source) {
final ObjectNode base = (ObjectNode) (after.isNull() ? before : after);

long transactionMillis = source.get("ts_ms").asLong();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.source.postgres;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.source.jdbc.models.CdcState;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;

class AirbyteFileOffsetBackingStoreTest {

@SuppressWarnings("UnstableApiUsage")
@Test
void test() throws IOException {
final Path testRoot = Files.createTempDirectory(Path.of("/tmp"), "offset-store-test");

final byte[] bytes = MoreResources.readBytes("test_debezium_offset.dat");
final Path templateFilePath = testRoot.resolve("template_offset.dat");
IOs.writeFile(templateFilePath, bytes);

final Path writeFilePath = testRoot.resolve("offset.dat");

final AirbyteFileOffsetBackingStore offsetStore = new AirbyteFileOffsetBackingStore(templateFilePath);
final CdcState stateFromTemplateFile = offsetStore.read();

final AirbyteFileOffsetBackingStore offsetStore2 = new AirbyteFileOffsetBackingStore(writeFilePath);
offsetStore2.persist(stateFromTemplateFile);

final CdcState stateFromOffsetStoreRoundTrip = offsetStore2.read();

// verify that, after a round trip through the offset store, we get back the same data.
assertEquals(stateFromTemplateFile, stateFromOffsetStoreRoundTrip);
// verify that the file written by the offset store is identical to the template file.
assertTrue(com.google.common.io.Files.equal(templateFilePath.toFile(), writeFilePath.toFile()));
}

}
Binary file not shown.

This file was deleted.

12 changes: 0 additions & 12 deletions airbyte-integrations/connectors/source-postgres2/Dockerfile

This file was deleted.

30 changes: 0 additions & 30 deletions airbyte-integrations/connectors/source-postgres2/build.gradle

This file was deleted.

Loading

0 comments on commit e30ab53

Please sign in to comment.