Skip to content

Commit

Permalink
Bmoric/extract source definition specification api (#18997)
Browse files Browse the repository at this point in the history
* Extract Operation API

* Extract scheduler API

* Format

* extract source api

* Extract source definition api

* Add path

* Extract State API

* extract webbackend api

* extract webbackend api

* extract workspace api

* Extract source definition specification api
  • Loading branch information
benmoriceau authored Nov 8, 2022
1 parent f22485a commit 77d22c5
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,25 @@

package io.airbyte.server;

import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.Map;
import org.flywaydb.core.Flyway;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;

public class ConfigurationApiFactory implements Factory<ConfigurationApi> {

private static ConfigRepository configRepository;
private static JobPersistence jobPersistence;
private static SecretsRepositoryReader secretsRepositoryReader;
private static SecretsRepositoryWriter secretsRepositoryWriter;
private static SynchronousSchedulerClient synchronousSchedulerClient;
private static Map<String, String> mdc;
private static TrackingClient trackingClient;
private static WorkerEnvironment workerEnvironment;
private static LogConfigs logConfigs;
private static AirbyteVersion airbyteVersion;
private static EventRunner eventRunner;

public static void setValues(
final ConfigRepository configRepository,
final SecretsRepositoryReader secretsRepositoryReader,
final SecretsRepositoryWriter secretsRepositoryWriter,
final JobPersistence jobPersistence,
final SynchronousSchedulerClient synchronousSchedulerClient,
final StatePersistence statePersistence,
final Map<String, String> mdc,
final Database configsDatabase,
final Database jobsDatabase,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteVersion airbyteVersion,
final Path workspaceRoot,
final HttpClient httpClient,
final EventRunner eventRunner,
final Flyway configsFlyway,
final Flyway jobsFlyway) {
ConfigurationApiFactory.configRepository = configRepository;
ConfigurationApiFactory.jobPersistence = jobPersistence;
ConfigurationApiFactory.secretsRepositoryReader = secretsRepositoryReader;
ConfigurationApiFactory.secretsRepositoryWriter = secretsRepositoryWriter;
ConfigurationApiFactory.synchronousSchedulerClient = synchronousSchedulerClient;
final Map<String, String> mdc) {
ConfigurationApiFactory.mdc = mdc;
ConfigurationApiFactory.trackingClient = trackingClient;
ConfigurationApiFactory.workerEnvironment = workerEnvironment;
ConfigurationApiFactory.logConfigs = logConfigs;
ConfigurationApiFactory.airbyteVersion = airbyteVersion;
ConfigurationApiFactory.eventRunner = eventRunner;
}

@Override
public ConfigurationApi provide() {
MDC.setContextMap(ConfigurationApiFactory.mdc);

return new ConfigurationApi(
ConfigurationApiFactory.configRepository,
ConfigurationApiFactory.jobPersistence,
ConfigurationApiFactory.secretsRepositoryReader,
ConfigurationApiFactory.secretsRepositoryWriter,
ConfigurationApiFactory.synchronousSchedulerClient,
ConfigurationApiFactory.trackingClient,
ConfigurationApiFactory.workerEnvironment,
ConfigurationApiFactory.logConfigs,
ConfigurationApiFactory.airbyteVersion,
ConfigurationApiFactory.eventRunner);
return new ConfigurationApi();
}

@Override
Expand Down
28 changes: 8 additions & 20 deletions airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.db.Database;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.server.apis.AttemptApiController;
Expand All @@ -31,6 +30,7 @@
import io.airbyte.server.apis.SchedulerApiController;
import io.airbyte.server.apis.SourceApiController;
import io.airbyte.server.apis.SourceDefinitionApiController;
import io.airbyte.server.apis.SourceDefinitionSpecificationApiController;
import io.airbyte.server.apis.SourceOauthApiController;
import io.airbyte.server.apis.StateApiController;
import io.airbyte.server.apis.WebBackendApiController;
Expand All @@ -51,6 +51,7 @@
import io.airbyte.server.apis.binders.SchedulerApiBinder;
import io.airbyte.server.apis.binders.SourceApiBinder;
import io.airbyte.server.apis.binders.SourceDefinitionApiBinder;
import io.airbyte.server.apis.binders.SourceDefinitionSpecificationApiBinder;
import io.airbyte.server.apis.binders.SourceOauthApiBinder;
import io.airbyte.server.apis.binders.StateApiBinder;
import io.airbyte.server.apis.binders.WebBackendApiBinder;
Expand All @@ -71,6 +72,7 @@
import io.airbyte.server.apis.factories.SchedulerApiFactory;
import io.airbyte.server.apis.factories.SourceApiFactory;
import io.airbyte.server.apis.factories.SourceDefinitionApiFactory;
import io.airbyte.server.apis.factories.SourceDefinitionSpecificationApiFactory;
import io.airbyte.server.apis.factories.SourceOauthApiFactory;
import io.airbyte.server.apis.factories.StateApiFactory;
import io.airbyte.server.apis.factories.WebBackendApiFactory;
Expand Down Expand Up @@ -179,25 +181,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
final Map<String, String> mdc = MDC.getCopyOfContextMap();

// set static values for factory
ConfigurationApiFactory.setValues(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
jobPersistence,
synchronousSchedulerClient,
new StatePersistence(configsDatabase),
mdc,
configsDatabase,
jobsDatabase,
trackingClient,
workerEnvironment,
logConfigs,
airbyteVersion,
workspaceRoot,
httpClient,
eventRunner,
configsFlyway,
jobsFlyway);
ConfigurationApiFactory.setValues(mdc);

AttemptApiFactory.setValues(attemptHandler, mdc);

Expand Down Expand Up @@ -237,6 +221,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul

SourceDefinitionApiFactory.setValues(sourceDefinitionsHandler);

SourceDefinitionSpecificationApiFactory.setValues(schedulerHandler);

StateApiFactory.setValues(stateHandler);

WebBackendApiFactory.setValues(webBackendConnectionsHandler, webBackendGeographiesHandler);
Expand All @@ -262,6 +248,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
SchedulerApiController.class,
SourceApiController.class,
SourceDefinitionApiController.class,
SourceDefinitionSpecificationApiController.class,
SourceOauthApiController.class,
StateApiController.class,
WebBackendApiController.class,
Expand All @@ -286,6 +273,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
new SchedulerApiBinder(),
new SourceApiBinder(),
new SourceDefinitionApiBinder(),
new SourceDefinitionSpecificationApiBinder(),
new SourceOauthApiBinder(),
new StateApiBinder(),
new WebBackendApiBinder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.server.apis;

import io.airbyte.analytics.TrackingClient;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckOperationRead;
Expand Down Expand Up @@ -102,27 +101,9 @@
import io.airbyte.api.model.generated.WorkspaceReadList;
import io.airbyte.api.model.generated.WorkspaceUpdate;
import io.airbyte.api.model.generated.WorkspaceUpdateName;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.server.errors.BadObjectSchemaKnownException;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.ConnectionsHandler;
import io.airbyte.server.handlers.DestinationDefinitionsHandler;
import io.airbyte.server.handlers.DestinationHandler;
import io.airbyte.server.handlers.JobHistoryHandler;
import io.airbyte.server.handlers.SchedulerHandler;
import io.airbyte.server.handlers.SourceDefinitionsHandler;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
import java.io.IOException;
Expand All @@ -134,63 +115,7 @@
@Slf4j
public class ConfigurationApi implements io.airbyte.api.generated.V1Api {

private final SourceDefinitionsHandler sourceDefinitionsHandler;
private final SourceHandler sourceHandler;
private final DestinationDefinitionsHandler destinationDefinitionsHandler;
private final DestinationHandler destinationHandler;
private final ConnectionsHandler connectionsHandler;
private final SchedulerHandler schedulerHandler;
private final JobHistoryHandler jobHistoryHandler;

public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SecretsRepositoryReader secretsRepositoryReader,
final SecretsRepositoryWriter secretsRepositoryWriter,
final SynchronousSchedulerClient synchronousSchedulerClient,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final AirbyteVersion airbyteVersion,
final EventRunner eventRunner) {

final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);

schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
synchronousSchedulerClient,
jobPersistence,
workerEnvironment,
logConfigs,
eventRunner,
connectionsHandler);

sourceHandler = new SourceHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
schemaValidator,
connectionsHandler);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, synchronousSchedulerClient, sourceHandler);
destinationHandler = new DestinationHandler(
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
schemaValidator,
connectionsHandler);
destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, synchronousSchedulerClient, destinationHandler);
jobHistoryHandler = new JobHistoryHandler(jobPersistence, workerEnvironment, logConfigs, connectionsHandler, sourceHandler,
sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion);
}
public ConfigurationApi() {}

