Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pg cdc feature branch #2548

Merged
merged 37 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2a7d0c5
spike
cgardens Mar 16, 2021
0c49603
more
cgardens Mar 17, 2021
2eb4029
debezium wip
jrhizor Mar 22, 2021
320b952
use oneof for configuration
jrhizor Mar 23, 2021
1e28f45
iterator wrapping structure
jrhizor Mar 24, 2021
7101ba7
push current
jrhizor Mar 25, 2021
4a6170e
working loop
jrhizor Mar 25, 2021
87f46d6
Merge branch 'master' into jrhizor/debezium
jrhizor Mar 25, 2021
49ccf09
move capability into source
jrhizor Mar 25, 2021
448280f
hack it into a sharable state
jrhizor Mar 25, 2021
867b9ca
debezium test runner (#2617)
cgardens Mar 25, 2021
9eb5190
CDC Wait for Values (#2618)
cgardens Mar 25, 2021
9be6bcf
output actual AirbyteMessages for cdc (#2631)
jrhizor Mar 30, 2021
825e324
add lsn extraction and comparison (#2613)
cgardens Mar 31, 2021
79094f1
postgres cdc catalog (#2673)
jrhizor Apr 1, 2021
4cf62eb
table selection for cdc (#2690)
jrhizor Apr 1, 2021
690bb80
Add state management to CDC (#2718)
cgardens Apr 2, 2021
ec6abf5
CDC: Fix Producer/Consumer State Machine (#2721)
cgardens Apr 3, 2021
a2a979b
CDC Postgres Tests (#2777)
cgardens Apr 7, 2021
13f682d
fix postgres cdc image name and run check before reading data (#2785)
jrhizor Apr 7, 2021
201e421
remove unused props, remove todos, add some more sanity tests (#2791)
cgardens Apr 7, 2021
e30ab53
cdc: add offset store tests (#2793)
cgardens Apr 7, 2021
40834cf
clean (#2798)
cgardens Apr 7, 2021
d6b1b8c
Merge branch 'master' into jrhizor/debezium
cgardens Apr 7, 2021
7b21969
postgres cdc docs (#2784)
jrhizor Apr 7, 2021
85df233
various merge conflict fixes (#2799)
cgardens Apr 7, 2021
d5ecbd1
cdc standard tests (#2813)
cgardens Apr 9, 2021
2fd3582
require cdc users to create publications & update docs (#2818)
jrhizor Apr 9, 2021
3d47ec4
Use oneOf in PG CDC spec (#2827)
cgardens Apr 9, 2021
0893dd1
add oneOf configuration for postgres cdc (#2831)
jrhizor Apr 9, 2021
cdb8869
fix test (#2834)
cgardens Apr 9, 2021
4d2364c
fix test
cgardens Apr 9, 2021
321c0e9
bump version
cgardens Apr 9, 2021
35f44bc
add docs on creating replica identities (#2838)
jrhizor Apr 9, 2021
02779a0
Merge branch 'master' into jrhizor/debezium
cgardens Apr 9, 2021
df00a59
bump pg version in source catalog
cgardens Apr 9, 2021
7d8b119
generate seed files
jrhizor Apr 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airbyte-commons/src/main/java/io/airbyte/commons/io/IOs.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public static Path writeFile(Path path, String fileName, String contents) {
return writeFile(filePath, contents);
}

/**
 * Writes the given bytes to {@code filePath} and hands the same path back to the caller.
 *
 * @param filePath destination file
 * @param contents raw bytes to write
 * @return the {@code filePath} that was passed in
 * @throws RuntimeException wrapping the {@link IOException} if the write fails
 */
public static Path writeFile(final Path filePath, final byte[] contents) {
  try {
    Files.write(filePath, contents);
  } catch (final IOException ioException) {
    throw new RuntimeException(ioException);
  }
  return filePath;
}

public static Path writeFile(Path filePath, String contents) {
try {
Files.writeString(filePath, contents, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public static String readResource(String name) throws IOException {
return Resources.toString(resource, StandardCharsets.UTF_8);
}

/**
 * Looks up the resource with the given name and returns its content as raw bytes.
 *
 * @param name resource name, resolved by {@code Resources.getResource}
 * @return the full content of the resource
 * @throws IOException if the resource cannot be read
 */
public static byte[] readBytes(final String name) throws IOException {
  return Resources.toByteArray(Resources.getResource(name));
}

/**
* This class is a bit of a hack. Might have unexpected behavior.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public static <T> AutoCloseableIterator<T> fromStream(Stream<T> stream) {
return new DefaultAutoCloseableIterator<>(stream.iterator(), stream::close);
}

/**
 * Drains the whole iterator into a list, then closes the iterator. The iterator is closed even
 * when draining it throws.
 *
 * @param iterator iterator to consume and close
 * @return every element the iterator produced, in iteration order
 * @throws Exception if consuming or closing the iterator fails
 */
public static <T> List<T> toListAndClose(AutoCloseableIterator<T> iterator) throws Exception {
  final List<T> drained;
  try (iterator) {
    drained = MoreIterators.toList(iterator);
  }
  return drained;
}

/**
* Returns a {@link AutoCloseableIterator} that will call the provided supplier ONE time when
* {@link AutoCloseableIterator#hasNext()} is called the first time. The supplier returns a stream
Expand Down Expand Up @@ -131,6 +140,11 @@ public static <T> AutoCloseableIterator<T> transform(Function<AutoCloseableItera
return new DefaultAutoCloseableIterator<>(iteratorCreator.apply(autoCloseableIterator), autoCloseableIterator::close);
}

/**
 * Varargs convenience overload: packs the arguments into a list and delegates to the list-based
 * {@code concatWithEagerClose}.
 *
 * @param iterators iterators to concatenate, in order
 * @return a composite iterator over all the given iterators
 */
@SafeVarargs
public static <T> CompositeIterator<T> concatWithEagerClose(AutoCloseableIterator<T>... iterators) {
  final List<AutoCloseableIterator<T>> asList = List.of(iterators);
  return concatWithEagerClose(asList);
}

/**
 * Wraps the given iterators in a single {@link CompositeIterator}.
 *
 * @param iterators iterators to concatenate, in order
 * @return a composite iterator built from {@code iterators}
 */
public static <T> CompositeIterator<T> concatWithEagerClose(List<AutoCloseableIterator<T>> iterators) {
  final CompositeIterator<T> composite = new CompositeIterator<>(iterators);
  return composite;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

package io.airbyte.commons.util;

import com.google.common.collect.AbstractIterator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class MoreIterators {

Expand Down Expand Up @@ -74,4 +76,22 @@ public static <T> Set<T> toSet(Iterator<T> iterator) {
return set;
}

/**
 * Creates an iterator that yields exactly one element: the value returned by {@code supplier}.
 * The supplier is not invoked when the iterator is created; it is invoked once, the first time
 * the iterator has to compute an element.
 *
 * @param supplier source of the single element
 * @return an iterator over the one supplied element
 */
public static <T> Iterator<T> singletonIteratorFromSupplier(Supplier<T> supplier) {
  return new AbstractIterator<T>() {

    // flipped to true once the supplier's value has been handed out
    private boolean supplied = false;

    @Override
    protected T computeNext() {
      if (supplied) {
        return endOfData();
      }
      supplied = true;
      return supplier.get();
    }

  };
}

}
2 changes: 1 addition & 1 deletion airbyte-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</Appenders>

<Loggers>
<Root level="DEBUG">
<Root level="INFO">
<AppenderRef ref="Default"/>
<AppenderRef ref="LogSplit"/>
<AppenderRef ref="AppLogSplit"/>
Expand Down
11 changes: 11 additions & 0 deletions airbyte-commons/src/test/java/io/airbyte/commons/io/IOsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
Expand All @@ -55,6 +56,16 @@ void testReadWrite() throws IOException {
assertEquals("abc", IOs.readFile(path.resolve("file")));
}

// Writing a byte[] must return the target path and leave content that reads back as the same text.
@Test
void testWriteBytes() throws IOException {
  final Path dir = Files.createTempDirectory("tmp");
  final Path target = dir.resolve("file");
  final byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);

  final Path written = IOs.writeFile(target, payload);

  assertEquals(dir.resolve("file"), written);
  assertEquals("abc", IOs.readFile(dir, "file"));
}

@Test
public void testWriteFileToRandomDir() throws IOException {
final String contents = "something to remember";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
Expand All @@ -43,6 +44,14 @@ void testResourceRead() throws IOException {
assertThrows(IllegalArgumentException.class, () -> MoreResources.readResource("invalid"));
}

// Covers a top-level resource, a resource in a subdirectory, and a name that does not exist.
@Test
void testReadBytes() throws IOException {
  final byte[] topLevel = MoreResources.readBytes("resource_test");
  assertEquals("content1\n", new String(topLevel, StandardCharsets.UTF_8));

  final byte[] nested = MoreResources.readBytes("subdir/resource_test_sub");
  assertEquals("content2\n", new String(nested, StandardCharsets.UTF_8));

  assertThrows(IllegalArgumentException.class, () -> MoreResources.readBytes("invalid"));
}

@Test
void testResourceReadDuplicateName() throws IOException {
assertEquals("content1\n", MoreResources.readResource("resource_test_a"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand Down
104 changes: 104 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/PgLsn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;

/**
 * Immutable value wrapper around a Postgres log sequence number (LSN). The LSN is held as its raw
 * 64-bit value in a {@code long}: the part before the slash occupies the upper 32 bits and the
 * part after the slash the lower 32 bits. Because all 64 bits are used, LSNs at or above
 * {@code 80000000/0} are negative when read as a signed {@code long}; ordering and equality in
 * this class treat the value as unsigned.
 *
 * Doc on the structure of a Postgres LSN
 * https://www.postgresql.org/docs/current/datatype-pg-lsn.html
 */
public class PgLsn implements Comparable<PgLsn> {

  private final long lsn;

  /**
   * Wraps an LSN that is already in its 64-bit numeric form.
   *
   * @param lsn raw 64-bit LSN value
   * @return the wrapped LSN
   */
  public static PgLsn fromLong(final long lsn) {
    return new PgLsn(lsn);
  }

  /**
   * Parses an LSN in the textual form returned by Postgres, e.g. {@code 16/15E7B08}.
   *
   * @param lsn two hex numbers separated by a slash
   * @return the parsed LSN
   * @throws IllegalArgumentException if the separator is missing, a half is not valid hex, or a
   *         half does not fit in 32 unsigned bits
   */
  public static PgLsn fromPgString(final String lsn) {
    return new PgLsn(lsnToLong(lsn));
  }

  private PgLsn(final long lsn) {
    this.lsn = lsn;
  }

  /**
   * @return the raw 64-bit LSN value; may be negative for LSNs whose top bit is set
   */
  public long asLong() {
    return lsn;
  }

  /**
   * @return the LSN in Postgres text form: upper-case hex halves separated by a slash
   */
  public String asPgString() {
    return longToLsn(lsn);
  }

  /**
   * Orders LSNs by numeric position. The comparison is unsigned: an LSN uses all 64 bits, so a
   * signed comparison would sort every LSN at or above {@code 80000000/0} before smaller ones.
   */
  @Override
  public int compareTo(final PgLsn o) {
    return Long.compareUnsigned(lsn, o.asLong());
  }

  /**
   * Two LSNs are equal when they wrap the same 64-bit value, which keeps {@code equals}
   * consistent with {@link #compareTo(PgLsn)}.
   */
  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof PgLsn)) {
      return false;
    }
    return lsn == ((PgLsn) o).lsn;
  }

  @Override
  public int hashCode() {
    return Long.hashCode(lsn);
  }

  /**
   * The LSN returned by Postgres is a 64-bit integer represented as hex encoded 32-bit integers
   * separated by a /. reference: https://github.com/davecramer/LogicalDecode
   *
   * @param lsn string representation as returned by postgres
   * @return long representation of the lsn string.
   * @throws IllegalArgumentException if the separator is missing, a half is not valid hex, or a
   *         half does not fit in 32 unsigned bits
   */
  @VisibleForTesting
  static long lsnToLong(String lsn) {
    int slashIndex = lsn.lastIndexOf('/');
    Preconditions.checkArgument(slashIndex >= 0, "LSN must contain a '/' separator: %s", lsn);

    long logicalXlog = parseHalf(lsn.substring(0, slashIndex), lsn);
    long segment = parseHalf(lsn.substring(slashIndex + 1), lsn);

    // ByteBuffer is big-endian by default, so the first int written becomes the upper 32 bits.
    return ByteBuffer.allocate(Long.BYTES)
        .putInt((int) logicalXlog)
        .putInt((int) segment)
        .getLong(0);
  }

  /**
   * Parses one hex half of an LSN. It is parsed as a long so that the full unsigned 32-bit range
   * (up to FFFFFFFF) is accepted, then range-checked so that wider or negative input is rejected
   * instead of being silently truncated by the later cast to int.
   */
  private static long parseHalf(final String hex, final String lsn) {
    final long value = Long.parseLong(hex, 16);
    Preconditions.checkArgument(value >= 0 && value <= 0xFFFFFFFFL, "Each half of an LSN must fit in 32 unsigned bits: %s", lsn);
    return value;
  }

  /**
   * Inverse of {@link #lsnToLong(String)}: renders the upper and lower 32 bits as upper-case hex
   * without leading zeros, separated by a slash.
   */
  @VisibleForTesting
  static String longToLsn(long long1) {
    int front = (int) (long1 >> 32);
    int back = (int) long1;
    // Integer.toHexString treats its argument as unsigned, so halves with the top bit set render correctly.
    return (Integer.toHexString(front) + "/" + Integer.toHexString(back)).toUpperCase();
  }

  @Override
  public String toString() {
    return "PgLsn{" +
        "lsn=" + lsn +
        '}';
  }

}
45 changes: 45 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/PostgresUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import java.sql.SQLException;
import java.util.List;

public class PostgresUtils {

  /**
   * Runs {@code SELECT pg_current_wal_lsn()} against the database and converts the single
   * returned value into a {@link PgLsn}.
   *
   * @param database database to query
   * @return the LSN reported by the query
   * @throws SQLException if the query fails
   * @throws IllegalStateException if the query does not return exactly one row
   */
  public static PgLsn getLsn(JdbcDatabase database) throws SQLException {
    // pg version 10+.
    final List<JsonNode> rows = database.bufferedResultSetQuery(
        conn -> conn.createStatement().executeQuery("SELECT pg_current_wal_lsn()"),
        JdbcUtils::rowToJson);

    Preconditions.checkState(rows.size() == 1);

    final JsonNode onlyRow = rows.get(0);
    final String lsnText = onlyRow.get("pg_current_wal_lsn").asText();
    return PgLsn.fromPgString(lsnText);
  }

}
56 changes: 56 additions & 0 deletions airbyte-db/src/test/java/io/airbyte/db/PgLsnTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

class PgLsnTest {

  // Pairs of Postgres text LSN and the long it converts to; used for both conversion directions.
  private static final Map<String, Long> TEST_LSNS = ImmutableMap.<String, Long>builder()
      .put("0/15E7A10", 22968848L)
      .put("0/15E7B08", 22969096L)
      .put("16/15E7B08", 94512249608L)
      .put("16/FFFFFFFF", 98784247807L)
      .put("7FFFFFFF/FFFFFFFF", Long.MAX_VALUE)
      .put("0/0", 0L)
      .build();

  // text -> long
  @Test
  void testLsnToLong() {
    for (final Map.Entry<String, Long> lsnCase : TEST_LSNS.entrySet()) {
      final String key = lsnCase.getKey();
      final Long value = lsnCase.getValue();
      final String failureMessage = String.format("Conversion failed. lsn: %s long value: %s", key, value);
      assertEquals(value, PgLsn.lsnToLong(key), failureMessage);
    }
  }

  // long -> text
  @Test
  void testLongToLsn() {
    for (final Map.Entry<String, Long> lsnCase : TEST_LSNS.entrySet()) {
      final String key = lsnCase.getKey();
      final Long value = lsnCase.getValue();
      final String failureMessage = String.format("Conversion failed. lsn: %s long value: %s", key, value);
      assertEquals(key, PgLsn.longToLsn(value), failureMessage);
    }
  }

}
Loading