Skip to content

Commit

Permalink
feat: return whether configuration was updated as part of api response (
Browse files Browse the repository at this point in the history
#21466)

* wip: return whether configuration was updated

* updated outputs working

* fix pmd

* update description, format

* add didUpdateConfiguration to metadata, rm unneeded generics

* add didUpdateConfiguration to api response

* update name to fix pmd

* not required

* rename to match api response

* remove unused field

* match naming
  • Loading branch information
pedroslopez authored Jan 18, 2023
1 parent b69639e commit b24d575
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 65 deletions.
3 changes: 3 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4200,6 +4200,9 @@ components:
format: int64
succeeded:
type: boolean
connectorConfigurationUpdated:
type: boolean
default: false
logs:
$ref: "#/components/schemas/LogRead"
Pagination:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncInput;
Expand All @@ -17,6 +18,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.Config;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
Expand Down Expand Up @@ -140,6 +142,12 @@ private static Optional<AirbyteTraceMessage> getTraceMessageFromMessagesByType(f
.findFirst();
}

public static Boolean getDidControlMessageChangeConfig(final JsonNode initialConfigJson, final AirbyteControlConnectorConfigMessage configMessage) {
final Config newConfig = configMessage.getConfig();
final JsonNode newConfigJson = Jsons.jsonNode(newConfig);
return !initialConfigJson.equals(newConfigJson);
}

public static Map<Type, List<AirbyteMessage>> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut)
throws IOException {
final Map<Type, List<AirbyteMessage>> messagesByType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.LineGobbler;
Expand Down Expand Up @@ -64,13 +65,16 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
LineGobbler.startSection("CHECK");
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot));

try {
final JsonNode inputConfig = input.getConnectionConfiguration();
process = integrationLauncher.check(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getConnectionConfiguration()));
Jsons.serialize(inputConfig));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput()
.withOutputType(OutputType.CHECK_CONNECTION);

LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

Expand All @@ -82,17 +86,17 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa

