diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index 8d566baa8d37..25819c57fd2d 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.ConfigSchemaMigrationSupport; @@ -50,6 +51,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.JSONB; @@ -121,8 +123,6 @@ public List listConfigs(AirbyteConfig configType, Class clazz) throws @Override public void writeConfig(AirbyteConfig configType, String configId, T config) throws IOException { - LOGGER.info("Upserting {} record {}", configType, configId); - database.transaction(ctx -> { boolean isExistingConfig = ctx.fetchExists(select() .from(AIRBYTE_CONFIGS) @@ -131,27 +131,9 @@ public void writeConfig(AirbyteConfig configType, String configId, T config) OffsetDateTime timestamp = OffsetDateTime.now(); if (isExistingConfig) { - int updateCount = ctx.update(AIRBYTE_CONFIGS) - .set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config))) - .set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp) - .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType.name()), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)) - .execute(); - if (updateCount != 0 && updateCount != 1) { - LOGGER.warn("{} config {} has been updated; updated record count: {}", configType, configId, updateCount); - } - - return null; - } - - int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS) - .set(AIRBYTE_CONFIGS.CONFIG_ID, configId) - .set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType.name()) - .set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(config))) - .set(AIRBYTE_CONFIGS.CREATED_AT, timestamp) - .set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp) - .execute(); - if (insertionCount != 1) { - LOGGER.warn("{} config {} has been inserted; insertion record count: {}", configType, configId, insertionCount); + updateConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configId); + } else { + insertConfigRecord(ctx, timestamp, configType.name(), Jsons.jsonNode(config), configType.getIdFieldName()); } return null; @@ -215,20 +197,25 @@ public Map> dumpConfigs() throws IOException { * @return the number of inserted records for convenience, which is always 1. */ @VisibleForTesting - int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String idFieldName) { + int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, @Nullable String idFieldName) { String configId = idFieldName == null ? UUID.randomUUID().toString() : configJson.get(idFieldName).asText(); LOGGER.info("Inserting {} record {}", configType, configId); - ctx.insertInto(AIRBYTE_CONFIGS) + int insertionCount = ctx.insertInto(AIRBYTE_CONFIGS) .set(AIRBYTE_CONFIGS.CONFIG_ID, configId) .set(AIRBYTE_CONFIGS.CONFIG_TYPE, configType) .set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson))) .set(AIRBYTE_CONFIGS.CREATED_AT, timestamp) .set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp) + .onConflict(AIRBYTE_CONFIGS.CONFIG_TYPE, AIRBYTE_CONFIGS.CONFIG_ID) + .doNothing() .execute(); - return 1; + if (insertionCount != 1) { + LOGGER.warn("{} config {} already exists (insertion record count: {})", configType, configId, insertionCount); + } + return insertionCount; } /** @@ -238,11 +225,15 @@ int insertConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configTy int updateConfigRecord(DSLContext ctx, OffsetDateTime timestamp, String configType, JsonNode configJson, String configId) { LOGGER.info("Updating {} record {}", configType, configId); - return ctx.update(AIRBYTE_CONFIGS) + int updateCount = ctx.update(AIRBYTE_CONFIGS) .set(AIRBYTE_CONFIGS.CONFIG_BLOB, JSONB.valueOf(Jsons.serialize(configJson))) .set(AIRBYTE_CONFIGS.UPDATED_AT, timestamp) .where(AIRBYTE_CONFIGS.CONFIG_TYPE.eq(configType), AIRBYTE_CONFIGS.CONFIG_ID.eq(configId)) .execute(); + if (updateCount != 1) { + LOGGER.warn("{} config {} is not updated (updated record count: {})", configType, configId, updateCount); + } + return updateCount; } @VisibleForTesting @@ -268,12 +259,14 @@ void copyConfigsFromSeed(DSLContext ctx, ConfigPersistence seedConfigPersistence LOGGER.info("Config database data loading completed with {} records", insertionCount); } - private static class ConnectorInfo { + static class ConnectorInfo { - private final String connectorDefinitionId; - private final String dockerImageTag; + final String dockerRepository; + final String connectorDefinitionId; + final String dockerImageTag; - private ConnectorInfo(String connectorDefinitionId, String dockerImageTag) { + private ConnectorInfo(String dockerRepository, String connectorDefinitionId, String dockerImageTag) { + this.dockerRepository = dockerRepository; this.connectorDefinitionId = connectorDefinitionId; this.dockerImageTag = dockerImageTag; } @@ -334,7 +327,8 @@ private ConnectorCounter updateConnectorDefinitions(DSLContext ctx, AirbyteConfig configType, List latestDefinitions, Set connectorRepositoriesInUse, - Map connectorRepositoryToIdVersionMap) { + Map connectorRepositoryToIdVersionMap) + throws IOException { int newCount = 0; int updatedCount = 0; for (T latestDefinition : latestDefinitions) { @@ -364,7 +358,8 @@ private ConnectorCounter updateConnectorDefinitions(DSLContext ctx, * repository name instead of definition id because connectors can be added manually by * users, and are not always the same as those in the seed. */ - private Map getConnectorRepositoryToInfoMap(DSLContext ctx) { + @VisibleForTesting + Map getConnectorRepositoryToInfoMap(DSLContext ctx) { Field repoField = field("config_blob ->> 'dockerRepository'", SQLDataType.VARCHAR).as("repository"); Field versionField = field("config_blob ->> 'dockerImageTag'", SQLDataType.VARCHAR).as("version"); return ctx.select(AIRBYTE_CONFIGS.CONFIG_ID, repoField, versionField) @@ -373,7 +368,20 @@ private Map getConnectorRepositoryToInfoMap(DSLContext ct .fetch().stream() .collect(Collectors.toMap( row -> row.getValue(repoField), - row -> new ConnectorInfo(row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField)))); + row -> new ConnectorInfo(row.getValue(repoField), row.getValue(AIRBYTE_CONFIGS.CONFIG_ID), row.getValue(versionField)), + // when there are duplicated connector definitions, return the latest one + (c1, c2) -> { + AirbyteVersion v1 = new AirbyteVersion(c1.dockerImageTag); + AirbyteVersion v2 = new AirbyteVersion(c2.dockerImageTag); + LOGGER.warn("Duplicated connector version found for {}: {} ({}) vs {} ({})", + c1.dockerRepository, c1.dockerImageTag, c1.connectorDefinitionId, c2.dockerImageTag, c2.connectorDefinitionId); + int comparison = v1.patchVersionCompareTo(v2); + if (comparison >= 0) { + return c1; + } else { + return c2; + } + })); } /** diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java index 786f94fa7006..42ac33231712 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java @@ -128,9 +128,10 @@ public void testNoUpdateForUsedConnector() throws Exception { // create a sync to mark the destination as used StandardSync s3Sync = new StandardSync() + .withConnectionId(UUID.randomUUID()) .withSourceId(SOURCE_GITHUB.getSourceDefinitionId()) .withDestinationId(destinationS3V2.getDestinationDefinitionId()); - configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, UUID.randomUUID().toString(), s3Sync); + configPersistence.writeConfig(ConfigSchema.STANDARD_SYNC, s3Sync.getConnectionId().toString(), s3Sync); configPersistence.loadData(seedPersistence); // s3 destination is not updated diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index f2a570f8ddf5..dd5c4650ba76 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -33,9 +33,13 @@ import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.persistence.DatabaseConfigPersistence.ConnectorInfo; import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; +import java.time.OffsetDateTime; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -120,4 +124,98 @@ public void testDumpConfigs() throws Exception { assertSameConfigDump(expected, actual); } + @Test + public void testGetConnectorRepositoryToInfoMap() throws Exception { + String connectorRepository = "airbyte/duplicated-connector"; + String oldVersion = "0.1.10"; + String newVersion = "0.2.0"; + StandardSourceDefinition source1 = new StandardSourceDefinition() + .withSourceDefinitionId(UUID.randomUUID()) + .withDockerRepository(connectorRepository) + .withDockerImageTag(oldVersion); + StandardSourceDefinition source2 = new StandardSourceDefinition() + .withSourceDefinitionId(UUID.randomUUID()) + .withDockerRepository(connectorRepository) + .withDockerImageTag(newVersion); + writeSource(configPersistence, source1); + writeSource(configPersistence, source2); + Map result = database.query(ctx -> configPersistence.getConnectorRepositoryToInfoMap(ctx)); + // when there are duplicated connector definitions, the one with the latest version should be + // retrieved + assertEquals(newVersion, result.get(connectorRepository).dockerImageTag); + } + + @Test + public void testInsertConfigRecord() throws Exception { + OffsetDateTime timestamp = OffsetDateTime.now(); + UUID definitionId = UUID.randomUUID(); + String connectorRepository = "airbyte/test-connector"; + + // when the record does not exist, it is inserted + StandardSourceDefinition source1 = new StandardSourceDefinition() + .withSourceDefinitionId(definitionId) + .withDockerRepository(connectorRepository) + .withDockerImageTag("0.1.2"); + int insertionCount = database.query(ctx -> configPersistence.insertConfigRecord( + ctx, + timestamp, + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), + Jsons.jsonNode(source1), + ConfigSchema.STANDARD_SOURCE_DEFINITION.getIdFieldName())); + assertEquals(1, insertionCount); + // write an irrelevant source to make sure that it is not changed + writeSource(configPersistence, SOURCE_GITHUB); + assertRecordCount(2); + assertHasSource(source1); + assertHasSource(SOURCE_GITHUB); + + // when the record already exists, it is ignored + StandardSourceDefinition source2 = new StandardSourceDefinition() + .withSourceDefinitionId(definitionId) + .withDockerRepository(connectorRepository) + .withDockerImageTag("0.1.5"); + insertionCount = database.query(ctx -> configPersistence.insertConfigRecord( + ctx, + timestamp, + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), + Jsons.jsonNode(source2), + ConfigSchema.STANDARD_SOURCE_DEFINITION.getIdFieldName())); + assertEquals(0, insertionCount); + assertRecordCount(2); + assertHasSource(source1); + assertHasSource(SOURCE_GITHUB); + } + + @Test + public void testUpdateConfigRecord() throws Exception { + OffsetDateTime timestamp = OffsetDateTime.now(); + UUID definitionId = UUID.randomUUID(); + String connectorRepository = "airbyte/test-connector"; + + StandardSourceDefinition oldSource = new StandardSourceDefinition() + .withSourceDefinitionId(definitionId) + .withDockerRepository(connectorRepository) + .withDockerImageTag("0.3.5"); + writeSource(configPersistence, oldSource); + // write an irrelevant source to make sure that it is not changed + writeSource(configPersistence, SOURCE_GITHUB); + assertRecordCount(2); + assertHasSource(oldSource); + assertHasSource(SOURCE_GITHUB); + + StandardSourceDefinition newSource = new StandardSourceDefinition() + .withSourceDefinitionId(definitionId) + .withDockerRepository(connectorRepository) + .withDockerImageTag("0.3.5"); + database.query(ctx -> configPersistence.updateConfigRecord( + ctx, + timestamp, + ConfigSchema.STANDARD_SOURCE_DEFINITION.name(), + Jsons.jsonNode(newSource), + definitionId.toString())); + assertRecordCount(2); + assertHasSource(newSource); + assertHasSource(SOURCE_GITHUB); + } + }