From a3ca3ab6b2ccd3493e83c138c3dcf9ade4528f5b Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Mon, 13 Jun 2022 23:58:26 +0300 Subject: [PATCH] :bug: Postgres Source: fixed unsupported date-time datatypes during incremental sync (#13655) * Postgres Source: fixed unsupposted date-time datatypes during incremental sync * updated CHANGELOG * add tests for incremental cursor check * removed star import * Postgres Source: fixed unsupposted date-time datatypes during incremental sync * updated CHANGELOG * add tests for incremental cursor check * removed star import * add timestamp datatype test * Bump version in Dockerfile * auto-bump connector version Co-authored-by: grishick Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../jdbc/test/JdbcSourceAcceptanceTest.java | 125 ++++---- ...StrictEncryptJdbcSourceAcceptanceTest.java | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../postgres/PostgresSourceOperations.java | 57 +++- .../PostgresJdbcSourceAcceptanceTest.java | 267 +++++++++++++++++- docs/integrations/sources/postgres.md | 1 + 8 files changed, 385 insertions(+), 73 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 611c045c0571..58334a7a5717 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -715,7 +715,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.21 + dockerImageTag: 0.4.22 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 3a8606a656d2..88c0959a6e00 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6719,7 +6719,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.21" +- dockerImage: "airbyte/source-postgres:0.4.22" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index be29c888993f..802d8ac79bc7 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception { setEmittedAtToNull(actualMessages); + final List expectedMessages = getAirbyteMessagesReadOneColumn(); + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesReadOneColumn() { final List expectedMessages = getTestMessages().stream() .map(Jsons::clone) .peek(m -> { @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); + return expectedMessages; } @Test @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception { Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING))); - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - m.getRecord().setNamespace(getDefaultNamespace()); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2); expectedMessages.addAll(secondStreamExpectedMessages); } @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception { assertTrue(actualMessages.containsAll(expectedMessages)); } + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + + } + @Test void testTablesWithQuoting() throws Exception { final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces(); @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception { setEmittedAtToNull(actualMessages); - final List secondStreamExpectedMessages = getTestMessages() + final List secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces); + final List expectedMessages = new ArrayList<>(getTestMessages()); + expectedMessages.addAll(secondStreamExpectedMessages); + + assertTrue(expectedMessages.size() == actualMessages.size()); + assertTrue(expectedMessages.containsAll(actualMessages)); + assertTrue(actualMessages.containsAll(expectedMessages)); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() .stream() .map(Jsons::clone) .peek(m -> { @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception { convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); }) .collect(Collectors.toList()); - final List expectedMessages = new ArrayList<>(getTestMessages()); - expectedMessages.addAll(secondStreamExpectedMessages); - - assertTrue(expectedMessages.size() == actualMessages.size()); - assertTrue(expectedMessages.containsAll(actualMessages)); - assertTrue(actualMessages.containsAll(expectedMessages)); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception { void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces(); + final ArrayList expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces); + incrementalCursorCheck( + COL_LAST_NAME_WITH_SPACE, + COL_LAST_NAME_WITH_SPACE, + "patent", + "vash", + expectedRecordMessages, + streamWithSpaces); + } + + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { final AirbyteMessage firstMessage = getTestMessages().get(0); firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); @@ -546,21 +571,15 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); - incrementalCursorCheck( - COL_LAST_NAME_WITH_SPACE, - COL_LAST_NAME_WITH_SPACE, - "patent", - "vash", - Lists.newArrayList(firstMessage, secondMessage), - streamWithSpaces); + return Lists.newArrayList(firstMessage, secondMessage); } @Test - void testIncrementalTimestampCheckCursor() throws Exception { - incrementalTimestampCheck(); + void testIncrementalDateCheckCursor() throws Exception { + incrementalDateCheck(); } - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { incrementalCursorCheck( COL_UPDATED_AT, "2005-10-18T00:00:00Z", @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { .filter(r -> r.getType() == Type.STATE).findFirst(); assertTrue(stateAfterFirstSyncOptional.isPresent()); - database.execute(connection -> { - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - connection.createStatement().execute( - String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", - getFullyQualifiedTableName(TABLE_NAME))); - }); + executeStatementReadIncrementallyTwice(); final List actualMessagesSecondSync = MoreIterators .toList(source.read(config, configuredCatalog, @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception { assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)); } + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')", + getFullyQualifiedTableName(TABLE_NAME))); + }); + } + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { final List expectedMessages = new ArrayList<>(); expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception { // we know the second streams messages are the same as the first minus the updated at column. so we // cheat and generate the expected messages off of the first expected messages. - final List secondStreamExpectedMessages = getTestMessages() - .stream() - .map(Jsons::clone) - .peek(m -> { - m.getRecord().setStream(streamName2); - ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); - ((ObjectNode) m.getRecord().getData()).replace(COL_ID, - convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); - }) - .collect(Collectors.toList()); + final List secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); expectedMessagesFirstSync.add(new AirbyteMessage() .withType(Type.STATE) @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception { assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync)); } + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + // when initial and final cursor fields are the same. protected void incrementalCursorCheck( final String cursorField, diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index ba0d126de1fe..aa7cda5d248c 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -164,7 +164,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { } @Override - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { super.incrementalCursorCheck(COL_UPDATED_AT, "2005-10-18", "2006-10-19", diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 42f2984e039d..1b07db6a7749 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.21 +LABEL io.airbyte.version=0.4.22 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 4d8247798a79..798286efb297 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -21,7 +21,6 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.sql.Date; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -30,6 +29,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; @@ -79,15 +80,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } @Override - protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { - try { - Date date = Date.valueOf(value); - preparedStatement.setDate(parameterIndex, date); - } catch (final Exception e) { - throw new RuntimeException(e); + public void setStatementField(final PreparedStatement preparedStatement, + final int parameterIndex, + final JDBCType cursorFieldType, + final String value) + throws SQLException { + switch (cursorFieldType) { + + case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value); + case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value); + case TIME -> setTime(preparedStatement, parameterIndex, value); + case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value); + case DATE -> setDate(preparedStatement, parameterIndex, value); + case BIT -> setBit(preparedStatement, parameterIndex, value); + case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value); + case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value); + case INTEGER -> setInteger(preparedStatement, parameterIndex, value); + case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value); + case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value); + case REAL -> setReal(preparedStatement, parameterIndex, value); + case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value); + case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value); + case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value); + // since cursor are expected to be comparable, handle cursor typing strictly and error on + // unrecognized types + default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType)); } } + private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } + + private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } + + @Override + protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } + + @Override + protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } + + @Override + protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); + } + @Override public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index bb25b4493fc2..459a44fa86e3 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -14,7 +15,11 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcSourceOperations; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; @@ -24,15 +29,18 @@ import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -43,8 +51,9 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; - - private JsonNode config; + public static String COL_WAKEUP_AT = "wakeup_at"; + public static String COL_LAST_VISITED_AT = "last_visited_at"; + public static String COL_LAST_COMMENT_AT = "last_comment_at"; @BeforeAll static void init() { @@ -55,6 +64,12 @@ static void init() { @BeforeEach public void setup() throws Exception { final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + COLUMN_CLAUSE_WITH_PK = + "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; + COLUMN_CLAUSE_WITHOUT_PK = + "id INTEGER, name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; + COLUMN_CLAUSE_WITH_COMPOSITE_PK = + "first_name VARCHAR(200), last_name VARCHAR(200), updated_at DATE, wakeup_at TIMETZ, last_visited_at TIMESTAMPTZ, last_comment_at TIMESTAMP"; config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) @@ -70,7 +85,170 @@ public void setup() throws Exception { final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); - super.setup(); + source = getSource(); + final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config); + + streamName = TABLE_NAME; + + dataSource = DataSourceFactory.create( + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + getDriverClass(), + jdbcConfig.get("jdbc_url").asText(), + JdbcUtils.parseJdbcParameters(jdbcConfig, "connection_properties", getJdbcParameterDelimiter())); + + database = new StreamingJdbcDatabase(dataSource, + JdbcUtils.getDefaultSourceOperations(), + AdaptiveStreamingQueryConfig::new); + + createSchemas(); + + database.execute(connection -> { + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME), COLUMN_CLAUSE_WITH_PK, + primaryKeyClause(Collections.singletonList("id")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (1,'picard', '2004-10-19','10:10:10.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (3, 'vash', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK), + COLUMN_CLAUSE_WITHOUT_PK, "")); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (1,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (2, 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (3, 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_WITHOUT_PK))); + + connection.createStatement().execute( + createTableQuery(getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK), + COLUMN_CLAUSE_WITH_COMPOSITE_PK, + primaryKeyClause(ImmutableList.of("first_name", "last_name")))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('first' ,'picard', '2004-10-19','12:12:12.123456-05:00','2004-10-19T17:23:54.123456Z','2004-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('second', 'crusher', '2005-10-19','11:11:11.123456-05:00','2005-10-19T17:23:54.123456Z','2005-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(first_name, last_name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES ('third', 'vash', '2006-10-19','10:10:10.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME_COMPOSITE_PK))); + + }); + + } + + @Override + protected List getAirbyteMessagesReadOneColumn() { + return getTestMessages().stream() + .map(Jsons::clone) + .peek(m -> { + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + @Override + protected ArrayList getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) { + final AirbyteMessage firstMessage = getTestMessages().get(0); + firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) firstMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) firstMessage.getRecord().getData()).remove(COL_NAME)); + + final AirbyteMessage secondMessage = getTestMessages().get(2); + secondMessage.getRecord().setStream(streamWithSpaces.getStream().getName()); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) secondMessage.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) secondMessage.getRecord().getData()).remove(COL_NAME)); + + Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2)); + + return Lists.newArrayList(firstMessage, secondMessage); + } + + @Override + protected List getAirbyteMessagesSecondSync(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + m.getRecord().setNamespace(getDefaultNamespace()); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesSecondStreamWithNamespace(String streamName2) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamName2); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); + } + + protected List getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) { + return getTestMessages() + .stream() + .map(Jsons::clone) + .peek(m -> { + m.getRecord().setStream(streamForTableWithSpaces.getStream().getName()); + ((ObjectNode) m.getRecord().getData()).set(COL_LAST_NAME_WITH_SPACE, + ((ObjectNode) m.getRecord().getData()).remove(COL_NAME)); + ((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_VISITED_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_LAST_COMMENT_AT); + ((ObjectNode) m.getRecord().getData()).remove(COL_WAKEUP_AT); + ((ObjectNode) m.getRecord().getData()).replace(COL_ID, + Jsons.jsonNode(m.getRecord().getData().get(COL_ID).asInt())); + }) + .collect(Collectors.toList()); } @Override @@ -114,20 +292,41 @@ protected List getTestMessages() { .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_1, COL_NAME, "picard", - COL_UPDATED_AT, "2004-10-19")))), + COL_UPDATED_AT, "2004-10-19", + COL_WAKEUP_AT, "10:10:10.123456-05:00", + COL_LAST_VISITED_AT, "2004-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2004-01-01T17:23:54.123456")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_2, COL_NAME, "crusher", - COL_UPDATED_AT, - "2005-10-19")))), + COL_UPDATED_AT, "2005-10-19", + COL_WAKEUP_AT, "11:11:11.123456-05:00", + COL_LAST_VISITED_AT, "2005-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2005-01-01T17:23:54.123456")))), new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_3, COL_NAME, "vash", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); + } + + protected void executeStatementReadIncrementallyTwice() throws SQLException { + database.execute(connection -> { + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (4,'riker', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME))); + connection.createStatement().execute( + String.format( + "INSERT INTO %s(id, name, updated_at, wakeup_at, last_visited_at, last_comment_at) VALUES (5, 'data', '2006-10-19','12:12:12.123456-05:00','2006-10-19T17:23:54.123456Z','2006-01-01T17:23:54.123456')", + getFullyQualifiedTableName(TABLE_NAME))); + }); } @Override @@ -138,7 +337,10 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), CatalogHelpers.createAirbyteStream( @@ -146,7 +348,10 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_ID, JsonSchemaType.NUMBER), Field.of(COL_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey(Collections.emptyList()), CatalogHelpers.createAirbyteStream( @@ -154,14 +359,17 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) { defaultNamespace, Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), Field.of(COL_LAST_NAME, JsonSchemaType.STRING), - Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE), + Field.of(COL_WAKEUP_AT, JsonSchemaType.STRING_TIME_WITH_TIMEZONE), + Field.of(COL_LAST_VISITED_AT, JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE), + Field.of(COL_LAST_COMMENT_AT, JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) .withSourceDefinedPrimaryKey( List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); } @Override - protected void incrementalTimestampCheck() throws Exception { + protected void incrementalDateCheck() throws Exception { super.incrementalCursorCheck(COL_UPDATED_AT, "2005-10-18", "2006-10-19", @@ -169,6 +377,33 @@ protected void incrementalTimestampCheck() throws Exception { getTestMessages().get(2))); } + @Test + void incrementalTimeTzCheck() throws Exception { + super.incrementalCursorCheck(COL_WAKEUP_AT, + "11:09:11.123456-05:00", + "12:12:12.123456-05:00", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Test + void incrementalTimestampTzCheck() throws Exception { + super.incrementalCursorCheck(COL_LAST_VISITED_AT, + "2005-10-18T17:23:54.123456Z", + "2006-10-19T17:23:54.123456Z", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Test + void incrementalTimestampCheck() throws Exception { + super.incrementalCursorCheck(COL_LAST_COMMENT_AT, + "2004-12-12T17:23:54.123456", + "2006-01-01T17:23:54.123456", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + @Override protected JdbcSourceOperations getSourceOperations() { return new PostgresSourceOperations(); @@ -182,13 +417,19 @@ protected List getExpectedAirbyteMessagesSecondSync(String names .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_4, COL_NAME, "riker", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) .withData(Jsons.jsonNode(ImmutableMap .of(COL_ID, ID_VALUE_5, COL_NAME, "data", - COL_UPDATED_AT, "2006-10-19"))))); + COL_UPDATED_AT, "2006-10-19", + COL_WAKEUP_AT, "12:12:12.123456-05:00", + COL_LAST_VISITED_AT, "2006-10-19T17:23:54.123456Z", + COL_LAST_COMMENT_AT, "2006-01-01T17:23:54.123456"))))); expectedMessages.add(new AirbyteMessage() .withType(AirbyteMessage.Type.STATE) .withState(new AirbyteStateMessage() diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index d7a3f8884de5..4b3781a3e55c 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync | | 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format | | 0.4.19 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates |