diff --git a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java new file mode 100644 index 000000000000..19cf1b34ba7a --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java @@ -0,0 +1,154 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.server.helpers; + +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.airbyte.api.model.ConnectionIdRequestBody; +import io.airbyte.api.model.ConnectionRead; +import io.airbyte.api.model.DestinationIdRequestBody; +import io.airbyte.api.model.DestinationRead; +import io.airbyte.api.model.SourceIdRequestBody; +import io.airbyte.api.model.SourceRead; +import io.airbyte.config.JobConfig; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.server.converters.SpecFetcher; +import io.airbyte.server.handlers.ConnectionsHandler; +import io.airbyte.server.handlers.DestinationHandler; +import io.airbyte.server.handlers.SourceHandler; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.NotImplementedException; + +public class WorkspaceHelper { + + private final ConnectionsHandler connectionsHandler; + private final SourceHandler sourceHandler; + private final DestinationHandler destinationHandler; + + private final LoadingCache sourceToWorkspaceCache; + private final LoadingCache destinationToWorkspaceCache; + private final LoadingCache connectionToWorkspaceCache; + private final LoadingCache jobToWorkspaceCache; + + public WorkspaceHelper(ConfigRepository configRepository, + JobPersistence jobPersistence, + JsonSchemaValidator jsonSchemaValidator, + SpecFetcher specFetcher) { + this.connectionsHandler = new ConnectionsHandler(configRepository); + this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + + this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { + final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)); + return source.getWorkspaceId(); + } + + }); + + this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { + final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId)); + return destination.getWorkspaceId(); + } + + }); + + this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException { + final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID sourceId = connection.getSourceId(); + final UUID destinationId = connection.getDestinationId(); + return getWorkspaceForConnection(sourceId, destinationId); + } + + }); + + this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(Long jobId) throws IOException, ExecutionException { + final Job job = jobPersistence.getJob(jobId); + if (job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) { + return getWorkspaceForConnectionId(UUID.fromString(job.getScope())); + } else { + throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!"); + } + } + + }); + } + + public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException { + return sourceToWorkspaceCache.get(sourceId); + } + + public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException { + return destinationToWorkspaceCache.get(destinationId); + } + + public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException { + return jobToWorkspaceCache.get(jobId); + } + + public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException { + final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId); + final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId); + + Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!"); + return sourceWorkspace; + } + + public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException { + return connectionToWorkspaceCache.get(connectionId); + } + + public UUID getWorkspaceForOperationId(UUID operationId) { + throw new NotImplementedException(); + } + + private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { + return CacheBuilder.newBuilder() + .maximumSize(20000) + .build(cacheLoader); + } + +} diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java new file mode 100644 index 000000000000..a2d716f6d91d --- /dev/null +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java @@ -0,0 +1,219 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.server.helpers; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobConfig; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.FileSystemConfigPersistence; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.models.JobStatus; +import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.server.converters.SpecFetcher; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class WorkspaceHelperTest { + + Path tmpDir; + ConfigRepository configRepository; + JobPersistence jobPersistence; + WorkspaceHelper workspaceHelper; + + @BeforeEach + public void setup() throws IOException { + tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); + + configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); + jobPersistence = mock(JobPersistence.class); + + SpecFetcher specFetcher = mock(SpecFetcher.class); + when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}"))); + + workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher); + } + + @Test + public void testObjectsThatDoNotExist() { + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); + assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); + // todo: add operationId check + } + + @Test + public void testSource() throws IOException, ExecutionException, JsonValidationException { + UUID source = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testDestination() throws IOException, ExecutionException, JsonValidationException { + UUID destination = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException { + UUID workspace = UUID.randomUUID(); + + // set up source + UUID source = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + + // set up destination + UUID destination = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + + // set up connection + UUID connection = UUID.randomUUID(); + configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source) + .withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>()))); + + // test retrieving by connection id + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspace); + + // test retrieving by source and destination ids + UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspaceBySourceAndDestination); + + // check that caching is working + UUID newWorkspace = UUID.randomUUID(); + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace)); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + + // test jobs + long jobId = 123; + Job job = new Job( + jobId, + JobConfig.ConfigType.SYNC, + connection.toString(), + new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()), + new ArrayList<>(), + JobStatus.PENDING, + System.currentTimeMillis(), + System.currentTimeMillis(), + System.currentTimeMillis()); + when(jobPersistence.getJob(jobId)).thenReturn(job); + + UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); + assertEquals(workspace, jobWorkspace); + } + +}