Skip to content

Commit

Permalink
feat(validation): Ingest and schema validator
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Oct 15, 2024
1 parent a9cb610 commit dfa247f
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 136 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11484 - Metadata service authentication enabled by default
- #11484 - Rest API authorization enabled by default
- #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases.
- #11619 - schema field/column paths can no longer be empty strings
- #11619 - schema field/column paths can no longer be duplicated within the schema

### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@

import static com.linkedin.metadata.Constants.*;

import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaMetadata;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaMetadata;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
* Validates the Schema Field Path specification, specifically that all field IDs must be unique
* across all fields within a schema.
* 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique
* across all fields within a schema. 2. Validates that the field path id is not empty.
*
* @see <a href="https://datahubproject.io/docs/advanced/field-path-spec-v2/#requirements">Field
* Path V2 docs</a>
Expand All @@ -34,82 +34,83 @@
public class FieldPathValidator extends AspectPayloadValidator {
@Nonnull private AspectPluginConfig config;

/**
* Prevent any MCP for SchemaMetadata where field ids are duplicated (except for MCPs with {@link
* ChangeType#DELETE} and {@link ChangeType#PATCH}, the latter gets handled pre-commit to the DB).
*/
/** Prevent any MCP for SchemaMetadata where field ids are duplicated. */
@Override
protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull Collection<? extends BatchItem> mcpItems,
@Nonnull RetrieverContext retrieverContext) {
return mcpItems.stream()
.filter(
i ->
!ChangeType.DELETE.equals(i.getChangeType())
&& !ChangeType.PATCH.equals(i.getChangeType()))
.filter(
i ->
i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|| i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.map(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
return processSchemaMetadataAspect(i);
} else {
return processEditableSchemaMetadataAspect(i);
}
})
.filter(Objects::nonNull);

ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

mcpItems.forEach(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
processSchemaMetadataAspect(i, exceptions);
} else {
processEditableSchemaMetadataAspect(i, exceptions);
}
});

return exceptions.streamAllExceptions();
}

@Override
protected Stream<AspectValidationException> validatePreCommitAspects(
@Nonnull Collection<ChangeMCP> changeMCPs, @Nonnull RetrieverContext retrieverContext) {
return changeMCPs.stream()
.filter(i -> ChangeType.PATCH.equals(i.getChangeType()))
.filter(
i ->
i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)
|| i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
.map(
i -> {
if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) {
return processSchemaMetadataAspect(i);
} else {
return processEditableSchemaMetadataAspect(i);
}
})
.filter(Objects::nonNull);
return Stream.of();
}

private static AspectValidationException processEditableSchemaMetadataAspect(BatchItem i) {
private static void processEditableSchemaMetadataAspect(
BatchItem i, ValidationExceptionCollection exceptions) {
final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class);
final long uniquePaths =
schemaMetadata.getEditableSchemaFieldInfo().stream()
.map(EditableSchemaFieldInfo::getFieldPath)
.distinct()
.count();
validateAndCount(
i,
schemaMetadata.getEditableSchemaFieldInfo().stream()
.map(EditableSchemaFieldInfo::getFieldPath),
exceptions);

if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) {
return AspectValidationException.forItem(
exceptions.addException(
i,
String.format(
"Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths",
i.getChangeType()));
}
return null;
}

private static AspectValidationException processSchemaMetadataAspect(BatchItem i) {
private static void processSchemaMetadataAspect(
BatchItem i, ValidationExceptionCollection exceptions) {
final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class);
final long uniquePaths =
schemaMetadata.getFields().stream().map(SchemaField::getFieldPath).distinct().count();
validateAndCount(
i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions);

if (uniquePaths != schemaMetadata.getFields().size()) {
return AspectValidationException.forItem(
exceptions.addException(
i,
String.format(
"Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths",
i.getChangeType()));
}
return null;
}

private static long validateAndCount(
BatchItem i, Stream<String> fieldPaths, ValidationExceptionCollection exceptions) {
return fieldPaths
.distinct()
// inspect the stream of fieldPath validation errors since we're already iterating
.peek(
fieldPath ->
validateFieldPath(fieldPath)
.ifPresent(message -> exceptions.addException(i, message)))
.count();
}

private static Optional<String> validateFieldPath(String fieldPath) {
if (fieldPath == null || fieldPath.isEmpty()) {
return Optional.of("SchemaMetadata aspect has empty field path.");
}
return Optional.empty();
}
}
Loading

0 comments on commit dfa247f

Please sign in to comment.