Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JobErrorReporter: Report connector_command tag for errors encountered during sync #15925

Merged
merged 8 commits into from
Aug 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JobErrorReporter {
private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
private static final String FAILURE_TYPE_META_KEY = "failure_type";
private static final String WORKSPACE_ID_META_KEY = "workspace_id";
private static final String WORKSPACE_URL_META_KEY = "workspace_url";
private static final String CONNECTION_ID_META_KEY = "connection_id";
private static final String CONNECTION_URL_META_KEY = "connection_url";
private static final String CONNECTOR_NAME_META_KEY = "connector_name";
Expand Down Expand Up @@ -140,9 +141,7 @@ public void reportSourceCheckJobFailure(final UUID sourceDefinitionId,
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final Map<String, String> metadata = MoreMaps.merge(
getSourceMetadata(sourceDefinition),
Map.of(
JOB_ID_KEY, jobContext.jobId().toString(),
CONNECTOR_COMMAND_META_KEY, "check"));
Comment on lines -143 to -145
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connector command will now be already set on the FailureReason as metadata, so don't need to set it here when reporting the error

Map.of(JOB_ID_KEY, jobContext.jobId().toString()));
reportJobFailureReason(workspace, failureReason.withFailureOrigin(FailureOrigin.SOURCE), jobContext.dockerImage(), metadata);
}

Expand All @@ -163,9 +162,7 @@ public void reportDestinationCheckJobFailure(final UUID destinationDefinitionId,
final StandardDestinationDefinition destinationDefinition = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final Map<String, String> metadata = MoreMaps.merge(
getDestinationMetadata(destinationDefinition),
Map.of(
JOB_ID_KEY, jobContext.jobId().toString(),
CONNECTOR_COMMAND_META_KEY, "check"));
Map.of(JOB_ID_KEY, jobContext.jobId().toString()));
reportJobFailureReason(workspace, failureReason.withFailureOrigin(FailureOrigin.DESTINATION), jobContext.dockerImage(), metadata);
}

Expand All @@ -185,9 +182,7 @@ public void reportDiscoverJobFailure(final UUID sourceDefinitionId,
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final Map<String, String> metadata = MoreMaps.merge(
getSourceMetadata(sourceDefinition),
Map.of(
JOB_ID_KEY, jobContext.jobId().toString(),
CONNECTOR_COMMAND_META_KEY, "discover"));
Map.of(JOB_ID_KEY, jobContext.jobId().toString()));
reportJobFailureReason(workspace, failureReason, jobContext.dockerImage(), metadata);
}

Expand All @@ -202,8 +197,7 @@ public void reportSpecJobFailure(final FailureReason failureReason, final Connec
final String connectorRepository = dockerImage.split(":")[0];
final Map<String, String> metadata = Map.of(
JOB_ID_KEY, jobContext.jobId().toString(),
CONNECTOR_REPOSITORY_META_KEY, connectorRepository,
CONNECTOR_COMMAND_META_KEY, "spec");
CONNECTOR_REPOSITORY_META_KEY, connectorRepository);
reportJobFailureReason(null, failureReason, dockerImage, metadata);
}

Expand Down Expand Up @@ -236,13 +230,40 @@ private Map<String, String> getNormalizationMetadata() {
}

private Map<String, String> prefixConnectorMetadataKeys(final Map<String, String> connectorMetadata, final String prefix) {
Map<String, String> prefixedMetadata = new HashMap<>();
final Map<String, String> prefixedMetadata = new HashMap<>();
for (final Map.Entry<String, String> entry : connectorMetadata.entrySet()) {
prefixedMetadata.put(String.format("%s_%s", prefix, entry.getKey()), entry.getValue());
}
return prefixedMetadata;
}

private Map<String, String> getFailureReasonMetadata(final FailureReason failureReason) {
final Map<String, Object> failureReasonAdditionalProps = failureReason.getMetadata().getAdditionalProperties();
final Map<String, String> outMetadata = new HashMap<>();

if (failureReasonAdditionalProps.containsKey(CONNECTOR_COMMAND_META_KEY)
&& failureReasonAdditionalProps.get(CONNECTOR_COMMAND_META_KEY) != null) {
outMetadata.put(CONNECTOR_COMMAND_META_KEY, failureReasonAdditionalProps.get(CONNECTOR_COMMAND_META_KEY).toString());
}

if (failureReason.getFailureOrigin() != null) {
outMetadata.put(FAILURE_ORIGIN_META_KEY, failureReason.getFailureOrigin().value());
}

if (failureReason.getFailureType() != null) {
outMetadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());
}

