Skip to content

Commit

Permalink
bug: add workspace_id to query (#20011)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Dec 2, 2022
1 parent 6dee382 commit ef13c3e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public boolean healthCheck() {

public StandardWorkspace getStandardWorkspaceNoSecrets(final UUID workspaceId, final boolean includeTombstone)
throws JsonValidationException, IOException, ConfigNotFoundException {
return listWorkspaceQuery(includeTombstone)
return listWorkspaceQuery(Optional.of(workspaceId), includeTombstone)
.findFirst()
.orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId));
}
Expand All @@ -158,13 +158,14 @@ public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean inc
}

public List<StandardWorkspace> listStandardWorkspaces(final boolean includeTombstone) throws IOException {
return listWorkspaceQuery(includeTombstone).toList();
return listWorkspaceQuery(Optional.empty(), includeTombstone).toList();
}

private Stream<StandardWorkspace> listWorkspaceQuery(final boolean includeTombstone) throws IOException {
private Stream<StandardWorkspace> listWorkspaceQuery(final Optional<UUID> workspaceId, final boolean includeTombstone) throws IOException {
return database.query(ctx -> ctx.select(WORKSPACE.asterisk())
.from(WORKSPACE)
.where(includeTombstone ? noCondition() : WORKSPACE.TOMBSTONE.notEqual(true))
.and(workspaceId.map(WORKSPACE.ID::eq).orElse(noCondition()))
.fetch())
.stream()
.map(DbConverter::buildStandardWorkspace);
Expand Down Expand Up @@ -907,7 +908,7 @@ private static StandardSyncOperation buildStandardSyncOperation(final Record rec
}

public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException {
return listStandardSyncOperationQuery(Optional.empty())
return listStandardSyncOperationQuery(Optional.of(operationId))
.findFirst()
.orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private SourceConnection createSourceConnection(final UUID workspaceId, final St
}

private DestinationConnection createDestinationConnection(final UUID workspaceId, final StandardDestinationDefinition destDef)
throws JsonValidationException, IOException {
throws IOException {
final UUID destinationId = UUID.randomUUID();
final DestinationConnection dest = new DestinationConnection()
.withName("source-" + destinationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

class StatePersistenceTest extends BaseConfigDatabaseTest {

private ConfigRepository configRepository;
private StatePersistence statePersistence;
private UUID connectionId;
private static final String STATE_ONE = "\"state1\"";
Expand All @@ -58,7 +57,7 @@ void beforeEach() throws DatabaseInitializationException, IOException, JsonValid
}

private void setupTestData() throws JsonValidationException, IOException {
configRepository = new ConfigRepository(
final ConfigRepository configRepository = new ConfigRepository(
database,
new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)),
new StandardSyncPersistence(database));
Expand All @@ -68,15 +67,14 @@ private void setupTestData() throws JsonValidationException, IOException {
final SourceConnection sourceConnection = MockData.sourceConnections().get(0);
final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition();
final DestinationConnection destinationConnection = MockData.destinationConnections().get(0);
final StandardSync sync = MockData.standardSyncs().get(0);
// we don't need sync operations in this test suite, zero them out.
final StandardSync sync = Jsons.clone(MockData.standardSyncs().get(0)).withOperationIds(Collections.emptyList());

configRepository.writeStandardWorkspaceNoSecrets(workspace);
configRepository.writeStandardSourceDefinition(sourceDefinition);
configRepository.writeSourceConnectionNoSecrets(sourceConnection);
configRepository.writeStandardDestinationDefinition(destinationDefinition);
configRepository.writeDestinationConnectionNoSecrets(destinationConnection);
configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0));
configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1));
configRepository.writeStandardSync(sync);

connectionId = sync.getConnectionId();
Expand Down Expand Up @@ -239,7 +237,7 @@ void testGlobalPartialReset() throws IOException {
.withType(AirbyteStateType.GLOBAL)
.withGlobal(new AirbyteGlobalState()
.withSharedState(Jsons.deserialize(GLOBAL_STATE))
.withStreamStates(Arrays.asList(
.withStreamStates(List.of(
new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName("s1"))
.withStreamState(Jsons.deserialize(STATE_TWO))))));
Expand Down Expand Up @@ -424,7 +422,7 @@ void testStreamPartialUpdates() throws IOException {
assertEquals(
new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(Arrays.asList(
.withStateMessages(List.of(
new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.config.Geography;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SyncOperationPersistenceTest extends BaseConfigDatabaseTest {

private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url";
private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body";

private ConfigRepository configRepository;

private static final StandardSyncOperation DBT_OP = new StandardSyncOperation()
.withName("operation-1")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorDbt(new OperatorDbt()
.withDbtArguments("dbt-arguments")
.withDockerImage("image-tag")
.withGitRepoBranch("git-repo-branch")
.withGitRepoUrl("git-repo-url"))
.withOperatorNormalization(null)
.withOperatorType(OperatorType.DBT);
private static final StandardSyncOperation NORMALIZATION_OP = new StandardSyncOperation()
.withName("operation-1")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorDbt(null)
.withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC))
.withOperatorType(OperatorType.NORMALIZATION);
private static final StandardSyncOperation WEBHOOK_OP = new StandardSyncOperation()
.withName("webhook-operation")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorType(OperatorType.WEBHOOK)
.withOperatorDbt(null)
.withOperatorNormalization(null)
.withOperatorWebhook(
new OperatorWebhook()
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
.withExecutionUrl(WEBHOOK_OPERATION_EXECUTION_URL)
.withExecutionBody(WEBHOOK_OPERATION_EXECUTION_BODY));
private static final List<StandardSyncOperation> OPS = List.of(DBT_OP, NORMALIZATION_OP, WEBHOOK_OP);

@BeforeEach
void beforeEach() throws Exception {
truncateAllTables();

configRepository = new ConfigRepository(database);
createWorkspace();

for (final StandardSyncOperation op : OPS) {
configRepository.writeStandardSyncOperation(op);
}
}

@Test
void testReadWrite() throws IOException, ConfigNotFoundException, JsonValidationException {
for (final StandardSyncOperation op : OPS) {
assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId()));
}
}

@Test
void testReadNotExists() {
assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID()));
}

@Test
void testList() throws IOException, JsonValidationException {
assertEquals(OPS, configRepository.listStandardSyncOperations());
}

@Test
void testDelete() throws IOException, ConfigNotFoundException, JsonValidationException {
for (final StandardSyncOperation op : OPS) {
assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId()));
configRepository.deleteStandardSyncOperation(op.getOperationId());
assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID()));

}
}

private void createWorkspace() throws IOException, JsonValidationException {
final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID)
.withName("Another Workspace")
.withSlug("another-workspace")
.withInitialSetupComplete(true)
.withTombstone(false)
.withDefaultGeography(Geography.AUTO);
configRepository.writeStandardWorkspaceNoSecrets(workspace);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ void setup() {
null));
}

@Test
void testGetWorkspace() throws ConfigNotFoundException, IOException, JsonValidationException {
configRepository.writeStandardWorkspaceNoSecrets(createBaseStandardWorkspace().withWorkspaceId(UUID.randomUUID()));
assertReturnsWorkspace(createBaseStandardWorkspace());
}

@Test
void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException {
assertReturnsWorkspace(createBaseStandardWorkspace());
Expand Down

0 comments on commit ef13c3e

Please sign in to comment.