diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 760e15c634d7..051d9597a5c6 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -132,7 +132,7 @@ public boolean healthCheck() { public StandardWorkspace getStandardWorkspaceNoSecrets(final UUID workspaceId, final boolean includeTombstone) throws JsonValidationException, IOException, ConfigNotFoundException { - return listWorkspaceQuery(includeTombstone) + return listWorkspaceQuery(Optional.of(workspaceId), includeTombstone) .findFirst() .orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId)); } @@ -158,13 +158,14 @@ public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean inc } public List listStandardWorkspaces(final boolean includeTombstone) throws IOException { - return listWorkspaceQuery(includeTombstone).toList(); + return listWorkspaceQuery(Optional.empty(), includeTombstone).toList(); } - private Stream listWorkspaceQuery(final boolean includeTombstone) throws IOException { + private Stream listWorkspaceQuery(final Optional workspaceId, final boolean includeTombstone) throws IOException { return database.query(ctx -> ctx.select(WORKSPACE.asterisk()) .from(WORKSPACE) .where(includeTombstone ? noCondition() : WORKSPACE.TOMBSTONE.notEqual(true)) + .and(workspaceId.map(WORKSPACE.ID::eq).orElse(noCondition())) .fetch()) .stream() .map(DbConverter::buildStandardWorkspace); @@ -907,7 +908,7 @@ private static StandardSyncOperation buildStandardSyncOperation(final Record rec } public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException { - return listStandardSyncOperationQuery(Optional.empty()) + return listStandardSyncOperationQuery(Optional.of(operationId)) .findFirst() .orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId)); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java index e33c8adc65ad..332429c88f59 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java @@ -341,7 +341,7 @@ private SourceConnection createSourceConnection(final UUID workspaceId, final St } private DestinationConnection createDestinationConnection(final UUID workspaceId, final StandardDestinationDefinition destDef) - throws JsonValidationException, IOException { + throws IOException { final UUID destinationId = UUID.randomUUID(); final DestinationConnection dest = new DestinationConnection() .withName("source-" + destinationId) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java index c6d4d10aa3f6..f18d20f4b7f3 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java @@ -39,7 +39,6 @@ class StatePersistenceTest extends BaseConfigDatabaseTest { - private ConfigRepository configRepository; private StatePersistence statePersistence; private UUID connectionId; private static final String STATE_ONE = "\"state1\""; @@ -58,7 +57,7 @@ void beforeEach() throws DatabaseInitializationException, IOException, JsonValid } private void setupTestData() throws JsonValidationException, IOException { - configRepository = new ConfigRepository( + final ConfigRepository configRepository = new ConfigRepository( database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)), new StandardSyncPersistence(database)); @@ -68,15 +67,14 @@ private void setupTestData() throws JsonValidationException, IOException { final SourceConnection sourceConnection = MockData.sourceConnections().get(0); final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition(); final DestinationConnection destinationConnection = MockData.destinationConnections().get(0); - final StandardSync sync = MockData.standardSyncs().get(0); + // we don't need sync operations in this test suite, zero them out. + final StandardSync sync = Jsons.clone(MockData.standardSyncs().get(0)).withOperationIds(Collections.emptyList()); configRepository.writeStandardWorkspaceNoSecrets(workspace); configRepository.writeStandardSourceDefinition(sourceDefinition); configRepository.writeSourceConnectionNoSecrets(sourceConnection); configRepository.writeStandardDestinationDefinition(destinationDefinition); configRepository.writeDestinationConnectionNoSecrets(destinationConnection); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0)); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1)); configRepository.writeStandardSync(sync); connectionId = sync.getConnectionId(); @@ -239,7 +237,7 @@ void testGlobalPartialReset() throws IOException { .withType(AirbyteStateType.GLOBAL) .withGlobal(new AirbyteGlobalState() .withSharedState(Jsons.deserialize(GLOBAL_STATE)) - .withStreamStates(Arrays.asList( + .withStreamStates(List.of( new AirbyteStreamState() .withStreamDescriptor(new StreamDescriptor().withName("s1")) .withStreamState(Jsons.deserialize(STATE_TWO)))))); @@ -424,7 +422,7 @@ void testStreamPartialUpdates() throws IOException { assertEquals( new StateWrapper() .withStateType(StateType.STREAM) - .withStateMessages(Arrays.asList( + .withStateMessages(List.of( new AirbyteStateMessage() .withType(AirbyteStateType.STREAM) .withStream(new AirbyteStreamState() diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java new file mode 100644 index 000000000000..8e293f3a52d6 --- /dev/null +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.airbyte.config.Geography; +import io.airbyte.config.OperatorDbt; +import io.airbyte.config.OperatorNormalization; +import io.airbyte.config.OperatorNormalization.Option; +import io.airbyte.config.OperatorWebhook; +import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.StandardSyncOperation.OperatorType; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class SyncOperationPersistenceTest extends BaseConfigDatabaseTest { + + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID(); + private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url"; + private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body"; + + private ConfigRepository configRepository; + + private static final StandardSyncOperation DBT_OP = new StandardSyncOperation() + .withName("operation-1") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorDbt(new OperatorDbt() + .withDbtArguments("dbt-arguments") + .withDockerImage("image-tag") + .withGitRepoBranch("git-repo-branch") + .withGitRepoUrl("git-repo-url")) + .withOperatorNormalization(null) + .withOperatorType(OperatorType.DBT); + private static final StandardSyncOperation NORMALIZATION_OP = new StandardSyncOperation() + .withName("operation-1") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorDbt(null) + .withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC)) + .withOperatorType(OperatorType.NORMALIZATION); + private static final StandardSyncOperation WEBHOOK_OP = new StandardSyncOperation() + .withName("webhook-operation") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorType(OperatorType.WEBHOOK) + .withOperatorDbt(null) + .withOperatorNormalization(null) + .withOperatorWebhook( + new OperatorWebhook() + .withWebhookConfigId(WEBHOOK_CONFIG_ID) + .withExecutionUrl(WEBHOOK_OPERATION_EXECUTION_URL) + .withExecutionBody(WEBHOOK_OPERATION_EXECUTION_BODY)); + private static final List OPS = List.of(DBT_OP, NORMALIZATION_OP, WEBHOOK_OP); + + @BeforeEach + void beforeEach() throws Exception { + truncateAllTables(); + + configRepository = new ConfigRepository(database); + createWorkspace(); + + for (final StandardSyncOperation op : OPS) { + configRepository.writeStandardSyncOperation(op); + } + } + + @Test + void testReadWrite() throws IOException, ConfigNotFoundException, JsonValidationException { + for (final StandardSyncOperation op : OPS) { + assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId())); + } + } + + @Test + void testReadNotExists() { + assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID())); + } + + @Test + void testList() throws IOException, JsonValidationException { + assertEquals(OPS, configRepository.listStandardSyncOperations()); + } + + @Test + void testDelete() throws IOException, ConfigNotFoundException, JsonValidationException { + for (final StandardSyncOperation op : OPS) { + assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId())); + configRepository.deleteStandardSyncOperation(op.getOperationId()); + assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID())); + + } + } + + private void createWorkspace() throws IOException, JsonValidationException { + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(WORKSPACE_ID) + .withName("Another Workspace") + .withSlug("another-workspace") + .withInitialSetupComplete(true) + .withTombstone(false) + .withDefaultGeography(Geography.AUTO); + configRepository.writeStandardWorkspaceNoSecrets(workspace); + } + +} diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java index d41e2edcc05b..8077d3fdcbc6 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java @@ -41,6 +41,12 @@ void setup() { null)); } + @Test + void testGetWorkspace() throws ConfigNotFoundException, IOException, JsonValidationException { + configRepository.writeStandardWorkspaceNoSecrets(createBaseStandardWorkspace().withWorkspaceId(UUID.randomUUID())); + assertReturnsWorkspace(createBaseStandardWorkspace()); + } + @Test void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException { assertReturnsWorkspace(createBaseStandardWorkspace());