Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add spec cache #1066

Merged
merged 5 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.cache.SpecCache;
import org.glassfish.hk2.api.Factory;

public class ConfigurationApiFactory implements Factory<ConfigurationApi> {

private static ConfigRepository configRepository;
private static SchedulerPersistence schedulerPersistence;
private static SpecCache specCache;

public static void setConfigRepository(final ConfigRepository configRepository) {
ConfigurationApiFactory.configRepository = configRepository;
Expand All @@ -42,9 +44,16 @@ public static void setSchedulerPersistence(final SchedulerPersistence schedulerP
ConfigurationApiFactory.schedulerPersistence = schedulerPersistence;
}

public static void setSpecCache(final SpecCache specCache) {
ConfigurationApiFactory.specCache = specCache;
}

@Override
public ConfigurationApi provide() {
return new ConfigurationApi(ConfigurationApiFactory.configRepository, ConfigurationApiFactory.schedulerPersistence);
return new ConfigurationApi(
ConfigurationApiFactory.configRepository,
ConfigurationApiFactory.schedulerPersistence,
ConfigurationApiFactory.specCache);
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.scheduler.persistence.DefaultSchedulerPersistence;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.cache.DefaultSpecCache;
import io.airbyte.server.errors.InvalidInputExceptionMapper;
import io.airbyte.server.errors.InvalidJsonExceptionMapper;
import io.airbyte.server.errors.InvalidJsonInputExceptionMapper;
Expand Down Expand Up @@ -79,6 +80,7 @@ public void start() throws Exception {

ServletContextHandler handler = new ServletContextHandler();

ConfigurationApiFactory.setSpecCache(new DefaultSpecCache());
ConfigurationApiFactory.setConfigRepository(configRepository);
ConfigurationApiFactory.setSchedulerPersistence(schedulerPersistence);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.server.cache.SpecCache;
import io.airbyte.server.errors.KnownException;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DebugInfoHandler;
Expand Down Expand Up @@ -105,16 +106,15 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final WebBackendSourceHandler webBackendSourceHandler;
private final WebBackendDestinationHandler webBackendDestinationHandler;

public ConfigurationApi(final ConfigRepository configRepository, final SchedulerPersistence schedulerPersistence) {
public ConfigurationApi(final ConfigRepository configRepository, final SchedulerPersistence schedulerPersistence, final SpecCache specCache) {
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
schedulerHandler = new SchedulerHandler(configRepository, schedulerPersistence);
schedulerHandler = new SchedulerHandler(configRepository, schedulerPersistence, specCache);
workspacesHandler = new WorkspacesHandler(configRepository);
DockerImageValidator dockerImageValidator = new DockerImageValidator(schedulerHandler);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator);
final DockerImageValidator dockerImageValidator = new DockerImageValidator(schedulerHandler);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, specCache);
connectionsHandler = new ConnectionsHandler(configRepository);
destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator);
destinationHandler =
new DestinationHandler(configRepository, schemaValidator, schedulerHandler, connectionsHandler);
destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator, specCache);
destinationHandler = new DestinationHandler(configRepository, schemaValidator, schedulerHandler, connectionsHandler);
sourceHandler = new SourceHandler(configRepository, schemaValidator, schedulerHandler, connectionsHandler);
jobHistoryHandler = new JobHistoryHandler(schedulerPersistence);
webBackendConnectionsHandler = new WebBackendConnectionsHandler(connectionsHandler, sourceHandler, destinationHandler, jobHistoryHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.cache;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.Optional;

public class DefaultSpecCache implements SpecCache {

private final Cache<String, ConnectorSpecification> cache;

public DefaultSpecCache() {
cache = CacheBuilder.newBuilder().build();
}

@Override
public Optional<ConnectorSpecification> get(String imageName) {
return Optional.ofNullable(cache.getIfPresent(imageName));
}

@Override
public void put(String imageName, ConnectorSpecification spec) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opted not to use the LoadingCache that guava has (where it does fetch-if-absent behavior). mostly, i didn't like the error handling.

this whole iface exists though because i want to be able to change my impl easily if i decide i messed up here.

cache.put(imageName, spec);
}

@Override
public void evict(String imageName) {
cache.invalidate(imageName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.server.cache;

import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.Optional;

public interface SpecCache {

Optional<ConnectorSpecification> get(String imageName);

void put(String imageName, ConnectorSpecification spec);

void evict(String imageName);

public static class AlwaysMissCache implements SpecCache {

@Override
public Optional<ConnectorSpecification> get(String imageName) {
return Optional.empty();
}

@Override
public void put(String imageName, ConnectorSpecification spec) {
// no op.
}

@Override
public void evict(String imageName) {
// no op.
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.server.cache.SpecCache;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand All @@ -42,15 +43,16 @@
public class DestinationDefinitionsHandler {

private final ConfigRepository configRepository;
private DockerImageValidator dockerImageValidator;
private final DockerImageValidator dockerImageValidator;
private final SpecCache specCache;

public DestinationDefinitionsHandler(final ConfigRepository configRepository, DockerImageValidator dockerImageValidator) {
public DestinationDefinitionsHandler(final ConfigRepository configRepository, DockerImageValidator dockerImageValidator, SpecCache specCache) {
this.configRepository = configRepository;
this.dockerImageValidator = dockerImageValidator;
this.specCache = specCache;
}

public DestinationDefinitionReadList listDestinationDefinitions()
throws ConfigNotFoundException, IOException, JsonValidationException {
public DestinationDefinitionReadList listDestinationDefinitions() throws ConfigNotFoundException, IOException, JsonValidationException {
final List<DestinationDefinitionRead> reads = configRepository.listStandardDestinationDefinitions()
.stream()
.map(DestinationDefinitionsHandler::buildDestinationDefinitionRead)
Expand All @@ -71,14 +73,16 @@ public DestinationDefinitionRead updateDestinationDefinition(DestinationDefiniti
configRepository.getStandardDestinationDefinition(destinationDefinitionUpdate.getDestinationDefinitionId());
dockerImageValidator.assertValidIntegrationImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag());

StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
.withDockerImageTag(destinationDefinitionUpdate.getDockerImageTag())
.withDockerRepository(currentDestination.getDockerRepository())
.withName(currentDestination.getName())
.withDocumentationUrl(currentDestination.getDocumentationUrl());

configRepository.writeStandardDestinationDefinition(newDestination);
// we want to re-fetch the spec for updated definitions.
specCache.evict(currentDestination.getDockerRepository() + ":" + destinationDefinitionUpdate.getDockerImageTag());
return buildDestinationDefinitionRead(newDestination);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
import io.airbyte.scheduler.Job;
import io.airbyte.scheduler.JobStatus;
import io.airbyte.scheduler.persistence.SchedulerPersistence;
import io.airbyte.server.cache.SpecCache;
import io.airbyte.server.converters.SchemaConverter;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -67,10 +69,10 @@ public class SchedulerHandler {

private final ConfigRepository configRepository;
private final SchedulerPersistence schedulerPersistence;
private final SpecCache specCache;

public SchedulerHandler(final ConfigRepository configRepository,
final SchedulerPersistence schedulerPersistence) {

public SchedulerHandler(final ConfigRepository configRepository, final SchedulerPersistence schedulerPersistence, final SpecCache specCache) {
this.specCache = specCache;
this.configRepository = configRepository;
this.schedulerPersistence = schedulerPersistence;
}
Expand Down Expand Up @@ -170,12 +172,22 @@ public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(Source
}

public ConnectorSpecification getConnectorSpecification(String imageName) throws IOException {
final long jobId = schedulerPersistence.createGetSpecJob(imageName);
LOGGER.debug("getSourceSpec jobId = {}", jobId);
final Optional<ConnectorSpecification> cachedSpec = specCache.get(imageName);
if (cachedSpec.isPresent()) {
LOGGER.debug("cache hit: " + imageName);
return cachedSpec.get();
} else {

LOGGER.debug("cache miss: " + imageName);
final long jobId = schedulerPersistence.createGetSpecJob(imageName);
LOGGER.debug("getSourceSpec jobId = {}", jobId);

Job job = waitUntilJobIsTerminalOrTimeout(jobId);
final Job job = waitUntilJobIsTerminalOrTimeout(jobId);

return job.getOutput().orElseThrow().getGetSpec().getSpecification();
final ConnectorSpecification spec = job.getOutput().orElseThrow().getGetSpec().getSpecification();
specCache.put(imageName, spec);
return spec;
}
}

public DestinationDefinitionSpecificationRead getDestinationSpecification(DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.server.cache.SpecCache;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand All @@ -51,15 +52,24 @@ public class SourceDefinitionsHandler {
private final DockerImageValidator imageValidator;
private final ConfigRepository configRepository;
private final Supplier<UUID> uuidSupplier;
private final SpecCache specCache;

public SourceDefinitionsHandler(final ConfigRepository configRepository, DockerImageValidator imageValidator) {
this(configRepository, imageValidator, UUID::randomUUID);
public SourceDefinitionsHandler(
final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final SpecCache specCache) {
this(configRepository, imageValidator, UUID::randomUUID, specCache);
}

public SourceDefinitionsHandler(final ConfigRepository configRepository, DockerImageValidator imageValidator, Supplier<UUID> uuidSupplier) {
public SourceDefinitionsHandler(
final ConfigRepository configRepository,
final DockerImageValidator imageValidator,
final Supplier<UUID> uuidSupplier,
final SpecCache specCache) {
this.configRepository = configRepository;
this.uuidSupplier = uuidSupplier;
this.imageValidator = imageValidator;
this.specCache = specCache;
}

public SourceDefinitionReadList listSourceDefinitions() throws ConfigNotFoundException, IOException, JsonValidationException {
Expand Down Expand Up @@ -104,6 +114,8 @@ public SourceDefinitionRead updateSourceDefinition(SourceDefinitionUpdate source
.withName(currentSourceDefinition.getName());

configRepository.writeStandardSource(newSource);
// we want to re-fetch the spec for updated definitions.
specCache.evict(currentSourceDefinition.getDockerRepository() + ":" + sourceDefinitionUpdate.getDockerImageTag());
return buildSourceDefinitionRead(newSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -38,6 +39,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.server.cache.SpecCache.AlwaysMissCache;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand All @@ -53,13 +55,15 @@ class DestinationDefinitionsHandlerTest {
private ConfigRepository configRepository;
private StandardDestinationDefinition destination;
private DestinationDefinitionsHandler destinationHandler;
private AlwaysMissCache specCache;

@BeforeEach
void setUp() {
configRepository = mock(ConfigRepository.class);
dockerImageValidator = mock(DockerImageValidator.class);
destination = generateDestination();
destinationHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator);
specCache = spy(new AlwaysMissCache());
destinationHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator, specCache);
}

private StandardDestinationDefinition generateDestination() {
Expand Down Expand Up @@ -121,19 +125,20 @@ void testGetDestination() throws JsonValidationException, ConfigNotFoundExceptio
@Test
void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException {
when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId())).thenReturn(destination);
DestinationDefinitionRead currentDestination =
destinationHandler
.getDestinationDefinition(new DestinationDefinitionIdRequestBody().destinationDefinitionId(destination.getDestinationDefinitionId()));
final DestinationDefinitionRead currentDestination = destinationHandler
.getDestinationDefinition(new DestinationDefinitionIdRequestBody().destinationDefinitionId(destination.getDestinationDefinitionId()));
final String currentTag = currentDestination.getDockerImageTag();
final String currentRepo = currentDestination.getDockerRepository();
final String dockerRepository = currentDestination.getDockerRepository();
final String newDockerImageTag = "averydifferenttag";
assertNotEquals(newDockerImageTag, currentTag);

final DestinationDefinitionRead sourceRead = destinationHandler.updateDestinationDefinition(
new DestinationDefinitionUpdate().destinationDefinitionId(this.destination.getDestinationDefinitionId()).dockerImageTag(newDockerImageTag));

assertEquals(newDockerImageTag, sourceRead.getDockerImageTag());
verify(dockerImageValidator).assertValidIntegrationImage(currentRepo, newDockerImageTag);
verify(dockerImageValidator).assertValidIntegrationImage(dockerRepository, newDockerImageTag);
verify(specCache).evict(dockerRepository + ":" + newDockerImageTag);

}

}
Loading