Skip to content

Commit

Permalink
add workspace helper (#4868)
Browse files Browse the repository at this point in the history
* add workspace helper

* fmt

* switch to a fixed limit
  • Loading branch information
jrhizor authored and gl-pix committed Jul 22, 2021
1 parent 32a9e60 commit 768aad0
Show file tree
Hide file tree
Showing 2 changed files with 373 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.helpers;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.airbyte.api.model.ConnectionIdRequestBody;
import io.airbyte.api.model.ConnectionRead;
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationRead;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.config.JobConfig;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.NotImplementedException;

public class WorkspaceHelper {

private final ConnectionsHandler connectionsHandler;
private final SourceHandler sourceHandler;
private final DestinationHandler destinationHandler;

private final LoadingCache<UUID, UUID> sourceToWorkspaceCache;
private final LoadingCache<UUID, UUID> destinationToWorkspaceCache;
private final LoadingCache<UUID, UUID> connectionToWorkspaceCache;
private final LoadingCache<Long, UUID> jobToWorkspaceCache;

public WorkspaceHelper(ConfigRepository configRepository,
JobPersistence jobPersistence,
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher) {
this.connectionsHandler = new ConnectionsHandler(configRepository);
this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler);
this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler);

this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId));
return source.getWorkspaceId();
}

});

this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException {
final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId));
return destination.getWorkspaceId();
}

});

this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException {
final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
final UUID sourceId = connection.getSourceId();
final UUID destinationId = connection.getDestinationId();
return getWorkspaceForConnection(sourceId, destinationId);
}

});

this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(Long jobId) throws IOException, ExecutionException {
final Job job = jobPersistence.getJob(jobId);
if (job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) {
return getWorkspaceForConnectionId(UUID.fromString(job.getScope()));
} else {
throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!");
}
}

});
}

public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException {
return sourceToWorkspaceCache.get(sourceId);
}

public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException {
return destinationToWorkspaceCache.get(destinationId);
}

public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException {
return jobToWorkspaceCache.get(jobId);
}

public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException {
final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId);
final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId);

Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!");
return sourceWorkspace;
}

public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException {
return connectionToWorkspaceCache.get(connectionId);
}

public UUID getWorkspaceForOperationId(UUID operationId) {
throw new NotImplementedException();
}

private static <K, V> LoadingCache<K, V> getExpiringCache(CacheLoader<K, V> cacheLoader) {
return CacheBuilder.newBuilder()
.maximumSize(20000)
.build(cacheLoader);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.helpers;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class WorkspaceHelperTest {

Path tmpDir;
ConfigRepository configRepository;
JobPersistence jobPersistence;
WorkspaceHelper workspaceHelper;

@BeforeEach
public void setup() throws IOException {
tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5));

configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir));
jobPersistence = mock(JobPersistence.class);

SpecFetcher specFetcher = mock(SpecFetcher.class);
when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}")));

workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher);
}

@Test
public void testObjectsThatDoNotExist() {
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID()));
assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L));
// todo: add operationId check
}

@Test
public void testSource() throws IOException, ExecutionException, JsonValidationException {
UUID source = UUID.randomUUID();
UUID workspace = UUID.randomUUID();

UUID sourceDefinition = UUID.randomUUID();
configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition));

SourceConnection sourceConnection = new SourceConnection()
.withSourceId(source)
.withSourceDefinitionId(sourceDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("source")
.withTombstone(false);

configRepository.writeSourceConnection(sourceConnection);
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source);

assertEquals(workspace, retrievedWorkspace);

// check that caching is working
configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID()));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);
}

@Test
public void testDestination() throws IOException, ExecutionException, JsonValidationException {
UUID destination = UUID.randomUUID();
UUID workspace = UUID.randomUUID();

UUID destinationDefinition = UUID.randomUUID();
configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition));

DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(destination)
.withDestinationDefinitionId(destinationDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("dest")
.withTombstone(false);

configRepository.writeDestinationConnection(destinationConnection);
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination);

assertEquals(workspace, retrievedWorkspace);

// check that caching is working
configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID()));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);
}

@Test
public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException {
UUID workspace = UUID.randomUUID();

// set up source
UUID source = UUID.randomUUID();

UUID sourceDefinition = UUID.randomUUID();
configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition));

SourceConnection sourceConnection = new SourceConnection()
.withSourceId(source)
.withSourceDefinitionId(sourceDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("source")
.withTombstone(false);

configRepository.writeSourceConnection(sourceConnection);

// set up destination
UUID destination = UUID.randomUUID();

UUID destinationDefinition = UUID.randomUUID();
configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition));

DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(destination)
.withDestinationDefinitionId(destinationDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("dest")
.withTombstone(false);

configRepository.writeDestinationConnection(destinationConnection);

// set up connection
UUID connection = UUID.randomUUID();
configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source)
.withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>())));

// test retrieving by connection id
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection);
assertEquals(workspace, retrievedWorkspace);

// test retrieving by source and destination ids
UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection);
assertEquals(workspace, retrievedWorkspaceBySourceAndDestination);

// check that caching is working
UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace));
configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);

// test jobs
long jobId = 123;
Job job = new Job(
jobId,
JobConfig.ConfigType.SYNC,
connection.toString(),
new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()),
new ArrayList<>(),
JobStatus.PENDING,
System.currentTimeMillis(),
System.currentTimeMillis(),
System.currentTimeMillis());
when(jobPersistence.getJob(jobId)).thenReturn(job);

UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId);
assertEquals(workspace, jobWorkspace);
}

}

0 comments on commit 768aad0

Please sign in to comment.