diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 7e396ed8b02e..777dec7eeaaf 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -5,6 +5,8 @@ package io.airbyte.config.persistence; import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT; import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION; import com.fasterxml.jackson.databind.JsonNode; @@ -41,6 +43,7 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; @@ -53,6 +56,12 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.jooq.DSLContext; +import org.jooq.JSONB; +import org.jooq.Record; +import org.jooq.Record1; +import org.jooq.Record2; +import org.jooq.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -627,35 +636,111 @@ public List listActorCatalogs() return actorCatalogs; } - public void writeCatalog(final AirbyteCatalog catalog, - final UUID sourceId, - final String configurationHash, - final String connectorVersion) - throws JsonValidationException, IOException { + private Map findCatalogByHash(final String catalogHash, final DSLContext context) { + final Result> records = context.select(ACTOR_CATALOG.ID, ACTOR_CATALOG.CATALOG) + .from(ACTOR_CATALOG) + .where(ACTOR_CATALOG.CATALOG_HASH.eq(catalogHash)).fetch(); + + final Map result = new HashMap<>(); + for (final Record record : records) { + final AirbyteCatalog catalog = Jsons.deserialize( + record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class); + result.put(record.get(ACTOR_CATALOG.ID), catalog); + } + return result; + } + + /** + * Store an Airbyte catalog in DB if it is not present already + * + * Checks in the config DB if the catalog is present already, if so returns it identifier. It is not + * present, it is inserted in DB with a new identifier and that identifier is returned. + * + * @param airbyteCatalog An Airbyte catalog to cache + * @param context + * @return the db identifier for the cached catalog. + */ + private UUID getOrInsertActorCatalog(final AirbyteCatalog airbyteCatalog, + final DSLContext context) { + final OffsetDateTime timestamp = OffsetDateTime.now(); final HashFunction hashFunction = Hashing.murmur3_32_fixed(); - final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes( + final String catalogHash = hashFunction.hashBytes(Jsons.serialize(airbyteCatalog).getBytes( Charsets.UTF_8)).toString(); - ActorCatalog actorCatalog = new ActorCatalog() - .withCatalog(Jsons.jsonNode(catalog)) - .withId(UUID.randomUUID()) - .withCatalogHash(catalogHash); - final Optional existingCatalog = findExistingCatalog(actorCatalog); - if (existingCatalog.isPresent()) { - actorCatalog = existingCatalog.get(); - } else { - persistence.writeConfig(ConfigSchema.ACTOR_CATALOG, - actorCatalog.getId().toString(), - actorCatalog); - } - final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent() - .withActorCatalogId(actorCatalog.getId()) - .withId(UUID.randomUUID()) - .withConfigHash(configurationHash) - .withConnectorVersion(connectorVersion) - .withActorId(sourceId); - persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT, - actorCatalogFetchEvent.getId().toString(), - actorCatalogFetchEvent); + final Map catalogs = findCatalogByHash(catalogHash, context); + + for (final Map.Entry entry : catalogs.entrySet()) { + if (entry.getValue().equals(airbyteCatalog)) { + return entry.getKey(); + } + } + + final UUID catalogId = UUID.randomUUID(); + context.insertInto(ACTOR_CATALOG) + .set(ACTOR_CATALOG.ID, catalogId) + .set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(airbyteCatalog))) + .set(ACTOR_CATALOG.CATALOG_HASH, catalogHash) + .set(ACTOR_CATALOG.CREATED_AT, timestamp) + .set(ACTOR_CATALOG.MODIFIED_AT, timestamp).execute(); + return catalogId; + } + + public Optional getActorCatalog(final UUID actorId, + final String actorVersion, + final String configHash) + throws IOException { + final Result> records = database.transaction(ctx -> ctx.select(ACTOR_CATALOG.CATALOG) + .from(ACTOR_CATALOG).join(ACTOR_CATALOG_FETCH_EVENT) + .on(ACTOR_CATALOG.ID.eq(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID)) + .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(actorId)) + .and(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION.eq(actorVersion)) + .and(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH.eq(configHash)) + .orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1)).fetch(); + + if (records.size() >= 1) { + final JSONB record = records.get(0).get(ACTOR_CATALOG.CATALOG); + return Optional.of(Jsons.deserialize(record.toString(), AirbyteCatalog.class)); + } + return Optional.empty(); + + } + + /** + * Stores source catalog information. + * + * This function is called each time the schema of a source is fetched. This can occur because the + * source is set up for the first time, because the configuration or version of the connector + * changed or because the user explicitly requested a schema refresh. Schemas are stored separately + * and de-duplicated upon insertion. Once a schema has been successfully stored, a call to + * getActorCatalog(actorId, connectionVersion, configurationHash) will return the most recent schema + * stored for those parameters. + * + * @param catalog + * @param actorId + * @param connectorVersion + * @param configurationHash + * @return The identifier (UUID) of the fetch event inserted in the database + * @throws IOException + */ + public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog, + final UUID actorId, + final String connectorVersion, + final String configurationHash) + throws IOException { + final OffsetDateTime timestamp = OffsetDateTime.now(); + final UUID fetchEventID = UUID.randomUUID(); + database.transaction(ctx -> { + final UUID catalogId = getOrInsertActorCatalog(catalog, ctx); + return ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT) + .set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorId) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, catalogId) + .set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, configurationHash) + .set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, connectorVersion) + .set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp) + .set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp).execute(); + }); + + return fetchEventID; } public int countConnectionsForWorkspace(final UUID workspaceId) throws IOException { diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 99eea3882ef2..09a1ebfd27d4 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -4,6 +4,7 @@ package io.airbyte.config.persistence; +import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.spy; @@ -12,8 +13,10 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSourceDefinition.SourceType; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence; import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator; import io.airbyte.db.Database; @@ -21,9 +24,14 @@ import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator; import io.airbyte.db.instance.development.DevDatabaseMigrator; import io.airbyte.db.instance.development.MigrationDevHelper; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.sql.SQLException; import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.AfterAll; @@ -97,4 +105,54 @@ void testWorkspaceCountConnections() throws IOException { assertEquals(MockData.sourceConnections().size(), configRepository.countSourcesForWorkspace(workspaceId)); } + @Test + void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException { + + final StandardWorkspace workspace = MockData.standardWorkspace(); + + final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() + .withSourceDefinitionId(UUID.randomUUID()) + .withSourceType(SourceType.DATABASE) + .withDockerRepository("docker-repo") + .withDockerImageTag("1.2.0") + .withName("sourceDefinition"); + configRepository.writeStandardSourceDefinition(sourceDefinition); + + final SourceConnection source = new SourceConnection() + .withSourceDefinitionId(sourceDefinition.getSourceDefinitionId()) + .withSourceId(UUID.randomUUID()) + .withName("SomeConnector") + .withWorkspaceId(workspace.getWorkspaceId()) + .withConfiguration(Jsons.deserialize("{}")); + final ConnectorSpecification specification = new ConnectorSpecification() + .withConnectionSpecification(Jsons.deserialize("{}")); + configRepository.writeSourceConnection(source, specification); + + final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING)); + configRepository.writeActorCatalogFetchEvent( + actorCatalog, source.getSourceId(), "1.2.0", "ConfigHash"); + + final Optional catalog = + configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "ConfigHash"); + assertTrue(catalog.isPresent()); + assertEquals(actorCatalog, catalog.get()); + assertFalse(configRepository.getSourceCatalog(source.getSourceId(), "1.3.0", "ConfigHash").isPresent()); + assertFalse(configRepository.getSourceCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash").isPresent()); + + configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.3.0", "ConfigHash"); + final Optional catalogNewConnectorVersion = + configRepository.getActorCatalog(source.getSourceId(), "1.3.0", "ConfigHash"); + assertTrue(catalogNewConnectorVersion.isPresent()); + assertEquals(actorCatalog, catalogNewConnectorVersion.get()); + + configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.2.0", "OtherConfigHash"); + final Optional catalogNewConfig = + configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash"); + assertTrue(catalogNewConfig.isPresent()); + assertEquals(actorCatalog, catalogNewConfig.get()); + + final int catalogDbEntry = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class); + assertEquals(1, catalogDbEntry); + } + }