Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add workspace helper #4868

Merged
merged 4 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.helpers;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.airbyte.api.model.ConnectionIdRequestBody;
import io.airbyte.api.model.ConnectionRead;
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationRead;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.config.JobConfig;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;

public class WorkspaceHelper {

private final ConnectionsHandler connectionsHandler;
private final SourceHandler sourceHandler;
private final DestinationHandler destinationHandler;

private final LoadingCache<UUID, UUID> sourceToWorkspaceCache;
private final LoadingCache<UUID, UUID> destinationToWorkspaceCache;
private final LoadingCache<UUID, UUID> connectionToWorkspaceCache;
private final LoadingCache<Long, UUID> jobToWorkspaceCache;

public WorkspaceHelper(ConfigRepository configRepository,
JobPersistence jobPersistence,
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher) {
this.connectionsHandler = new ConnectionsHandler(configRepository);
this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler);
this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler);

this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId));
return source.getWorkspaceId();
}

});

this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException {
final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId));
return destination.getWorkspaceId();
}

});

this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException {
final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
final UUID sourceId = connection.getSourceId();
final UUID destinationId = connection.getDestinationId();
return getWorkspaceForConnection(sourceId, destinationId);
}

});

this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() {

@Override
public UUID load(Long jobId) throws IOException, ExecutionException {
final Job job = jobPersistence.getJob(jobId);
if (job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) {
return getWorkspaceForConnectionId(UUID.fromString(job.getScope()));
} else {
throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!");
}
}

});
}

public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException {
return sourceToWorkspaceCache.get(sourceId);
}

public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException {
return destinationToWorkspaceCache.get(destinationId);
}

public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException {
return jobToWorkspaceCache.get(jobId);
}

public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException {
final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId);
final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId);

Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!");
return sourceWorkspace;
}

public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException {
return connectionToWorkspaceCache.get(connectionId);
}

public UUID getWorkspaceForOperationId(UUID operationId) {
throw new NotImplementedException();
}

private static <K, V> LoadingCache<K, V> getExpiringCache(CacheLoader<K, V> cacheLoader) {
return CacheBuilder.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the merit of a temporal expiration? i think all of these cases are where these ids should never change. if there is some reason that they are changing it's going to be kinda big deal and a 5 minute expiration is not really going to help.

my sense is we probably don't want a temporal expiration? after that we can decide if we need manual expiration or not expiration.

what do you think? am i way off here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly concerned about an infinitely growing cache where we don't want to keep all of the connections in memory. I switched it in d080a23 to a fixed 20k limit.

Does that sound good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dig it. thanks!

.build(cacheLoader);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.helpers;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class WorkspaceHelperTest {

Path tmpDir;
ConfigRepository configRepository;
JobPersistence jobPersistence;
WorkspaceHelper workspaceHelper;

@BeforeEach
public void setup() throws IOException {
tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5));

configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir));
jobPersistence = mock(JobPersistence.class);

SpecFetcher specFetcher = mock(SpecFetcher.class);
when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}")));

workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher);
}

@Test
public void testObjectsThatDoNotExist() {
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID()));
assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID()));
assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L));
// todo: add operationId check
}

@Test
public void testSource() throws IOException, ExecutionException, JsonValidationException {
UUID source = UUID.randomUUID();
UUID workspace = UUID.randomUUID();

UUID sourceDefinition = UUID.randomUUID();
configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition));

SourceConnection sourceConnection = new SourceConnection()
.withSourceId(source)
.withSourceDefinitionId(sourceDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("source")
.withTombstone(false);

configRepository.writeSourceConnection(sourceConnection);
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source);

assertEquals(workspace, retrievedWorkspace);

// check that caching is working
configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID()));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);
}

@Test
public void testDestination() throws IOException, ExecutionException, JsonValidationException {
UUID destination = UUID.randomUUID();
UUID workspace = UUID.randomUUID();

UUID destinationDefinition = UUID.randomUUID();
configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition));

DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(destination)
.withDestinationDefinitionId(destinationDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("dest")
.withTombstone(false);

configRepository.writeDestinationConnection(destinationConnection);
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination);

assertEquals(workspace, retrievedWorkspace);

// check that caching is working
configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID()));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);
}

@Test
public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException {
UUID workspace = UUID.randomUUID();

// set up source
UUID source = UUID.randomUUID();

UUID sourceDefinition = UUID.randomUUID();
configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition));

SourceConnection sourceConnection = new SourceConnection()
.withSourceId(source)
.withSourceDefinitionId(sourceDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("source")
.withTombstone(false);

configRepository.writeSourceConnection(sourceConnection);

// set up destination
UUID destination = UUID.randomUUID();

UUID destinationDefinition = UUID.randomUUID();
configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition));

DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(destination)
.withDestinationDefinitionId(destinationDefinition)
.withWorkspaceId(workspace)
.withConfiguration(Jsons.deserialize("{}"))
.withName("dest")
.withTombstone(false);

configRepository.writeDestinationConnection(destinationConnection);

// set up connection
UUID connection = UUID.randomUUID();
configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source)
.withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>())));

// test retrieving by connection id
UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection);
assertEquals(workspace, retrievedWorkspace);

// test retrieving by source and destination ids
UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection);
assertEquals(workspace, retrievedWorkspaceBySourceAndDestination);

// check that caching is working
UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace));
configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace));
UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination);
assertEquals(workspace, retrievedWorkspaceAfterUpdate);

// test jobs
long jobId = 123;
Job job = new Job(
jobId,
JobConfig.ConfigType.SYNC,
connection.toString(),
new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()),
new ArrayList<>(),
JobStatus.PENDING,
System.currentTimeMillis(),
System.currentTimeMillis(),
System.currentTimeMillis());
when(jobPersistence.getJob(jobId)).thenReturn(job);

UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId);
assertEquals(workspace, jobWorkspace);
}

}