Skip to content

Commit

Permalink
fix(mutator): mutator hook fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanHolstien committed Aug 9, 2024
1 parent 469654c commit 9471759
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class ConfigEntityRegistry implements EntityRegistry {
private final DataSchemaFactory dataSchemaFactory;
@Getter private final PluginFactory pluginFactory;

@Nullable
@Getter @Nullable
private BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider;

private final Map<String, EntitySpec> entityNameToSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate;
import com.linkedin.metadata.aspect.patch.template.form.FormInfoTemplate;
import com.linkedin.metadata.aspect.patch.template.structuredproperty.StructuredPropertyDefinitionTemplate;
import com.linkedin.metadata.aspect.plugins.PluginFactory;
import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DefaultEntitySpec;
import com.linkedin.metadata.models.EntitySpec;
Expand All @@ -32,8 +34,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;


/**
* Implementation of {@link EntityRegistry} that builds {@link DefaultEntitySpec} objects from the a
Expand All @@ -46,6 +52,11 @@ public class SnapshotEntityRegistry implements EntityRegistry {
private final AspectTemplateEngine _aspectTemplateEngine;
private final Map<String, AspectSpec> _aspectNameToSpec;


@Getter
@Nullable
private BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider;

private static final SnapshotEntityRegistry INSTANCE = new SnapshotEntityRegistry();

public SnapshotEntityRegistry() {
Expand All @@ -56,6 +67,18 @@ public SnapshotEntityRegistry() {
entitySpecs = new ArrayList<>(entityNameToSpec.values());
_aspectNameToSpec = populateAspectMap(entitySpecs);
_aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec);
pluginFactoryProvider = null;
}

public SnapshotEntityRegistry(
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider) {
entityNameToSpec = new EntitySpecBuilder().buildEntitySpecs(new Snapshot().schema())
.stream()
.collect(Collectors.toMap(spec -> spec.getName().toLowerCase(), spec -> spec));
entitySpecs = new ArrayList<>(entityNameToSpec.values());
_aspectNameToSpec = populateAspectMap(entitySpecs);
_aspectTemplateEngine = populateTemplateEngine(_aspectNameToSpec);
this.pluginFactoryProvider = pluginFactoryProvider;
}

public SnapshotEntityRegistry(UnionTemplate snapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.datahub.test.TestEntityProfile;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.EventSpec;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
Expand Down Expand Up @@ -262,23 +263,42 @@ public void testUnloadedMerge() throws EntityRegistryException {
mergedEntityRegistry.apply(configEntityRegistry2);

assertEquals(
mergedEntityRegistry.getAllAspectPayloadValidators().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
mergedEntityRegistry
.getPluginFactory()
.getPluginConfiguration()
.getAspectPayloadValidators()
.stream()
.filter(AspectPluginConfig::isEnabled)
.filter(p -> p.getSupportedOperations().contains("DELETE"))
.count(),
1);

assertEquals(
mergedEntityRegistry.getAllMutationHooks().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
mergedEntityRegistry.getPluginFactory().getPluginConfiguration().getMutationHooks().stream()
.filter(AspectPluginConfig::isEnabled)
.filter(p -> p.getSupportedOperations().contains("DELETE"))
.count(),
1);

assertEquals(
mergedEntityRegistry.getAllMCLSideEffects().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
mergedEntityRegistry
.getPluginFactory()
.getPluginConfiguration()
.getMclSideEffects()
.stream()
.filter(AspectPluginConfig::isEnabled)
.filter(p -> p.getSupportedOperations().contains("DELETE"))
.count(),
1);

assertEquals(
mergedEntityRegistry.getAllMCPSideEffects().stream()
.filter(p -> p.getConfig().getSupportedOperations().contains("DELETE"))
mergedEntityRegistry
.getPluginFactory()
.getPluginConfiguration()
.getMcpSideEffects()
.stream()
.filter(AspectPluginConfig::isEnabled)
.filter(p -> p.getSupportedOperations().contains("DELETE"))
.count(),
1);
}
Expand Down
12 changes: 12 additions & 0 deletions li-utils/src/main/java/com/datahub/util/RecordUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ public static <T extends RecordTemplate> T toRecordTemplate(
return toRecordTemplate(type, dataMap);
}

@Nonnull
public static DataMap toDataMap(@Nonnull String jsonString) {
DataMap dataMap;
try {
dataMap = DATA_TEMPLATE_CODEC.stringToMap(jsonString);
} catch (IOException e) {
throw new ModelConversionException("Failed to deserialize DataMap: " + jsonString);
}

return dataMap;
}

/**
* Creates a {@link RecordTemplate} object from a {@link DataMap}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
Expand Down Expand Up @@ -98,15 +99,51 @@ public Pair<Map<String, Set<String>>, List<ChangeMCP>> toUpsertBatchItems(
return Pair.of(newUrnAspectNames, upsertBatchItems);
}

private Stream<ChangeMCP> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
return applyProposalMutationHooks(proposedItems, retrieverContext)
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
.map(
mcpItem ->
ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever()));
private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem> proposedItems) {
List<MutationHook> mutationHooks =
retrieverContext.getAspectRetriever().getEntityRegistry().getAllMutationHooks();
Stream<? extends BatchItem> unmutatedItems =
proposedItems.stream()
.filter(
proposedItem ->
mutationHooks.stream()
.noneMatch(
mutationHook ->
mutationHook.shouldApply(
proposedItem.getChangeType(),
proposedItem.getUrn(),
proposedItem.getAspectName())))
.map(
mcpItem -> {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever().getEntityRegistry());
}
return ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever());
});
List<MCPItem> mutatedItems =
applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList());
Stream<? extends BatchItem> proposedItemsToChangeItems =
mutatedItems.stream()
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
// Filter on proposed items again to avoid applying builder to Patch Item side effects
.filter(mcpItem -> mcpItem instanceof ProposedItem)
.map(
mcpItem ->
ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever()));
Stream<? extends BatchItem> sideEffectItems =
mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem));
Stream<? extends BatchItem> combinedChangeItems =
Stream.concat(proposedItemsToChangeItems, unmutatedItems);
return Stream.concat(combinedChangeItems, sideEffectItems);
}

public static class AspectsBatchImplBuilder {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package com.linkedin.gms.factory.entityregistry;

import com.datahub.plugins.metadata.aspect.SpringPluginFactory;
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
import com.linkedin.metadata.aspect.plugins.PluginFactory;
import com.linkedin.metadata.aspect.plugins.config.PluginConfiguration;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader;
import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
import java.util.List;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
Expand All @@ -27,13 +34,19 @@ public class EntityRegistryFactory {
@Qualifier("pluginEntityRegistry")
private PluginEntityRegistryLoader pluginEntityRegistryLoader;

@Autowired private ApplicationContext applicationContext;

@SneakyThrows
@Bean("entityRegistry")
@Primary
@Nonnull
protected EntityRegistry getInstance() throws EntityRegistryException {
protected EntityRegistry getInstance(SpringStandardPluginConfiguration springStandardPluginConfiguration)
throws EntityRegistryException {
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider =
(config, loaders) -> new SpringPluginFactory(applicationContext, config, loaders);
MergedEntityRegistry baseEntityRegistry =
new MergedEntityRegistry(SnapshotEntityRegistry.getInstance()).apply(configEntityRegistry);
new MergedEntityRegistry(new SnapshotEntityRegistry(pluginFactoryProvider))
.apply(configEntityRegistry);
pluginEntityRegistryLoader.withBaseRegistry(baseEntityRegistry).start(true);
return baseEntityRegistry;
}
Expand Down

0 comments on commit 9471759

Please sign in to comment.