Skip to content

Commit

Permalink
Msiega/log unexpected fields (#21922)
Browse files Browse the repository at this point in the history
* add logic to detect and log unexpected fields during a sync

* include the list of unexpected fields with the logging

* tolerate malformed records when we are checking for unexpected fields
  • Loading branch information
mfsiega-airbyte authored Jan 26, 2023
1 parent b7e3894 commit 1475d6e
Showing 1 changed file with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -336,9 +338,12 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
long recordsRead = 0L;
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, Set<String>> streamToAllFields = new HashMap<>();
final Map<AirbyteStreamNameNamespacePair, Set<String>> unexpectedFields = new HashMap<>();
if (fieldSelectionEnabled) {
populatedStreamToSelectedFields(catalog, streamToSelectedFields);
}
populateStreamToAllFields(catalog, streamToAllFields);
try {
while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> messageOptional;
Expand All @@ -353,7 +358,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
if (fieldSelectionEnabled) {
filterSelectedFields(streamToSelectedFields, airbyteMessage);
}
validateSchema(recordSchemaValidator, validationErrors, airbyteMessage);
validateSchema(recordSchemaValidator, streamToAllFields, unexpectedFields, validationErrors, airbyteMessage);
final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

messageTracker.acceptFromSource(message);
Expand Down Expand Up @@ -396,6 +401,12 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
metricReporter.trackSchemaValidationError(stream);
});
}
unexpectedFields.forEach((stream, unexpectedFieldNames) -> {
if (!unexpectedFieldNames.isEmpty()) {
LOGGER.warn("Source {} has unexpected fields [{}] in stream {}", sourceId, String.join(", ", unexpectedFieldNames), stream);
// TODO(mfsiega-airbyte): publish this as a metric.
}
});

try {
destination.notifyEndOfInput();
Expand Down Expand Up @@ -595,6 +606,8 @@ private List<FailureReason> getFailureReasons(final AtomicReference<FailureReaso
}

private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
Map<AirbyteStreamNameNamespacePair, Set<String>> streamToAllFields,
Map<AirbyteStreamNameNamespacePair, Set<String>> unexpectedFields,
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
if (message.getRecord() == null) {
Expand All @@ -609,6 +622,9 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
if (streamHasLessThenTenErrs) {
try {
recordSchemaValidator.validateSchema(record, messageStream);
final Set<String> unexpectedFieldNames = unexpectedFields.getOrDefault(messageStream, new HashSet<>());
populateUnexpectedFieldNames(record, streamToAllFields.get(messageStream), unexpectedFieldNames);
unexpectedFields.put(messageStream, unexpectedFieldNames);
} catch (final RecordSchemaValidationException e) {
final ImmutablePair<Set<String>, Integer> exceptionWithCount = validationErrors.get(messageStream);
if (exceptionWithCount == null) {
Expand All @@ -620,8 +636,22 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
validationErrors.put(messageStream, new ImmutablePair<>(updatedErrorMessages, currentCount + 1));
}
}
}
}

private static void populateUnexpectedFieldNames(AirbyteRecordMessage record, Set<String> fieldsInCatalog, Set<String> unexpectedFieldNames) {
final JsonNode data = record.getData();
if (data.isObject()) {
Iterator<String> fieldNamesInRecord = data.fieldNames();
while (fieldNamesInRecord.hasNext()) {
final String fieldName = fieldNamesInRecord.next();
if (!fieldsInCatalog.contains(fieldName)) {
unexpectedFieldNames.add(fieldName);
}
}
}
// If it's not an object it's malformed, but we tolerate it here - it will be logged as an error by
// the validation.
}

/**
Expand All @@ -646,6 +676,27 @@ private static void populatedStreamToSelectedFields(final ConfiguredAirbyteCatal
}
}

/**
* Populates a map for stream -> all the top-level fields in the catalog. Used to identify any
* unexpected top-level fields in the records.
*
* @param catalog
* @param streamToAllFields
*/
private static void populateStreamToAllFields(final ConfiguredAirbyteCatalog catalog,
final Map<AirbyteStreamNameNamespacePair, Set<String>> streamToAllFields) {
for (final var s : catalog.getStreams()) {
final Set<String> fields = new HashSet<>();
final JsonNode propertiesNode = s.getStream().getJsonSchema().findPath("properties");
if (propertiesNode.isObject()) {
propertiesNode.fieldNames().forEachRemaining((fieldName) -> fields.add(fieldName));
} else {
throw new RuntimeException("No properties node in stream schema");
}
streamToAllFields.put(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(s), fields);
}
}

private static void filterSelectedFields(final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields,
final AirbyteMessage airbyteMessage) {
final AirbyteRecordMessage record = airbyteMessage.getRecord();
Expand Down

0 comments on commit 1475d6e

Please sign in to comment.