diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java index 2a8dc2929407..dc2bbe1224fc 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClient.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobCheckConnectionConfig; @@ -16,6 +17,8 @@ import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; @@ -26,19 +29,27 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerClient { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSynchronousSchedulerClient.class); + private final TemporalClient temporalClient; private final JobTracker jobTracker; + private final JobErrorReporter jobErrorReporter; private final OAuthConfigSupplier oAuthConfigSupplier; public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient, final JobTracker jobTracker, + final JobErrorReporter jobErrorReporter, final OAuthConfigSupplier oAuthConfigSupplier) { this.temporalClient = temporalClient; this.jobTracker = jobTracker; + this.jobErrorReporter = jobErrorReporter; this.oAuthConfigSupplier = oAuthConfigSupplier; } @@ -53,10 +64,14 @@ public SynchronousResponse createSourceCheckConne .withConnectionConfiguration(sourceConfiguration) .withDockerImage(dockerImage); + final UUID jobId = UUID.randomUUID(); + final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + return execute( ConfigType.CHECK_CONNECTION_SOURCE, + jobReportingContext, source.getSourceDefinitionId(), - jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), + () -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), ConnectorJobOutput::getCheckConnection, source.getWorkspaceId()); } @@ -73,10 +88,14 @@ public SynchronousResponse createDestinationCheck .withConnectionConfiguration(destinationConfiguration) .withDockerImage(dockerImage); + final UUID jobId = UUID.randomUUID(); + final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + return execute( ConfigType.CHECK_CONNECTION_DESTINATION, + jobReportingContext, destination.getDestinationDefinitionId(), - jobId -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), + () -> temporalClient.submitCheckConnection(jobId, 0, jobCheckConnectionConfig), ConnectorJobOutput::getCheckConnection, destination.getWorkspaceId()); } @@ -92,10 +111,14 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceC .withConnectionConfiguration(sourceConfiguration) .withDockerImage(dockerImage); + final UUID jobId = UUID.randomUUID(); + final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + return execute( ConfigType.DISCOVER_SCHEMA, + jobReportingContext, source.getSourceDefinitionId(), - jobId -> temporalClient.submitDiscoverSchema(UUID.randomUUID(), 0, jobDiscoverCatalogConfig), + () -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig), ConnectorJobOutput::getDiscoverCatalog, source.getWorkspaceId()); } @@ -104,31 +127,39 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceC public SynchronousResponse createGetSpecJob(final String dockerImage) throws IOException { final JobGetSpecConfig jobSpecConfig = new JobGetSpecConfig().withDockerImage(dockerImage); + final UUID jobId = UUID.randomUUID(); + final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + return execute( ConfigType.GET_SPEC, + jobReportingContext, null, - jobId -> temporalClient.submitGetSpec(UUID.randomUUID(), 0, jobSpecConfig), + () -> temporalClient.submitGetSpec(jobId, 0, jobSpecConfig), ConnectorJobOutput::getSpec, null); } @VisibleForTesting SynchronousResponse execute(final ConfigType configType, + final ConnectorJobReportingContext jobContext, @Nullable final UUID connectorDefinitionId, - final Function> executor, + final Supplier> executor, final Function outputMapper, final UUID workspaceId) { final long createdAt = Instant.now().toEpochMilli(); - final UUID jobId = UUID.randomUUID(); + final UUID jobId = jobContext.jobId(); try { track(jobId, configType, connectorDefinitionId, workspaceId, JobState.STARTED, null); - final TemporalResponse temporalResponse = executor.apply(jobId); + final TemporalResponse temporalResponse = executor.get(); final Optional jobOutput = temporalResponse.getOutput(); final T mappedOutput = jobOutput.map(outputMapper).orElse(null); final JobState outputState = temporalResponse.getMetadata().isSucceeded() ? JobState.SUCCEEDED : JobState.FAILED; track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput); - // TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above + + if (outputState == JobState.FAILED && jobOutput.isPresent()) { + reportError(configType, jobContext, jobOutput.get(), connectorDefinitionId, workspaceId); + } final long endedAt = Instant.now().toEpochMilli(); return SynchronousResponse.fromTemporalResponse( @@ -177,4 +208,34 @@ private void track(final UUID jobId, } + private void reportError(final ConfigType configType, + final ConnectorJobReportingContext jobContext, + final T jobOutput, + final UUID connectorDefinitionId, + final UUID workspaceId) { + Exceptions.swallow(() -> { + switch (configType) { + case CHECK_CONNECTION_SOURCE -> jobErrorReporter.reportSourceCheckJobFailure( + connectorDefinitionId, + workspaceId, + ((ConnectorJobOutput) jobOutput).getFailureReason(), + jobContext); + case CHECK_CONNECTION_DESTINATION -> jobErrorReporter.reportDestinationCheckJobFailure( + connectorDefinitionId, + workspaceId, + ((ConnectorJobOutput) jobOutput).getFailureReason(), + jobContext); + case DISCOVER_SCHEMA -> jobErrorReporter.reportDiscoverJobFailure( + connectorDefinitionId, + workspaceId, + ((ConnectorJobOutput) jobOutput).getFailureReason(), + jobContext); + case GET_SPEC -> jobErrorReporter.reportSpecJobFailure( + ((ConnectorJobOutput) jobOutput).getFailureReason(), + jobContext); + default -> LOGGER.error("Tried to report job failure for type {}, but this job type is not supported", configType); + } + }); + } + } diff --git a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java index 55687e5cfcc0..7688e63607fd 100644 --- a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSynchronousSchedulerClientTest.java @@ -14,6 +14,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; @@ -29,6 +30,8 @@ import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.persistence.job_error_reporter.ConnectorJobReportingContext; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; @@ -39,6 +42,7 @@ import java.nio.file.Path; import java.util.UUID; import java.util.function.Function; +import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -69,6 +73,7 @@ class DefaultSynchronousSchedulerClientTest { private TemporalClient temporalClient; private JobTracker jobTracker; + private JobErrorReporter jobErrorReporter; private OAuthConfigSupplier oAuthConfigSupplier; private DefaultSynchronousSchedulerClient schedulerClient; @@ -76,8 +81,9 @@ class DefaultSynchronousSchedulerClientTest { void setup() throws IOException { temporalClient = mock(TemporalClient.class); jobTracker = mock(JobTracker.class); + jobErrorReporter = mock(JobErrorReporter.class); oAuthConfigSupplier = mock(OAuthConfigSupplier.class); - schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); + schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION); when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION); @@ -97,12 +103,13 @@ class ExecuteSynchronousJob { @Test void testExecuteJobSuccess() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Function> function = mock(Function.class); + final Supplier> function = mock(Supplier.class); final Function mapperFunction = output -> output; - when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>("hello", createMetadata(true))); + when(function.get()).thenReturn(new TemporalResponse<>("hello", createMetadata(true))); + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3"); final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); + .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); assertEquals("hello", response.getOutput()); @@ -114,18 +121,20 @@ void testExecuteJobSuccess() { verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.SUCCEEDED)); + verifyNoInteractions(jobErrorReporter); } @SuppressWarnings("unchecked") @Test void testExecuteMappedOutput() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Function> function = mock(Function.class); + final Supplier> function = mock(Supplier.class); final Function mapperFunction = Object::toString; - when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(42, createMetadata(true))); + when(function.get()).thenReturn(new TemporalResponse<>(42, createMetadata(true))); + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3"); final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); + .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); assertEquals("42", response.getOutput()); @@ -140,12 +149,13 @@ void testExecuteMappedOutput() { @Test void testExecuteJobFailure() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Function> function = mock(Function.class); + final Supplier> function = mock(Supplier.class); final Function mapperFunction = output -> output; - when(function.apply(any(UUID.class))).thenReturn(new TemporalResponse<>(null, createMetadata(false))); + when(function.get()).thenReturn(new TemporalResponse<>(null, createMetadata(false))); + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3"); final SynchronousResponse response = schedulerClient - .execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); + .execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID); assertNotNull(response); assertNull(response.getOutput()); @@ -157,22 +167,26 @@ void testExecuteJobFailure() { verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED)); + verifyNoInteractions(jobErrorReporter); } @SuppressWarnings("unchecked") @Test void testExecuteRuntimeException() { final UUID sourceDefinitionId = UUID.randomUUID(); - final Function> function = mock(Function.class); + final Supplier> function = mock(Supplier.class); final Function mapperFunction = output -> output; - when(function.apply(any(UUID.class))).thenThrow(new RuntimeException()); + when(function.get()).thenThrow(new RuntimeException()); + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(UUID.randomUUID(), "source-airbyte:1.2.3"); assertThrows( RuntimeException.class, - () -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, sourceDefinitionId, function, mapperFunction, WORKSPACE_ID)); + () -> schedulerClient.execute(ConfigType.DISCOVER_SCHEMA, jobContext, sourceDefinitionId, function, + mapperFunction, WORKSPACE_ID)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.STARTED)); verify(jobTracker).trackDiscover(any(UUID.class), eq(sourceDefinitionId), eq(WORKSPACE_ID), eq(JobState.FAILED)); + verifyNoInteractions(jobErrorReporter); } } diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/ConnectorJobReportingContext.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/ConnectorJobReportingContext.java new file mode 100644 index 000000000000..3478ade853a3 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/ConnectorJobReportingContext.java @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import java.util.UUID; + +public record ConnectorJobReportingContext(UUID jobId, String dockerImage) {} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java index 5d2c5ce621be..c67a2bb3eb4a 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java @@ -4,18 +4,24 @@ package io.airbyte.scheduler.persistence.job_error_reporter; +import edu.umd.cs.findbugs.annotations.Nullable; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.map.MoreMaps; import io.airbyte.config.AttemptFailureSummary; import io.airbyte.config.Configs.DeploymentMode; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; -import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.persistence.WebUrlHelper; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +42,8 @@ public class JobErrorReporter { private static final String CONNECTOR_REPOSITORY_META_KEY = "connector_repository"; private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id"; private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage"; + private static final String CONNECTOR_COMMAND_META_KEY = "connector_command"; + private static final String JOB_ID_KEY = "job_id"; private final ConfigRepository configRepository; private final DeploymentMode deploymentMode; @@ -61,53 +69,169 @@ public JobErrorReporter(final ConfigRepository configRepository, * * @param connectionId - connection that had the failure * @param failureSummary - final attempt failure summary - * @param jobSyncConfig - config for the sync job + * @param jobContext - sync job reporting context */ - public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) { - final List traceMessageFailures = failureSummary.getFailures().stream() - .filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE)) - .toList(); - - final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true); - final String connectionUrl = webUrlHelper.getConnectionUrl(workspace.getWorkspaceId(), connectionId); - - for (final FailureReason failureReason : traceMessageFailures) { - final FailureOrigin failureOrigin = failureReason.getFailureOrigin(); - - final HashMap metadata = new HashMap<>(); - metadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString()); - metadata.put(CONNECTION_ID_META_KEY, connectionId.toString()); - metadata.put(CONNECTION_URL_META_KEY, connectionUrl); - metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion); - metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name()); - metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value()); - metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value()); - - try { + public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final SyncJobReportingContext jobContext) { + Exceptions.swallow(() -> { + final List traceMessageFailures = failureSummary.getFailures().stream() + .filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE)) + .toList(); + + final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true); + final Map commonMetadata = MoreMaps.merge( + Map.of(JOB_ID_KEY, String.valueOf(jobContext.jobId())), + getConnectionMetadata(workspace.getWorkspaceId(), connectionId)); + + for (final FailureReason failureReason : traceMessageFailures) { + final FailureOrigin failureOrigin = failureReason.getFailureOrigin(); + if (failureOrigin == FailureOrigin.SOURCE) { final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); - final String dockerImage = jobSyncConfig.getSourceDockerImage(); + final String dockerImage = jobContext.sourceDockerImage(); + final Map metadata = MoreMaps.merge(commonMetadata, getSourceMetadata(sourceDefinition)); - metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString()); - metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName()); - metadata.put(CONNECTOR_REPOSITORY_META_KEY, sourceDefinition.getDockerRepository()); - metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()); - - jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); + reportJobFailureReason(workspace, failureReason, dockerImage, metadata); } else if (failureOrigin == FailureOrigin.DESTINATION) { final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); - final String dockerImage = jobSyncConfig.getDestinationDockerImage(); - - metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString()); - metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName()); - metadata.put(CONNECTOR_REPOSITORY_META_KEY, destinationDefinition.getDockerRepository()); - metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value()); + final String dockerImage = jobContext.destinationDockerImage(); + final Map metadata = MoreMaps.merge(commonMetadata, getDestinationMetadata(destinationDefinition)); - jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); + reportJobFailureReason(workspace, failureReason, dockerImage, metadata); } - } catch (final Exception e) { - LOGGER.error("Error when reporting job failure reason: {}", failureReason, e); } + }); + } + + /** + * Reports a FailureReason from a connector Check job for a Source to the JobErrorReportingClient + * + * @param workspaceId - workspace for which the check failed + * @param failureReason - failure reason from the check connection job + * @param jobContext - connector job reporting context + */ + public void reportSourceCheckJobFailure(final UUID sourceDefinitionId, + final UUID workspaceId, + final FailureReason failureReason, + final ConnectorJobReportingContext jobContext) + throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true); + final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId); + final Map metadata = MoreMaps.merge( + getSourceMetadata(sourceDefinition), + Map.of( + JOB_ID_KEY, jobContext.jobId().toString(), + CONNECTOR_COMMAND_META_KEY, "check")); + reportJobFailureReason(workspace, failureReason.withFailureOrigin(FailureOrigin.SOURCE), jobContext.dockerImage(), metadata); + } + + /** + * Reports a FailureReason from a connector Check job for a Destination to the + * JobErrorReportingClient + * + * @param workspaceId - workspace for which the check failed + * @param failureReason - failure reason from the check connection job + * @param jobContext - connector job reporting context + */ + public void reportDestinationCheckJobFailure(final UUID destinationDefinitionId, + final UUID workspaceId, + final FailureReason failureReason, + final ConnectorJobReportingContext jobContext) + throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true); + final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId); + final Map metadata = MoreMaps.merge( + getDestinationMetadata(destinationDefinition), + Map.of( + JOB_ID_KEY, jobContext.jobId().toString(), + CONNECTOR_COMMAND_META_KEY, "check")); + reportJobFailureReason(workspace, failureReason.withFailureOrigin(FailureOrigin.DESTINATION), jobContext.dockerImage(), metadata); + } + + /** + * Reports a FailureReason from a connector Deploy job for a Source to the JobErrorReportingClient + * + * @param workspaceId - workspace for which the Discover job failed + * @param failureReason - failure reason from the Discover job + * @param jobContext - connector job reporting context + */ + public void reportDiscoverJobFailure(final UUID sourceDefinitionId, + final UUID workspaceId, + final FailureReason failureReason, + final ConnectorJobReportingContext jobContext) + throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true); + final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId); + final Map metadata = MoreMaps.merge( + getSourceMetadata(sourceDefinition), + Map.of( + JOB_ID_KEY, jobContext.jobId().toString(), + CONNECTOR_COMMAND_META_KEY, "discover")); + reportJobFailureReason(workspace, failureReason, jobContext.dockerImage(), metadata); + } + + /** + * Reports a FailureReason from a connector Spec job to the JobErrorReportingClient + * + * @param failureReason - failure reason from the Deploy job + * @param jobContext - connector job reporting context + */ + public void reportSpecJobFailure(final FailureReason failureReason, final ConnectorJobReportingContext jobContext) { + final String dockerImage = jobContext.dockerImage(); + final String connectorRepository = dockerImage.split(":")[0]; + final Map metadata = Map.of( + JOB_ID_KEY, jobContext.jobId().toString(), + CONNECTOR_REPOSITORY_META_KEY, connectorRepository, + CONNECTOR_COMMAND_META_KEY, "spec"); + reportJobFailureReason(null, failureReason, dockerImage, metadata); + } + + private Map getConnectionMetadata(final UUID workspaceId, final UUID connectionId) { + final String connectionUrl = webUrlHelper.getConnectionUrl(workspaceId, connectionId); + return Map.ofEntries( + Map.entry(CONNECTION_ID_META_KEY, connectionId.toString()), + Map.entry(CONNECTION_URL_META_KEY, connectionUrl)); + } + + private Map getDestinationMetadata(final StandardDestinationDefinition destinationDefinition) { + return Map.ofEntries( + Map.entry(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString()), + Map.entry(CONNECTOR_NAME_META_KEY, destinationDefinition.getName()), + Map.entry(CONNECTOR_REPOSITORY_META_KEY, destinationDefinition.getDockerRepository()), + Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value())); + } + + private Map getSourceMetadata(final StandardSourceDefinition sourceDefinition) { + return Map.ofEntries( + Map.entry(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString()), + Map.entry(CONNECTOR_NAME_META_KEY, sourceDefinition.getName()), + Map.entry(CONNECTOR_REPOSITORY_META_KEY, sourceDefinition.getDockerRepository()), + Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value())); + } + + private void reportJobFailureReason(@Nullable final StandardWorkspace workspace, + final FailureReason failureReason, + final String dockerImage, + final Map metadata) { + final Map commonMetadata = new HashMap<>(Map.ofEntries( + Map.entry(AIRBYTE_VERSION_META_KEY, airbyteVersion), + Map.entry(DEPLOYMENT_MODE_META_KEY, deploymentMode.name()))); + + if (workspace != null) { + commonMetadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString()); + } + + if (failureReason.getFailureOrigin() != null) { + commonMetadata.put(FAILURE_ORIGIN_META_KEY, failureReason.getFailureOrigin().value()); + } + + if (failureReason.getFailureType() != null) { + commonMetadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value()); + } + + try { + jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, MoreMaps.merge(commonMetadata, metadata)); + } catch (final Exception e) { + LOGGER.error("Error when reporting job failure reason: {}", failureReason, e); } } diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java index 3d52f558b667..54cd3009a7dd 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.persistence.job_error_reporter; +import edu.umd.cs.findbugs.annotations.Nullable; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardWorkspace; import java.util.Map; @@ -16,6 +17,9 @@ public interface JobErrorReportingClient { /** * Report a job failure reason */ - void reportJobFailureReason(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map metadata); + void reportJobFailureReason(@Nullable StandardWorkspace workspace, + final FailureReason reason, + final String dockerImage, + Map metadata); } diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java index cf1cebf1404b..d8e3a9ee3eeb 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.persistence.job_error_reporter; +import edu.umd.cs.findbugs.annotations.Nullable; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardWorkspace; import java.util.Map; @@ -15,12 +16,12 @@ public class LoggingJobErrorReportingClient implements JobErrorReportingClient { private static final Logger LOGGER = LoggerFactory.getLogger(LoggingJobErrorReportingClient.class); @Override - public void reportJobFailureReason(final StandardWorkspace workspace, + public void reportJobFailureReason(@Nullable final StandardWorkspace workspace, final FailureReason reason, final String dockerImage, final Map metadata) { LOGGER.info("Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}", - workspace.getWorkspaceId(), + workspace != null ? workspace.getWorkspaceId() : "null", dockerImage, reason, metadata); diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java index ff509b7ce254..93efe292ee82 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.persistence.job_error_reporter; +import edu.umd.cs.findbugs.annotations.Nullable; import io.airbyte.config.FailureReason; import io.airbyte.config.Metadata; import io.airbyte.config.StandardWorkspace; @@ -56,7 +57,7 @@ static IHub createSentryHubWithDSN(final String sentryDSN) { * @param metadata - Extra metadata to set as tags on the event */ @Override - public void reportJobFailureReason(final StandardWorkspace workspace, + public void reportJobFailureReason(@Nullable final StandardWorkspace workspace, final FailureReason failureReason, final String dockerImage, final Map metadata) { @@ -75,10 +76,12 @@ public void reportJobFailureReason(final StandardWorkspace workspace, } // set workspace as the user in sentry to get impact and priority - final User sentryUser = new User(); - sentryUser.setId(String.valueOf(workspace.getWorkspaceId())); - sentryUser.setUsername(workspace.getName()); - event.setUser(sentryUser); + if (workspace != null) { + final User sentryUser = new User(); + sentryUser.setId(String.valueOf(workspace.getWorkspaceId())); + sentryUser.setUsername(workspace.getName()); + event.setUser(sentryUser); + } // set metadata as tags event.setTags(metadata); diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SyncJobReportingContext.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SyncJobReportingContext.java new file mode 100644 index 000000000000..7d3ed40830ed --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SyncJobReportingContext.java @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +public record SyncJobReportingContext(long jobId, String sourceDockerImage, String destinationDockerImage) { + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java index 0cc6fc146e8f..eb8bf089b802 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java @@ -11,13 +11,15 @@ import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; -import io.airbyte.config.JobSyncConfig; import io.airbyte.config.Metadata; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.persistence.WebUrlHelper; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -28,6 +30,7 @@ class JobErrorReporterTest { + private static final UUID JOB_ID = UUID.randomUUID(); private static final UUID WORKSPACE_ID = UUID.randomUUID(); private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection"; @@ -77,9 +80,11 @@ void testReportSyncJobFailure() { Mockito.when(mFailureSummary.getFailures()) .thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason)); - final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class); - Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(SOURCE_DOCKER_IMAGE); - Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DESTINATION_DOCKER_IMAGE); + final long syncJobId = 1L; + final SyncJobReportingContext jobReportingContext = new SyncJobReportingContext( + syncJobId, + SOURCE_DOCKER_IMAGE, + DESTINATION_DOCKER_IMAGE); Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL); @@ -98,12 +103,13 @@ void testReportSyncJobFailure() { .withName(DESTINATION_DEFINITION_NAME)); final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); - Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); - jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig); + jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, jobReportingContext); final Map expectedSourceMetadata = Map.ofEntries( + Map.entry("job_id", String.valueOf(syncJobId)), Map.entry("workspace_id", WORKSPACE_ID.toString()), Map.entry("connection_id", CONNECTION_ID.toString()), Map.entry("connection_url", CONNECTION_URL), @@ -117,6 +123,7 @@ void testReportSyncJobFailure() { Map.entry("connector_release_stage", SOURCE_RELEASE_STAGE.toString())); final Map expectedDestinationMetadata = Map.ofEntries( + Map.entry("job_id", String.valueOf(syncJobId)), Map.entry("workspace_id", WORKSPACE_ID.toString()), Map.entry("connection_id", CONNECTION_ID.toString()), Map.entry("connection_url", CONNECTION_URL), @@ -138,7 +145,7 @@ void testReportSyncJobFailure() { @Test void testReportSyncJobFailureDoesNotThrow() { final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class); - final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class); + final SyncJobReportingContext jobContext = new SyncJobReportingContext(1L, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE); final FailureReason sourceFailureReason = new FailureReason() .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) @@ -149,21 +156,164 @@ void testReportSyncJobFailureDoesNotThrow() { Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPOSITORY) .withReleaseStage(SOURCE_RELEASE_STAGE) .withSourceDefinitionId(SOURCE_DEFINITION_ID) .withName(SOURCE_DEFINITION_NAME)); final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); - Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); + Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL); Mockito.doThrow(new RuntimeException("some exception")) .when(jobErrorReportingClient) .reportJobFailureReason(Mockito.any(), Mockito.eq(sourceFailureReason), Mockito.any(), Mockito.any()); - Assertions.assertDoesNotThrow(() -> jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig)); + Assertions.assertDoesNotThrow(() -> jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, jobContext)); Mockito.verify(jobErrorReportingClient, Mockito.times(1)) .reportJobFailureReason(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); } + @Test + void testReportSourceCheckJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException { + final FailureReason failureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR); + + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(JOB_ID, SOURCE_DOCKER_IMAGE); + + Mockito.when(configRepository.getStandardSourceDefinition(SOURCE_DEFINITION_ID)) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPOSITORY) + .withReleaseStage(SOURCE_RELEASE_STAGE) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withName(SOURCE_DEFINITION_NAME)); + + final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); + Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(mWorkspace); + + jobErrorReporter.reportSourceCheckJobFailure(SOURCE_DEFINITION_ID, WORKSPACE_ID, failureReason, jobContext); + + final Map expectedMetadata = Map.ofEntries( + Map.entry("job_id", JOB_ID.toString()), + Map.entry("workspace_id", WORKSPACE_ID.toString()), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "source"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_definition_id", SOURCE_DEFINITION_ID.toString()), + Map.entry("connector_repository", SOURCE_DOCKER_REPOSITORY), + Map.entry("connector_name", SOURCE_DEFINITION_NAME), + Map.entry("connector_release_stage", SOURCE_RELEASE_STAGE.toString()), + Map.entry("connector_command", "check")); + + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, failureReason, SOURCE_DOCKER_IMAGE, expectedMetadata); + Mockito.verifyNoMoreInteractions(jobErrorReportingClient); + } + + @Test + void testReportDestinationCheckJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException { + final FailureReason failureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.DESTINATION) + .withFailureType(FailureType.SYSTEM_ERROR); + + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(JOB_ID, DESTINATION_DOCKER_IMAGE); + + Mockito.when(configRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)) + .thenReturn(new StandardDestinationDefinition() + .withDockerRepository(DESTINATION_DOCKER_REPOSITORY) + .withReleaseStage(DESTINATION_RELEASE_STAGE) + .withDestinationDefinitionId(DESTINATION_DEFINITION_ID) + .withName(DESTINATION_DEFINITION_NAME)); + + final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); + Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(mWorkspace); + + jobErrorReporter.reportDestinationCheckJobFailure(DESTINATION_DEFINITION_ID, WORKSPACE_ID, failureReason, jobContext); + + final Map expectedMetadata = Map.ofEntries( + Map.entry("job_id", JOB_ID.toString()), + Map.entry("workspace_id", WORKSPACE_ID.toString()), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "destination"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_definition_id", DESTINATION_DEFINITION_ID.toString()), + Map.entry("connector_repository", DESTINATION_DOCKER_REPOSITORY), + Map.entry("connector_name", DESTINATION_DEFINITION_NAME), + Map.entry("connector_release_stage", DESTINATION_RELEASE_STAGE.toString()), + Map.entry("connector_command", "check")); + + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, failureReason, DESTINATION_DOCKER_IMAGE, expectedMetadata); + Mockito.verifyNoMoreInteractions(jobErrorReportingClient); + } + + @Test + void testReportDiscoverJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException { + final FailureReason failureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR); + + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(JOB_ID, SOURCE_DOCKER_IMAGE); + + Mockito.when(configRepository.getStandardSourceDefinition(SOURCE_DEFINITION_ID)) + .thenReturn(new StandardSourceDefinition() + .withDockerRepository(SOURCE_DOCKER_REPOSITORY) + .withReleaseStage(SOURCE_RELEASE_STAGE) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withName(SOURCE_DEFINITION_NAME)); + + final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); + Mockito.when(mWorkspace.getWorkspaceId()).thenReturn(WORKSPACE_ID); + Mockito.when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(mWorkspace); + + jobErrorReporter.reportDiscoverJobFailure(SOURCE_DEFINITION_ID, WORKSPACE_ID, failureReason, jobContext); + + final Map expectedMetadata = Map.ofEntries( + Map.entry("job_id", JOB_ID.toString()), + Map.entry("workspace_id", WORKSPACE_ID.toString()), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "source"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_definition_id", SOURCE_DEFINITION_ID.toString()), + Map.entry("connector_repository", SOURCE_DOCKER_REPOSITORY), + Map.entry("connector_name", SOURCE_DEFINITION_NAME), + Map.entry("connector_release_stage", SOURCE_RELEASE_STAGE.toString()), + Map.entry("connector_command", "discover")); + + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, failureReason, SOURCE_DOCKER_IMAGE, expectedMetadata); + Mockito.verifyNoMoreInteractions(jobErrorReportingClient); + } + + @Test + void testReportSpecJobFailure() { + final FailureReason failureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR); + + final ConnectorJobReportingContext jobContext = new ConnectorJobReportingContext(JOB_ID, SOURCE_DOCKER_IMAGE); + + jobErrorReporter.reportSpecJobFailure(failureReason, jobContext); + + final Map expectedMetadata = Map.ofEntries( + Map.entry("job_id", JOB_ID.toString()), + Map.entry("deployment_mode", DEPLOYMENT_MODE.name()), + Map.entry("airbyte_version", AIRBYTE_VERSION), + Map.entry("failure_origin", "source"), + Map.entry("failure_type", "system_error"), + Map.entry("connector_repository", SOURCE_DOCKER_REPOSITORY), + Map.entry("connector_command", "spec")); + + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(null, failureReason, SOURCE_DOCKER_IMAGE, expectedMetadata); + Mockito.verifyNoMoreInteractions(jobErrorReportingClient); + } + } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java index 69d6f16f1d04..492795fd4ef4 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java @@ -104,6 +104,27 @@ void testReportJobFailureReason() { assertEquals("RuntimeError: Something went wrong", message.getFormatted()); } + @Test + void testReportJobFailureReasonWithNoWorkspace() { + final ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SentryEvent.class); + + final FailureReason failureReason = new FailureReason() + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR) + .withInternalMessage("RuntimeError: Something went wrong"); + + sentryErrorReportingClient.reportJobFailureReason(null, failureReason, DOCKER_IMAGE, Map.of()); + + verify(mockSentryHub).captureEvent(eventCaptor.capture()); + final SentryEvent actualEvent = eventCaptor.getValue(); + final User sentryUser = actualEvent.getUser(); + assertNull(sentryUser); + + final Message message = actualEvent.getMessage(); + assertNotNull(message); + assertEquals("RuntimeError: Something went wrong", message.getFormatted()); + } + @Test void testReportJobFailureReasonWithStacktrace() { final ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SentryEvent.class); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 9c0c0abfa6fb..059b7777ea11 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -41,6 +41,10 @@ import io.airbyte.scheduler.client.TemporalEventRunner; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.WebUrlHelper; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClient; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClientFactory; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.server.errors.InvalidInputExceptionMapper; @@ -195,6 +199,17 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); + + final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); + final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); + final JobErrorReporter jobErrorReporter = + new JobErrorReporter( + configRepository, + configs.getDeploymentMode(), + configs.getAirbyteVersionOrWarning(), + webUrlHelper, + jobErrorReportingClient); + final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configsDatabase); final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(); @@ -206,7 +221,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient); final DefaultSynchronousSchedulerClient syncSchedulerClient = - new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); + new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); final EventRunner eventRunner = new TemporalEventRunner(temporalClient); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 5ab23cf7fcc0..27e2d00daaa3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -12,6 +12,7 @@ import io.airbyte.config.DestinationConnection; import io.airbyte.config.FailureReason; import io.airbyte.config.JobOutput; +import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; @@ -32,6 +33,7 @@ import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; +import io.airbyte.scheduler.persistence.job_error_reporter.SyncJobReportingContext; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; @@ -197,7 +199,7 @@ public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber i @Override public void jobFailure(final JobFailureInput input) { try { - final var jobId = input.getJobId(); + final long jobId = input.getJobId(); jobPersistence.failJob(jobId); final Job job = jobPersistence.getJob(jobId); @@ -206,8 +208,11 @@ public void jobFailure(final JobFailureInput input) { trackCompletion(job, JobStatus.FAILED); final UUID connectionId = UUID.fromString(job.getScope()); + final JobSyncConfig jobSyncConfig = job.getConfig().getSync(); + final SyncJobReportingContext jobContext = + new SyncJobReportingContext(jobId, jobSyncConfig.getSourceDockerImage(), jobSyncConfig.getDestinationDockerImage()); job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary) - .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, job.getConfig().getSync())); + .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, jobContext)); } catch (final IOException e) { throw new RetryableException(e); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index e5fc9408b7c8..653e74586b25 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -16,6 +16,7 @@ import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; +import io.airbyte.config.JobSyncConfig; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSync; @@ -301,9 +302,16 @@ void setJobFailure() throws IOException { final Attempt mAttempt = Mockito.mock(Attempt.class); Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary)); + final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class); + Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(DOCKER_IMAGE_NAME); + Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DOCKER_IMAGE_NAME); + + final JobConfig mJobConfig = Mockito.mock(JobConfig.class); + Mockito.when(mJobConfig.getSync()).thenReturn(mJobSyncConfig); + final Job mJob = Mockito.mock(Job.class); Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString()); - Mockito.when(mJob.getConfig()).thenReturn(new JobConfig()); + Mockito.when(mJob.getConfig()).thenReturn(mJobConfig); Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt)); Mockito.when(mJobPersistence.getJob(JOB_ID)) diff --git a/docker-compose.yaml b/docker-compose.yaml index fd0bd479350d..13566de5e479 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -125,6 +125,8 @@ services: - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - TEMPORAL_HOST=${TEMPORAL_HOST} - TRACKING_STRATEGY=${TRACKING_STRATEGY} + - JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY} + - JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN} - WEBAPP_URL=${WEBAPP_URL} - WORKER_ENVIRONMENT=${WORKER_ENVIRONMENT} - WORKSPACE_ROOT=${WORKSPACE_ROOT}