Skip to content

Commit

Permalink
Extract event from the temporal worker run factory (#10739)
Browse files Browse the repository at this point in the history
Extract of different events that can happen to a sync into a non temporal related interface.
  • Loading branch information
benmoriceau authored and etsybaev committed Mar 5, 2022
1 parent f3876df commit b333204
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import java.util.Set;
import java.util.UUID;

public interface EventRunner {

void createNewSchedulerWorkflow(final UUID connectionId);

ManualSyncSubmissionResult startNewManualSync(final UUID connectionId);

ManualSyncSubmissionResult startNewCancelation(final UUID connectionId);

ManualSyncSubmissionResult resetConnection(final UUID connectionId);

ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId);

void deleteConnection(final UUID connectionId);

void migrateSyncIfNeeded(final Set<UUID> connectionIds);

void update(final UUID connectionId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class TemporalEventRunner implements EventRunner {

private final TemporalClient temporalClient;

public void createNewSchedulerWorkflow(final UUID connectionId) {
temporalClient.submitConnectionUpdaterAsync(connectionId);
}

public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) {
return temporalClient.startNewManualSync(connectionId);
}

public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) {
return temporalClient.startNewCancelation(connectionId);
}

public ManualSyncSubmissionResult resetConnection(final UUID connectionId) {
return temporalClient.resetConnection(connectionId);
}

public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) {
return temporalClient.synchronousResetConnection(connectionId);
}

public void deleteConnection(final UUID connectionId) {
temporalClient.deleteConnection(connectionId);
}

public void migrateSyncIfNeeded(final Set<UUID> connectionIds) {
temporalClient.migrateSyncIfNeeded(connectionIds);
}

public void update(final UUID connectionId) {
temporalClient.update(connectionId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
import java.nio.file.Path;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
private static AirbyteVersion airbyteVersion;
private static HttpClient httpClient;
private static FeatureFlags featureFlags;
private static TemporalWorkerRunFactory temporalWorkerRunFactory;
private static EventRunner eventRunner;

public static void setValues(
final WorkflowServiceStubs temporalService,
Expand All @@ -69,7 +69,7 @@ public static void setValues(
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
ConfigurationApiFactory.configRepository = configRepository;
ConfigurationApiFactory.jobPersistence = jobPersistence;
ConfigurationApiFactory.seed = seed;
Expand All @@ -89,7 +89,7 @@ public static void setValues(
ConfigurationApiFactory.airbyteVersion = airbyteVersion;
ConfigurationApiFactory.httpClient = httpClient;
ConfigurationApiFactory.featureFlags = featureFlags;
ConfigurationApiFactory.temporalWorkerRunFactory = temporalWorkerRunFactory;
ConfigurationApiFactory.eventRunner = eventRunner;
}

@Override
Expand All @@ -115,7 +115,7 @@ public ConfigurationApi provide() {
ConfigurationApiFactory.workspaceRoot,
ConfigurationApiFactory.httpClient,
ConfigurationApiFactory.featureFlags,
ConfigurationApiFactory.temporalWorkerRunFactory);
ConfigurationApiFactory.eventRunner);
}

@Override
Expand Down
16 changes: 7 additions & 9 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.scheduler.client.DefaultSchedulerJobClient;
import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.TemporalEventRunner;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
Expand All @@ -47,7 +49,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -191,11 +192,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(
TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs),
configs.getWorkspaceRoot(),
configs.getAirbyteVersionOrWarning(),
featureFlags);
final EventRunner eventRunner = new TemporalEventRunner(
TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs));

LOGGER.info("Starting server...");

Expand All @@ -217,17 +215,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
configs.getWorkspaceRoot(),
httpClient,
featureFlags,
temporalWorkerRunFactory);
eventRunner);
}

private static void migrateExistingConnection(final ConfigRepository configRepository, final TemporalWorkerRunFactory temporalWorkerRunFactory)
private static void migrateExistingConnection(final ConfigRepository configRepository, final EventRunner eventRunner)
throws JsonValidationException, ConfigNotFoundException, IOException {
LOGGER.info("Start migration to the new scheduler...");
final Set<UUID> connectionIds =
configRepository.listStandardSyncs().stream()
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
.map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet());
temporalWorkerRunFactory.migrateSyncIfNeeded(connectionIds);
eventRunner.migrateSyncIfNeeded(connectionIds);
LOGGER.info("Done migrating to the new scheduler...");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.net.http.HttpClient;
import java.nio.file.Path;
Expand All @@ -45,7 +45,7 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient,
Path workspaceRoot,
HttpClient httpClient,
FeatureFlags featureFlags,
TemporalWorkerRunFactory temporalWorkerRunFactory);
EventRunner eventRunner);