return outMetadata;
}

private Map<String, String> getWorkspaceMetadata(final UUID workspaceId) {
final String workspaceUrl = webUrlHelper.getWorkspaceUrl(workspaceId);
return Map.ofEntries(
Map.entry(WORKSPACE_ID_META_KEY, workspaceId.toString()),
Map.entry(WORKSPACE_URL_META_KEY, workspaceUrl));
Comment on lines +263 to +264
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't in scope on the original issue, but added this here quickly since we're not getting any urls for synchronous job errors (due to them not being encountered in the context of a connection)

}

private void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
Expand All @@ -252,19 +273,16 @@ private void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
Map.entry(DEPLOYMENT_MODE_META_KEY, deploymentMode.name())));

if (workspace != null) {
commonMetadata.put(WORKSPACE_ID_META_KEY, workspace.getWorkspaceId().toString());
commonMetadata.putAll(getWorkspaceMetadata(workspace.getWorkspaceId()));
}

if (failureReason.getFailureOrigin() != null) {
commonMetadata.put(FAILURE_ORIGIN_META_KEY, failureReason.getFailureOrigin().value());
}

if (failureReason.getFailureType() != null) {
commonMetadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());
}
final Map<String, String> allMetadata = MoreMaps.merge(
commonMetadata,
getFailureReasonMetadata(failureReason),
metadata);

try {
jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, MoreMaps.merge(commonMetadata, metadata));
jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, allMetadata);
} catch (final Exception e) {
LOGGER.error("Error when reporting job failure reason: {}", failureReason, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class JobErrorReporterTest {
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection";
private static final String WORKSPACE_URL = "http://localhost:8000/workspace/my_workspace";
private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
private static final String AIRBYTE_VERSION = "0.1.40";
private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
Expand All @@ -51,6 +52,9 @@ class JobErrorReporterTest {
private static final String FROM_TRACE_MESSAGE = "from_trace_message";
private static final String JOB_ID_KEY = "job_id";
private static final String WORKSPACE_ID_KEY = "workspace_id";
private static final String WORKSPACE_URL_KEY = "workspace_url";
private static final String CONNECTION_ID_KEY = "connection_id";
private static final String CONNECTION_URL_KEY = "connection_url";
private static final String DEPLOYMENT_MODE_KEY = "deployment_mode";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_KEY = "failure_origin";
Expand All @@ -77,19 +81,26 @@ void setup() {
webUrlHelper = mock(WebUrlHelper.class);
jobErrorReporter = new JobErrorReporter(
configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, NORMALIZATION_IMAGE, NORMALIZATION_VERSION, webUrlHelper, jobErrorReportingClient);

Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL);
Mockito.when(webUrlHelper.getWorkspaceUrl(WORKSPACE_ID)).thenReturn(WORKSPACE_URL);
}

@Test
void testReportSyncJobFailure() {
final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class);

final FailureReason sourceFailureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, "read"))
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason destinationFailureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, "write"))
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.SYSTEM_ERROR);