// WORKSPACE

Expand Down Expand Up @@ -405,9 +330,14 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsReq

// SOURCE SPECIFICATION

/**
* This implementation has been moved to {@link SourceDefinitionSpecificationApiController}. Since
* the path of {@link SourceDefinitionSpecificationApiController} is more granular, it will override
* this implementation
*/
@Override
public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final SourceDefinitionIdWithWorkspaceId sourceDefinitionIdWithWorkspaceId) {
return execute(() -> schedulerHandler.getSourceDefinitionSpecification(sourceDefinitionIdWithWorkspaceId));
throw new NotImplementedException();
}

// OAUTH
Expand Down Expand Up @@ -1066,7 +996,7 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
*/
@Override
public AttemptNormalizationStatusReadList getAttemptNormalizationStatusesForJob(final JobIdRequestBody jobIdRequestBody) {
return execute(() -> jobHistoryHandler.getAttemptNormalizationStatuses(jobIdRequestBody));
throw new NotImplementedException();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import io.airbyte.api.generated.SourceDefinitionSpecificationApi;
import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead;
import io.airbyte.server.handlers.SchedulerHandler;
import javax.ws.rs.Path;
import lombok.AllArgsConstructor;

@Path("/v1/source_definition_specifications/get")
@AllArgsConstructor
public class SourceDefinitionSpecificationApiController implements SourceDefinitionSpecificationApi {

private final SchedulerHandler schedulerHandler;

@Override
public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final SourceDefinitionIdWithWorkspaceId sourceDefinitionIdWithWorkspaceId) {
return ConfigurationApi.execute(() -> schedulerHandler.getSourceDefinitionSpecification(sourceDefinitionIdWithWorkspaceId));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.binders;

import io.airbyte.server.apis.SourceDefinitionSpecificationApiController;
import io.airbyte.server.apis.factories.SourceDefinitionSpecificationApiFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

public class SourceDefinitionSpecificationApiBinder extends AbstractBinder {

@Override
protected void configure() {
bindFactory(SourceDefinitionSpecificationApiFactory.class)
.to(SourceDefinitionSpecificationApiController.class)
.in(RequestScoped.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis.factories;

import io.airbyte.server.apis.SourceDefinitionSpecificationApiController;
import io.airbyte.server.handlers.SchedulerHandler;
import org.glassfish.hk2.api.Factory;

public class SourceDefinitionSpecificationApiFactory implements Factory<SourceDefinitionSpecificationApiController> {

private static SchedulerHandler schedulerHandler;

public static void setValues(final SchedulerHandler schedulerHandler) {
SourceDefinitionSpecificationApiFactory.schedulerHandler = schedulerHandler;
}

@Override
public SourceDefinitionSpecificationApiController provide() {
return new SourceDefinitionSpecificationApiController(SourceDefinitionSpecificationApiFactory.schedulerHandler);
}

@Override
public void dispose(final SourceDefinitionSpecificationApiController instance) {
/* no op */
}

}

0 comments on commit 77d22c5

Please sign in to comment.