class Api implements ServerFactory {

Expand All @@ -67,7 +67,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient,
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
// set static values for factory
ConfigurationApiFactory.setValues(
temporalService,
Expand All @@ -89,7 +89,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient,
workspaceRoot,
httpClient,
featureFlags,
temporalWorkerRunFactory);
eventRunner);

// server configurations
final Set<Class<?>> componentClasses = Set.of(ConfigurationApi.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.Database;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.JobNotifier;
Expand Down Expand Up @@ -119,7 +120,6 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -168,7 +168,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
final Path workspaceRoot,
final HttpClient httpClient,
final FeatureFlags featureFlags,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final EventRunner eventRunner) {
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.workspaceRoot = workspaceRoot;
Expand All @@ -189,13 +189,13 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
jobNotifier,
temporalService,
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory, featureFlags);
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, eventRunner, featureFlags);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
temporalWorkerRunFactory,
eventRunner,
featureFlags,
workerConfigs);
sourceHandler = new SourceHandler(configRepository, schemaValidator, connectionsHandler);
Expand All @@ -215,7 +215,7 @@ public ConfigurationApi(final ConfigRepository configRepository,
schedulerHandler,
operationsHandler,
featureFlags,
temporalWorkerRunFactory);
eventRunner);
healthCheckHandler = new HealthCheckHandler();
archiveHandler = new ArchiveHandler(
airbyteVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.EventRunner;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.ApiPojoConverters;
import io.airbyte.server.handlers.helpers.ConnectionMatcher;
Expand All @@ -42,7 +43,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.helper.CatalogConverter;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -61,7 +61,7 @@ public class ConnectionsHandler {
private final Supplier<UUID> uuidGenerator;
private final WorkspaceHelper workspaceHelper;
private final TrackingClient trackingClient;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final EventRunner eventRunner;
private final FeatureFlags featureFlags;
private final WorkerConfigs workerConfigs;

Expand All @@ -70,29 +70,29 @@ public class ConnectionsHandler {
final Supplier<UUID> uuidGenerator,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final EventRunner eventRunner,
final FeatureFlags featureFlags,
final WorkerConfigs workerConfigs) {
this.configRepository = configRepository;
this.uuidGenerator = uuidGenerator;
this.workspaceHelper = workspaceHelper;
this.trackingClient = trackingClient;
this.temporalWorkerRunFactory = temporalWorkerRunFactory;
this.eventRunner = eventRunner;
this.featureFlags = featureFlags;
this.workerConfigs = workerConfigs;
}

public ConnectionsHandler(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
final TrackingClient trackingClient,
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final EventRunner eventRunner,
final FeatureFlags featureFlags,
final WorkerConfigs workerConfigs) {
this(configRepository,
UUID::randomUUID,
workspaceHelper,
trackingClient,
temporalWorkerRunFactory,
eventRunner,
featureFlags,
workerConfigs);

Expand Down Expand Up @@ -150,7 +150,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
if (featureFlags.usesNewScheduler()) {
try {
LOGGER.info("Starting a connection using the new scheduler");
temporalWorkerRunFactory.createNewSchedulerWorkflow(connectionId);
eventRunner.createNewSchedulerWorkflow(connectionId);
} catch (final Exception e) {
LOGGER.error("Start of the temporal connection manager workflow failed", e);
configRepository.deleteStandardSyncDefinition(standardSync.getConnectionId());
Expand Down Expand Up @@ -214,7 +214,7 @@ public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate)
configRepository.writeStandardSync(newConnection);

if (featureFlags.usesNewScheduler()) {
temporalWorkerRunFactory.update(connectionUpdate);
eventRunner.update(connectionUpdate.getConnectionId());
}

return buildConnectionRead(connectionUpdate.getConnectionId());
Expand Down Expand Up @@ -323,7 +323,7 @@ public void deleteConnection(final UUID connectionId)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
// todo (cgardens) - need an interface over this.
temporalWorkerRunFactory.deleteConnection(connectionId);
eventRunner.deleteConnection(connectionId);
} else {
final ConnectionRead connectionRead = getConnection(connectionId);
deleteConnection(connectionRead);
Expand Down
Loading

0 comments on commit b333204

Please sign in to comment.