Expand All @@ -110,8 +121,6 @@ void testReportSyncJobFailure() {
SOURCE_DOCKER_IMAGE,
DESTINATION_DOCKER_IMAGE);

Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL);

Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID))
.thenReturn(new StandardSourceDefinition()
.withDockerRepository(SOURCE_DOCKER_REPOSITORY)
Expand All @@ -135,12 +144,14 @@ void testReportSyncJobFailure() {
final Map<String, String> expectedSourceMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(CONNECTION_ID_KEY, CONNECTION_ID.toString()),
Map.entry(CONNECTION_URL_KEY, CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, SOURCE),
Map.entry(FAILURE_TYPE_KEY, SYSTEM_ERROR),
Map.entry(CONNECTOR_COMMAND_KEY, "read"),
Map.entry(CONNECTOR_DEFINITION_ID_KEY, SOURCE_DEFINITION_ID.toString()),
Map.entry(CONNECTOR_REPOSITORY_KEY, SOURCE_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, SOURCE_DEFINITION_NAME),
Expand All @@ -149,12 +160,14 @@ void testReportSyncJobFailure() {
final Map<String, String> expectedDestinationMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(CONNECTION_ID_KEY, CONNECTION_ID.toString()),
Map.entry(CONNECTION_URL_KEY, CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "destination"),
Map.entry(FAILURE_TYPE_KEY, SYSTEM_ERROR),
Map.entry(CONNECTOR_COMMAND_KEY, "write"),
Map.entry(CONNECTOR_DEFINITION_ID_KEY, DESTINATION_DEFINITION_ID.toString()),
Map.entry(CONNECTOR_REPOSITORY_KEY, DESTINATION_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Expand All @@ -163,8 +176,9 @@ void testReportSyncJobFailure() {
final Map<String, String> expectedNormalizationMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(CONNECTION_ID_KEY, CONNECTION_ID.toString()),
Map.entry(CONNECTION_URL_KEY, CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "normalization"),
Expand Down Expand Up @@ -222,8 +236,11 @@ void testReportSyncJobFailureDoesNotThrow() {

@Test
void testReportSourceCheckJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException {
final String connectorCommand = "check";
final FailureReason failureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, connectorCommand))
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.SYSTEM_ERROR);

Expand All @@ -245,6 +262,7 @@ void testReportSourceCheckJobFailure() throws JsonValidationException, ConfigNot
final Map<String, String> expectedMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, JOB_ID.toString()),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, SOURCE),
Expand All @@ -253,16 +271,19 @@ void testReportSourceCheckJobFailure() throws JsonValidationException, ConfigNot
Map.entry(CONNECTOR_REPOSITORY_KEY, SOURCE_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, SOURCE_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, SOURCE_RELEASE_STAGE.toString()),
Map.entry(CONNECTOR_COMMAND_KEY, "check"));
Map.entry(CONNECTOR_COMMAND_KEY, connectorCommand));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, failureReason, SOURCE_DOCKER_IMAGE, expectedMetadata);
Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
}

@Test
void testReportDestinationCheckJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException {
final String connectorCommand = "check";
final FailureReason failureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, connectorCommand))
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.SYSTEM_ERROR);

Expand All @@ -284,6 +305,7 @@ void testReportDestinationCheckJobFailure() throws JsonValidationException, Conf
final Map<String, String> expectedMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, JOB_ID.toString()),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "destination"),
Expand All @@ -292,7 +314,7 @@ void testReportDestinationCheckJobFailure() throws JsonValidationException, Conf
Map.entry(CONNECTOR_REPOSITORY_KEY, DESTINATION_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()),
Map.entry(CONNECTOR_COMMAND_KEY, "check"));
Map.entry(CONNECTOR_COMMAND_KEY, connectorCommand));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, failureReason, DESTINATION_DOCKER_IMAGE, expectedMetadata);
Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
Expand All @@ -301,7 +323,9 @@ void testReportDestinationCheckJobFailure() throws JsonValidationException, Conf
@Test
void testReportDiscoverJobFailure() throws JsonValidationException, ConfigNotFoundException, IOException {
final FailureReason failureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, "discover"))
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.SYSTEM_ERROR);

Expand All @@ -323,6 +347,7 @@ void testReportDiscoverJobFailure() throws JsonValidationException, ConfigNotFou
final Map<String, String> expectedMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, JOB_ID.toString()),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry(WORKSPACE_URL_KEY, WORKSPACE_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, SOURCE),
Expand All @@ -340,7 +365,9 @@ void testReportDiscoverJobFailure() throws JsonValidationException, ConfigNotFou
@Test
void testReportSpecJobFailure() {
final FailureReason failureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withMetadata(new Metadata()
.withAdditionalProperty(FROM_TRACE_MESSAGE, true)
.withAdditionalProperty(CONNECTOR_COMMAND_KEY, "spec"))
.withFailureOrigin(FailureOrigin.SOURCE)
.withFailureType(FailureType.SYSTEM_ERROR);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -122,7 +123,13 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out
.findFirst();

if (traceMessage.isPresent()) {
final FailureReason failureReason = FailureHelper.genericFailure(traceMessage.get(), null, null);
final ConnectorCommand connectorCommand = switch (outputType) {
case SPEC -> ConnectorCommand.SPEC;
case CHECK_CONNECTION -> ConnectorCommand.CHECK;
case DISCOVER_CATALOG -> ConnectorCommand.DISCOVER;
};

final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand);
return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason);
}

Expand Down
Loading