From dfa247f202cf18dea1c5779cb0ab0a982c0fafa4 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Mon, 14 Oct 2024 17:38:45 -0500 Subject: [PATCH] feat(validation): Ingest and schema validator --- docs/how/updating-datahub.md | 2 + .../aspect/validation/FieldPathValidator.java | 105 +++++------ .../validators/FieldPathValidatorTest.java | 133 +++++++------- .../java/com/linkedin/metadata/Constants.java | 7 + .../ExecutionRequestResultValidator.java | 70 ++++++++ .../ExecutionRequestResultValidatorTest.java | 166 ++++++++++++++++++ .../src/main/resources/entity-registry.yml | 19 +- .../SpringStandardPluginConfiguration.java | 46 +++++ smoke-test/tests/utils.py | 1 + 9 files changed, 413 insertions(+), 136 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 00e020bd2a387..dbcc7da846703 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -24,6 +24,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11484 - Metadata service authentication enabled by default - #11484 - Rest API authorization enabled by default - #10472 - `SANDBOX` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases. +- #11619 - schema field/column paths can no longer be empty strings +- #11619 - schema field/column paths can no longer be duplicated within the schema ### Potential Downtime diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java index 47603504dd8a0..7c279254e1bc3 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/validation/FieldPathValidator.java @@ -2,19 +2,19 @@ import static com.linkedin.metadata.Constants.*; -import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.batch.BatchItem; import com.linkedin.metadata.aspect.batch.ChangeMCP; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection; import com.linkedin.schema.EditableSchemaFieldInfo; import com.linkedin.schema.EditableSchemaMetadata; import com.linkedin.schema.SchemaField; import com.linkedin.schema.SchemaMetadata; import java.util.Collection; -import java.util.Objects; +import java.util.Optional; import java.util.stream.Stream; import javax.annotation.Nonnull; import lombok.Getter; @@ -22,8 +22,8 @@ import lombok.experimental.Accessors; /** - * Validates the Schema Field Path specification, specifically that all field IDs must be unique - * across all fields within a schema. + * 1. Validates the Schema Field Path specification, specifically that all field IDs must be unique + * across all fields within a schema. 2. Validates that the field path id is not empty. * * @see Field * Path V2 docs @@ -34,82 +34,83 @@ public class FieldPathValidator extends AspectPayloadValidator { @Nonnull private AspectPluginConfig config; - /** - * Prevent any MCP for SchemaMetadata where field ids are duplicated (except for MCPs with {@link - * ChangeType#DELETE} and {@link ChangeType#PATCH}, the latter gets handled pre-commit to the DB). - */ + /** Prevent any MCP for SchemaMetadata where field ids are duplicated. */ @Override protected Stream validateProposedAspects( @Nonnull Collection mcpItems, @Nonnull RetrieverContext retrieverContext) { - return mcpItems.stream() - .filter( - i -> - !ChangeType.DELETE.equals(i.getChangeType()) - && !ChangeType.PATCH.equals(i.getChangeType())) - .filter( - i -> - i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) - || i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) - .map( - i -> { - if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { - return processSchemaMetadataAspect(i); - } else { - return processEditableSchemaMetadataAspect(i); - } - }) - .filter(Objects::nonNull); + + ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection(); + + mcpItems.forEach( + i -> { + if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { + processSchemaMetadataAspect(i, exceptions); + } else { + processEditableSchemaMetadataAspect(i, exceptions); + } + }); + + return exceptions.streamAllExceptions(); } @Override protected Stream validatePreCommitAspects( @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { - return changeMCPs.stream() - .filter(i -> ChangeType.PATCH.equals(i.getChangeType())) - .filter( - i -> - i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME) - || i.getAspectName().equals(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) - .map( - i -> { - if (i.getAspectName().equals(SCHEMA_METADATA_ASPECT_NAME)) { - return processSchemaMetadataAspect(i); - } else { - return processEditableSchemaMetadataAspect(i); - } - }) - .filter(Objects::nonNull); + return Stream.of(); } - private static AspectValidationException processEditableSchemaMetadataAspect(BatchItem i) { + private static void processEditableSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { final EditableSchemaMetadata schemaMetadata = i.getAspect(EditableSchemaMetadata.class); final long uniquePaths = - schemaMetadata.getEditableSchemaFieldInfo().stream() - .map(EditableSchemaFieldInfo::getFieldPath) - .distinct() - .count(); + validateAndCount( + i, + schemaMetadata.getEditableSchemaFieldInfo().stream() + .map(EditableSchemaFieldInfo::getFieldPath), + exceptions); + if (uniquePaths != schemaMetadata.getEditableSchemaFieldInfo().size()) { - return AspectValidationException.forItem( + exceptions.addException( i, String.format( "Cannot perform %s action on proposal. EditableSchemaMetadata aspect has duplicated field paths", i.getChangeType())); } - return null; } - private static AspectValidationException processSchemaMetadataAspect(BatchItem i) { + private static void processSchemaMetadataAspect( + BatchItem i, ValidationExceptionCollection exceptions) { final SchemaMetadata schemaMetadata = i.getAspect(SchemaMetadata.class); final long uniquePaths = - schemaMetadata.getFields().stream().map(SchemaField::getFieldPath).distinct().count(); + validateAndCount( + i, schemaMetadata.getFields().stream().map(SchemaField::getFieldPath), exceptions); + if (uniquePaths != schemaMetadata.getFields().size()) { - return AspectValidationException.forItem( + exceptions.addException( i, String.format( "Cannot perform %s action on proposal. SchemaMetadata aspect has duplicated field paths", i.getChangeType())); } - return null; + } + + private static long validateAndCount( + BatchItem i, Stream fieldPaths, ValidationExceptionCollection exceptions) { + return fieldPaths + .distinct() + // inspect the stream of fieldPath validation errors since we're already iterating + .peek( + fieldPath -> + validateFieldPath(fieldPath) + .ifPresent(message -> exceptions.addException(i, message))) + .count(); + } + + private static Optional validateFieldPath(String fieldPath) { + if (fieldPath == null || fieldPath.isEmpty()) { + return Optional.of("SchemaMetadata aspect has empty field path."); + } + return Optional.empty(); } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java index 1b2b40b2daddc..bd5912764edce 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/validators/FieldPathValidatorTest.java @@ -4,16 +4,14 @@ import static org.mockito.Mockito.*; import static org.testng.Assert.*; -import com.google.common.collect.ImmutableList; -import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.UrnUtils; -import com.linkedin.domain.Domains; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.aspect.RetrieverContext; import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; import com.linkedin.metadata.aspect.validation.CreateIfNotExistsValidator; import com.linkedin.metadata.aspect.validation.FieldPathValidator; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -34,6 +32,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -51,15 +50,21 @@ public class FieldPathValidatorTest { .build(); private EntityRegistry entityRegistry; private RetrieverContext mockRetrieverContext; - private DatasetUrn testDatasetUrn; + private static final DatasetUrn TEST_DATASET_URN; private final FieldPathValidator test = new FieldPathValidator().setConfig(validatorConfig); - @BeforeTest - public void init() throws URISyntaxException { - testDatasetUrn = - DatasetUrn.createFromUrn( - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + static { + try { + TEST_DATASET_URN = + DatasetUrn.createFromUrn( + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + @BeforeTest + public void init() { entityRegistry = new TestEntityRegistry(); AspectRetriever mockAspectRetriever = mock(AspectRetriever.class); when(mockAspectRetriever.getEntityRegistry()).thenReturn(entityRegistry); @@ -69,29 +74,6 @@ public void init() throws URISyntaxException { when(mockRetrieverContext.getGraphRetriever()).thenReturn(mockGraphRetriever); } - @Test - public void testValidateIncorrectAspect() { - final Domains domains = - new Domains() - .setDomains(new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:domain:123")))); - assertEquals( - test.validateProposed( - Set.of( - TestMCP.builder() - .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) - .aspectSpec( - entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) - .getAspectSpec(DOMAINS_ASPECT_NAME)) - .recordTemplate(domains) - .build()), - mockRetrieverContext) - .count(), - 0); - } - @Test public void testValidateNonDuplicatedSchemaFieldPath() { final SchemaMetadata schema = getMockSchemaMetadataAspect(false); @@ -100,11 +82,11 @@ public void testValidateNonDuplicatedSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -122,11 +104,11 @@ public void testValidateDuplicatedSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -135,28 +117,6 @@ public void testValidateDuplicatedSchemaFieldPath() { 1); } - @Test - public void testValidateDeleteDuplicatedSchemaFieldPath() { - final SchemaMetadata schema = getMockSchemaMetadataAspect(true); - - assertEquals( - test.validateProposed( - Set.of( - TestMCP.builder() - .changeType(ChangeType.DELETE) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) - .aspectSpec( - entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) - .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) - .recordTemplate(schema) - .build()), - mockRetrieverContext) - .count(), - 0); - } - @Test public void testValidateNonDuplicatedEditableSchemaFieldPath() { final EditableSchemaMetadata schema = getMockEditableSchemaMetadataAspect(false); @@ -165,11 +125,11 @@ public void testValidateNonDuplicatedEditableSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -187,11 +147,11 @@ public void testValidateDuplicatedEditableSchemaFieldPath() { Set.of( TestMCP.builder() .changeType(ChangeType.UPSERT) - .urn(testDatasetUrn) - .entitySpec(entityRegistry.getEntitySpec(testDatasetUrn.getEntityType())) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) .aspectSpec( entityRegistry - .getEntitySpec(testDatasetUrn.getEntityType()) + .getEntitySpec(TEST_DATASET_URN.getEntityType()) .getAspectSpec(EDITABLE_SCHEMA_METADATA_ASPECT_NAME)) .recordTemplate(schema) .build()), @@ -200,7 +160,37 @@ public void testValidateDuplicatedEditableSchemaFieldPath() { 1); } - private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + @Test + public void testEmptySchemaFieldPath() { + final SchemaMetadata schema = getMockSchemaMetadataAspect(false, ""); + TestMCP testItem = + TestMCP.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_DATASET_URN) + .entitySpec(entityRegistry.getEntitySpec(TEST_DATASET_URN.getEntityType())) + .aspectSpec( + entityRegistry + .getEntitySpec(TEST_DATASET_URN.getEntityType()) + .getAspectSpec(SCHEMA_METADATA_ASPECT_NAME)) + .recordTemplate(schema) + .build(); + + Set exceptions = + test.validateProposed(Set.of(testItem), mockRetrieverContext).collect(Collectors.toSet()); + + assertEquals( + exceptions, + Set.of( + AspectValidationException.forItem( + testItem, "SchemaMetadata aspect has empty field path."))); + } + + private static SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { + return getMockSchemaMetadataAspect(duplicateFields, null); + } + + private static SchemaMetadata getMockSchemaMetadataAspect( + boolean duplicateFields, @Nullable String fieldPath) { List fields = new ArrayList<>(); fields.add( new SchemaField() @@ -209,7 +199,7 @@ private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { .setType(SchemaFieldDataType.Type.create(new StringType()))) .setNullable(false) .setNativeDataType("string") - .setFieldPath("test")); + .setFieldPath(fieldPath == null ? "test" : fieldPath)); if (duplicateFields) { fields.add( @@ -219,15 +209,16 @@ private SchemaMetadata getMockSchemaMetadataAspect(boolean duplicateFields) { .setType(SchemaFieldDataType.Type.create(new StringType()))) .setNullable(false) .setNativeDataType("string") - .setFieldPath("test")); + .setFieldPath(fieldPath == null ? "test" : fieldPath)); } return new SchemaMetadata() - .setPlatform(testDatasetUrn.getPlatformEntity()) + .setPlatform(TEST_DATASET_URN.getPlatformEntity()) .setFields(new SchemaFieldArray(fields)); } - private EditableSchemaMetadata getMockEditableSchemaMetadataAspect(boolean duplicateFields) { + private static EditableSchemaMetadata getMockEditableSchemaMetadataAspect( + boolean duplicateFields) { List fields = new ArrayList<>(); fields.add(new EditableSchemaFieldInfo().setFieldPath("test")); diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index e085a5876a42b..8961677b56878 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -319,6 +319,13 @@ public class Constants { public static final String EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"; public static final String EXECUTION_REQUEST_SIGNAL_ASPECT_NAME = "dataHubExecutionRequestSignal"; public static final String EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"; + public static final String EXECUTION_REQUEST_STATUS_RUNNING = "RUNNING"; + public static final String EXECUTION_REQUEST_STATUS_FAILURE = "FAILURE"; + public static final String EXECUTION_REQUEST_STATUS_SUCCESS = "SUCCESS"; + public static final String EXECUTION_REQUEST_STATUS_TIMEOUT = "TIMEOUT"; + public static final String EXECUTION_REQUEST_STATUS_CANCELLED = "CANCELLED"; + public static final String EXECUTION_REQUEST_STATUS_ABORTED = "ABORTED"; + public static final String EXECUTION_REQUEST_STATUS_DUPLICATE = "DUPLICATE"; // DataHub Access Token public static final String ACCESS_TOKEN_KEY_ASPECT_NAME = "dataHubAccessTokenKey"; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java new file mode 100644 index 0000000000000..b77d3b48d5bd5 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidator.java @@ -0,0 +1,70 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; + +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.BatchItem; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** A Validator for StructuredProperties Aspect that is attached to entities like Datasets, etc. */ +@Setter +@Getter +@Slf4j +@Accessors(chain = true) +public class ExecutionRequestResultValidator extends AspectPayloadValidator { + private static final Set IMMUTABLE_STATUS = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream validateProposedAspects( + @Nonnull Collection mcpItems, + @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + @Override + protected Stream validatePreCommitAspects( + @Nonnull Collection changeMCPs, @Nonnull RetrieverContext retrieverContext) { + return changeMCPs.stream() + .filter(item -> item.getPreviousRecordTemplate() != null) + .map( + item -> { + ExecutionRequestResult existingResult = + item.getPreviousAspect(ExecutionRequestResult.class); + + if (IMMUTABLE_STATUS.contains(existingResult.getStatus())) { + ExecutionRequestResult currentResult = item.getAspect(ExecutionRequestResult.class); + return AspectValidationException.forItem( + item, + String.format( + "Invalid update to immutable state for aspect dataHubExecutionRequestResult. Execution urn: %s previous status: %s. Denied status update: %s", + item.getUrn(), existingResult.getStatus(), currentResult.getStatus())); + } + + return null; + }) + .filter(Objects::nonNull); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java new file mode 100644 index 0000000000000..f46772ca7b350 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/aspect/validation/ExecutionRequestResultValidatorTest.java @@ -0,0 +1,166 @@ +package com.linkedin.metadata.aspect.validation; + +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_ABORTED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_CANCELLED; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_DUPLICATE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_FAILURE; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_RUNNING; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_SUCCESS; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_STATUS_TIMEOUT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.execution.ExecutionRequestResult; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.utils.AuditStampUtils; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.testng.annotations.Test; + +public class ExecutionRequestResultValidatorTest { + private static final OperationContext TEST_CONTEXT = + TestOperationContexts.systemContextNoSearchAuthorization(); + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build(); + private static final Urn TEST_URN = UrnUtils.getUrn("urn:li:dataHubExecutionRequest:xyz"); + + @Test + public void testAllowed() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set allowedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT); + Set destinationStates = new HashSet<>(allowedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE)); + + List testItems = + new ArrayList<>( + // Tests with previous state + allowedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + // Tests with no previous + testItems.addAll( + destinationStates.stream() + .map( + destState -> + ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate(new ExecutionRequestResult().setStatus(destState)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever())) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertTrue(result.isEmpty(), "Did not expect any validation errors."); + } + + @Test + public void testDenied() { + ExecutionRequestResultValidator test = new ExecutionRequestResultValidator(); + test.setConfig(TEST_PLUGIN_CONFIG); + + Set deniedUpdateStates = + Set.of( + EXECUTION_REQUEST_STATUS_ABORTED, + EXECUTION_REQUEST_STATUS_CANCELLED, + EXECUTION_REQUEST_STATUS_SUCCESS, + EXECUTION_REQUEST_STATUS_DUPLICATE); + Set destinationStates = new HashSet<>(deniedUpdateStates); + destinationStates.addAll( + Set.of( + EXECUTION_REQUEST_STATUS_RUNNING, + EXECUTION_REQUEST_STATUS_FAILURE, + EXECUTION_REQUEST_STATUS_TIMEOUT)); + + List testItems = + new ArrayList<>( + // Tests with previous state + deniedUpdateStates.stream() + .flatMap( + prevState -> + destinationStates.stream() + .map( + destState -> { + SystemAspect prevData = mock(SystemAspect.class); + when(prevData.getRecordTemplate()) + .thenReturn( + new ExecutionRequestResult().setStatus(prevState)); + return ChangeItemImpl.builder() + .changeType(ChangeType.UPSERT) + .urn(TEST_URN) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .recordTemplate( + new ExecutionRequestResult().setStatus(destState)) + .previousSystemAspect(prevData) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(TEST_CONTEXT.getAspectRetriever()); + })) + .toList()); + + List result = + test.validatePreCommitAspects(testItems, mock(RetrieverContext.class)).toList(); + + assertEquals( + result.size(), + deniedUpdateStates.size() * destinationStates.size(), + "Expected ALL items to be denied."); + } +} diff --git a/metadata-models/src/main/resources/entity-registry.yml b/metadata-models/src/main/resources/entity-registry.yml index eee8ef9ebaf07..ec9c3fee1c404 100644 --- a/metadata-models/src/main/resources/entity-registry.yml +++ b/metadata-models/src/main/resources/entity-registry.yml @@ -661,19 +661,6 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' - - className: 'com.linkedin.metadata.aspect.validation.FieldPathValidator' - enabled: true - supportedOperations: - - CREATE - - CREATE_ENTITY - - UPSERT - - UPDATE - - RESTATE - supportedEntityAspectNames: - - entityName: '*' - aspectName: 'schemaMetadata' - - entityName: '*' - aspectName: 'editableSchemaMetadata' - className: 'com.linkedin.metadata.aspect.validation.ConditionalWriteValidator' enabled: true supportedOperations: @@ -686,6 +673,12 @@ plugins: supportedEntityAspectNames: - entityName: '*' aspectName: '*' + - className: 'com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator' + enabled: true + spring: + enabled: true + packageScan: + - com.linkedin.gms.factory.plugins mcpSideEffects: - className: 'com.linkedin.metadata.structuredproperties.hooks.PropertyDefinitionDeleteSideEffect' packageScan: diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 4a2095685abe1..943b1c7184a60 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -1,5 +1,8 @@ package com.linkedin.gms.factory.plugins; +import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_ENTITY_NAME; +import static com.linkedin.metadata.Constants.EXECUTION_REQUEST_RESULT_ASPECT_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; import com.linkedin.metadata.Constants; @@ -7,6 +10,9 @@ import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; import com.linkedin.metadata.aspect.plugins.hooks.MutationHook; +import com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator; +import com.linkedin.metadata.aspect.validation.ExecutionRequestResultValidator; +import com.linkedin.metadata.aspect.validation.FieldPathValidator; import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect; import com.linkedin.metadata.schemafields.sideeffects.SchemaFieldSideEffect; import com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry; @@ -21,6 +27,7 @@ @Configuration @Slf4j public class SpringStandardPluginConfiguration { + private static final String ALL = "*"; @Value("${metadataChangeProposal.validation.ignoreUnknown}") private boolean ignoreUnknownEnabled; @@ -104,4 +111,43 @@ public MCPSideEffect dataProductUnsetSideEffect() { log.info("Initialized {}", SchemaFieldSideEffect.class.getName()); return new DataProductUnsetSideEffect().setConfig(config); } + + @Bean + public AspectPayloadValidator fieldPathValidator() { + return new FieldPathValidator() + .setConfig( + AspectPluginConfig.builder() + .className(FieldPathValidator.class.getName()) + .enabled(true) + .supportedOperations( + List.of("CREATE", "CREATE_ENTITY", "UPSERT", "UPDATE", "RESTATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(SCHEMA_METADATA_ASPECT_NAME) + .build(), + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(EDITABLE_SCHEMA_METADATA_ASPECT_NAME) + .build())) + .build()); + } + + @Bean + public AspectPayloadValidator dataHubExecutionRequestResultValidator() { + return new ExecutionRequestResultValidator() + .setConfig( + AspectPluginConfig.builder() + .className(ExecutionRequestResultValidator.class.getName()) + .enabled(true) + .supportedOperations(List.of("UPSERT", "UPDATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(EXECUTION_REQUEST_ENTITY_NAME) + .aspectName(EXECUTION_REQUEST_RESULT_ASPECT_NAME) + .build())) + .build()); + } } diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 2ddf233f5029a..dbd2a7d755e39 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -112,6 +112,7 @@ def ingest_file_via_rest(auth_session, filename: str) -> Pipeline: "token": auth_session.gms_token(), }, }, + "run_id": f"test_{filename}", } ) pipeline.run()