if (input.getActorId() != null && input.getActorType() != null) {
final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
configMessage.getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
configMessage.getConfig());
}
});
if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) {
switch (input.getActorType()) {
case SOURCE -> connectorConfigUpdater.updateSource(
input.getActorId(),
optionalConfigMsg.get().getConfig());
case DESTINATION -> connectorConfigUpdater.updateDestination(
input.getActorId(),
optionalConfigMsg.get().getConfig());
}
jobOutput.setConnectorConfigurationUpdated(true);
}
}

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -72,12 +73,15 @@ public DefaultDiscoverCatalogWorker(final ConfigRepository configRepository,
public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {
ApmTraceUtils.addTagsToTrace(generateTraceTags(discoverSchemaInput, jobRoot));
try {
final JsonNode inputConfig = discoverSchemaInput.getConnectionConfiguration();
process = integrationLauncher.discover(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(discoverSchemaInput.getConnectionConfiguration()));
Jsons.serialize(inputConfig));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput()
.withOutputType(OutputType.DISCOVER_CATALOG_ID);

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);
Expand All @@ -88,10 +92,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
.findFirst();

final Optional<AirbyteControlConnectorConfigMessage> optionalConfigMsg = WorkerUtils.getMostRecentConfigControlMessage(messagesByType);
optionalConfigMsg.ifPresent(
configMessage -> connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));
if (optionalConfigMsg.isPresent() && WorkerUtils.getDidControlMessageChangeConfig(inputConfig, optionalConfigMsg.get())) {
connectorConfigUpdater.updateSource(
UUID.fromString(discoverSchemaInput.getSourceId()),
optionalConfigMsg.get().getConfig());
jobOutput.setConnectorConfigurationUpdated(true);
}

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ properties:
format: uuid
spec:
existingJavaType: io.airbyte.protocol.models.ConnectorSpecification
connectorConfigurationUpdated:
description: A boolean indicating whether the configuration was updated during the job, e.g. if an AirbyteConfigControlMessage was received.
type: boolean
default: false
failureReason:
"$ref": FailureReason.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met
.createdAt(metadata.getCreatedAt())
.endedAt(metadata.getEndedAt())
.succeeded(metadata.isSucceeded())
.connectorConfigurationUpdated(metadata.isConnectorConfigurationUpdated())
.logs(getLogRead(metadata.getLogPath()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,18 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String
}

@VisibleForTesting
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
@Nullable final UUID connectorDefinitionId,
final Supplier<TemporalResponse<U>> executor,
final Function<U, T> outputMapper,
final UUID workspaceId) {
<T> SynchronousResponse<T> execute(final ConfigType configType,
final ConnectorJobReportingContext jobContext,
@Nullable final UUID connectorDefinitionId,
final Supplier<TemporalResponse<ConnectorJobOutput>> executor,
final Function<ConnectorJobOutput, T> outputMapper,
final UUID workspaceId) {
final long createdAt = Instant.now().toEpochMilli();
final UUID jobId = jobContext.jobId();
try {
track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null);
final TemporalResponse<U> temporalResponse = executor.get();
final Optional<U> jobOutput = temporalResponse.getOutput();
final TemporalResponse<ConnectorJobOutput> temporalResponse = executor.get();
final Optional<ConnectorJobOutput> jobOutput = temporalResponse.getOutput();
final T mappedOutput = jobOutput.map(outputMapper).orElse(null);
final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED;

Expand All @@ -194,6 +194,7 @@ <T, U> SynchronousResponse<T> execute(final ConfigType configType,
final long endedAt = Instant.now().toEpochMilli();
return SynchronousResponse.fromTemporalResponse(
temporalResponse,
jobOutput.orElse(null),
mappedOutput,
jobId,
configType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ public class SynchronousJobMetadata {
private final long createdAt;
private final long endedAt;
private final boolean succeeded;
private final boolean connectorConfigurationUpdated;

private final Path logPath;

public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetadata,
final UUID id,
final ConfigType configType,
final UUID configId,
final boolean connectorConfigurationUpdated,
final long createdAt,
final long endedAt) {
return new SynchronousJobMetadata(
Expand All @@ -37,6 +39,7 @@ public static SynchronousJobMetadata fromJobMetadata(final JobMetadata jobMetada
createdAt,
endedAt,
jobMetadata.isSucceeded(),
connectorConfigurationUpdated,
jobMetadata.getLogPath());
}

Expand All @@ -46,13 +49,15 @@ public SynchronousJobMetadata(final UUID id,
final long createdAt,
final long endedAt,
final boolean succeeded,
final boolean connectorConfigurationUpdated,
final Path logPath) {
this.id = id;
this.configType = configType;
this.configId = configId;
this.createdAt = createdAt;
this.endedAt = endedAt;
this.succeeded = succeeded;
this.connectorConfigurationUpdated = connectorConfigurationUpdated;
this.logPath = logPath;
}

Expand Down Expand Up @@ -80,6 +85,10 @@ public boolean isSucceeded() {
return succeeded;
}

public boolean isConnectorConfigurationUpdated() {
return connectorConfigurationUpdated;
}

public Path getLogPath() {
return logPath;
}
Expand All @@ -93,13 +102,14 @@ public boolean equals(final Object o) {
return false;
}
final SynchronousJobMetadata that = (SynchronousJobMetadata) o;
return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded && Objects.equals(id, that.id)
return createdAt == that.createdAt && endedAt == that.endedAt && succeeded == that.succeeded
&& connectorConfigurationUpdated == that.connectorConfigurationUpdated && Objects.equals(id, that.id)
&& configType == that.configType && Objects.equals(configId, that.configId) && Objects.equals(logPath, that.logPath);
}

@Override
public int hashCode() {
return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, logPath);
return Objects.hash(id, configType, configId, createdAt, endedAt, succeeded, connectorConfigurationUpdated, logPath);
}

@Override
Expand All @@ -111,6 +121,7 @@ public String toString() {
", createdAt=" + createdAt +
", endedAt=" + endedAt +
", succeeded=" + succeeded +
", connectorConfigurationUpdated=" + connectorConfigurationUpdated +
", logPath=" + logPath +
'}';
}
Expand All @@ -119,6 +130,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) {
final long now = Instant.now().toEpochMilli();
final UUID configId = null;
final boolean succeeded = true;
final boolean connectorConfigurationUpdated = false;
final Path logPath = null;

return new SynchronousJobMetadata(
Expand All @@ -128,6 +140,7 @@ public static SynchronousJobMetadata mock(final ConfigType configType) {
now,
now,
succeeded,
connectorConfigurationUpdated,
logPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.server.scheduler;

import io.airbyte.commons.temporal.TemporalResponse;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.JobConfig.ConfigType;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;

public class SynchronousResponse<T> {

Expand All @@ -23,7 +25,8 @@ public static <T> SynchronousResponse<T> success(final T output, final Synchrono
}

public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse,
final T output,
@Nullable final ConnectorJobOutput jobOutput,
@Nullable final T responseOutput,
final UUID id,
final ConfigType configType,
final UUID configId,
Expand All @@ -35,9 +38,10 @@ public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalR
id,
configType,
configId,
jobOutput != null ? jobOutput.getConnectorConfigurationUpdated() : false,
createdAt,
endedAt);
return new SynchronousResponse<>(output, metadata);
return new SynchronousResponse<>(responseOutput, metadata);
}

public SynchronousResponse(final T output, final SynchronousJobMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,18 @@ class ExecuteSynchronousJob {
@Test
void testExecuteJobSuccess() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
when(function.get()).thenReturn(new TemporalResponse<>("hello", createMetadata(true)));
final UUID discoveredCatalogId = UUID.randomUUID();
final Supplier<TemporalResponse<ConnectorJobOutput>> function = mock(Supplier.class);
final Function<ConnectorJobOutput, UUID> mapperFunction = ConnectorJobOutput::getDiscoverCatalogId;
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(discoveredCatalogId);
when(function.get()).thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE);
final SynchronousResponse<String> response = schedulerClient
final SynchronousResponse<UUID> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("hello", response.getOutput());
assertEquals(discoveredCatalogId, response.getOutput());
assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType());
assertTrue(response.getMetadata().getConfigId().isPresent());
assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get());
Expand All @@ -129,37 +131,16 @@ void testExecuteJobSuccess() {
verifyNoInteractions(jobErrorReporter);
}

@SuppressWarnings(UNCHECKED)
@Test
void testExecuteMappedOutput() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Supplier<TemporalResponse<Integer>> function = mock(Supplier.class);
final Function<Integer, String> mapperFunction = Object::toString;
when(function.get()).thenReturn(new TemporalResponse<>(42, createMetadata(true)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE);
final SynchronousResponse<String> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
assertEquals("42", response.getOutput());
assertEquals(ConfigType.DISCOVER_SCHEMA, response.getMetadata().getConfigType());
assertTrue(response.getMetadata().getConfigId().isPresent());
assertEquals(sourceDefinitionId, response.getMetadata().getConfigId().get());
assertTrue(response.getMetadata().isSucceeded());
assertEquals(LOG_PATH, response.getMetadata().getLogPath());
}

@SuppressWarnings(UNCHECKED)
@Test
void testExecuteJobFailure() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
final Supplier<TemporalResponse<ConnectorJobOutput>> function = mock(Supplier.class);
final Function<ConnectorJobOutput, UUID> mapperFunction = ConnectorJobOutput::getDiscoverCatalogId;
when(function.get()).thenReturn(new TemporalResponse<>(null, createMetadata(false)));

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE);
final SynchronousResponse<String> response = schedulerClient
final SynchronousResponse<UUID> response = schedulerClient
.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID);

assertNotNull(response);
Expand All @@ -179,8 +160,8 @@ void testExecuteJobFailure() {
@Test
void testExecuteRuntimeException() {
final UUID sourceDefinitionId = UUID.randomUUID();
final Supplier<TemporalResponse<String>> function = mock(Supplier.class);
final Function<String, String> mapperFunction = output -> output;
final Supplier<TemporalResponse<ConnectorJobOutput>> function = mock(Supplier.class);
final Function<ConnectorJobOutput, UUID> mapperFunction = ConnectorJobOutput::getDiscoverCatalogId;
when(function.get()).thenThrow(new RuntimeException());

final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), SOURCE_DOCKER_IMAGE);
Expand Down
Loading

0 comments on commit b24d575

Please sign in to comment.