Skip to content

Commit

Permalink
feat(Impact Analysis): Support impact analysis to check all downstrea…
Browse files Browse the repository at this point in the history
…ms of given entity (datahub-project#4322)
  • Loading branch information
Dexter Lee authored and maggiehays committed Aug 1, 2022
1 parent 8e12dd6 commit 42067d7
Show file tree
Hide file tree
Showing 143 changed files with 4,452 additions and 2,042 deletions.
2 changes: 2 additions & 0 deletions datahub-frontend/app/controllers/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient;
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClientConfig;
import utils.ConfigUtil;
import java.time.Duration;

import static auth.AuthUtils.*;

Expand Down Expand Up @@ -122,6 +123,7 @@ public CompletableFuture<Result> proxy(String path) throws ExecutionException, I
.addHeader(Http.HeaderNames.AUTHORIZATION, authorizationHeaderValue)
.addHeader(AuthenticationConstants.LEGACY_X_DATAHUB_ACTOR_HEADER, getDataHubActorHeader())
.setBody(new InMemoryBodyWritable(ByteString.fromByteBuffer(request().body().asBytes().asByteBuffer()), "application/json"))
.setRequestTimeout(Duration.ofSeconds(120))
.execute()
.thenApply(apiResponse -> {
final ResponseHeader header = new ResponseHeader(apiResponse.getStatus(), apiResponse.getHeaders()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.linkedin.datahub.graphql.generated.EntityRelationshipLegacy;
import com.linkedin.datahub.graphql.generated.ForeignKeyConstraint;
import com.linkedin.datahub.graphql.generated.InstitutionalMemoryMetadata;
import com.linkedin.datahub.graphql.generated.LineageRelationship;
import com.linkedin.datahub.graphql.generated.ListDomainsResult;
import com.linkedin.datahub.graphql.generated.MLFeature;
import com.linkedin.datahub.graphql.generated.MLFeatureProperties;
Expand All @@ -42,6 +43,7 @@
import com.linkedin.datahub.graphql.generated.MLPrimaryKeyProperties;
import com.linkedin.datahub.graphql.generated.Owner;
import com.linkedin.datahub.graphql.generated.RecommendationContent;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
import com.linkedin.datahub.graphql.generated.SearchResult;
import com.linkedin.datahub.graphql.generated.UsageQueryResult;
import com.linkedin.datahub.graphql.generated.UserUsageCounts;
Expand Down Expand Up @@ -82,6 +84,7 @@
import com.linkedin.datahub.graphql.resolvers.ingest.source.ListIngestionSourcesResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.source.UpsertIngestionSourceResolver;
import com.linkedin.datahub.graphql.resolvers.load.AspectResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityLineageResultResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityRelationshipsResultResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityTypeBatchResolver;
import com.linkedin.datahub.graphql.resolvers.load.EntityTypeResolver;
Expand All @@ -107,6 +110,7 @@
import com.linkedin.datahub.graphql.resolvers.search.AutoCompleteForMultipleResolver;
import com.linkedin.datahub.graphql.resolvers.search.AutoCompleteResolver;
import com.linkedin.datahub.graphql.resolvers.search.SearchAcrossEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.search.SearchAcrossLineageResolver;
import com.linkedin.datahub.graphql.resolvers.search.SearchResolver;
import com.linkedin.datahub.graphql.resolvers.tag.SetTagColorResolver;
import com.linkedin.datahub.graphql.resolvers.type.AspectInterfaceTypeResolver;
Expand Down Expand Up @@ -200,6 +204,7 @@ public class GmsGraphQLEngine {
private final TokenService tokenService;
private final SecretService secretService;
private final GitVersion gitVersion;
private final boolean supportsImpactAnalysis;
private final TimeseriesAspectService timeseriesAspectService;

private final IngestionConfiguration ingestionConfiguration;
Expand Down Expand Up @@ -265,7 +270,8 @@ public GmsGraphQLEngine() {
null,
null,
null,
null);
null,
false);
}

public GmsGraphQLEngine(
Expand All @@ -280,7 +286,8 @@ public GmsGraphQLEngine(
final EntityRegistry entityRegistry,
final SecretService secretService,
final IngestionConfiguration ingestionConfiguration,
final GitVersion gitVersion
final GitVersion gitVersion,
final boolean supportsImpactAnalysis
) {

this.entityClient = entityClient;
Expand All @@ -294,6 +301,7 @@ public GmsGraphQLEngine(
this.secretService = secretService;
this.entityRegistry = entityRegistry;
this.gitVersion = gitVersion;
this.supportsImpactAnalysis = supportsImpactAnalysis;
this.timeseriesAspectService = timeseriesAspectService;

this.ingestionConfiguration = Objects.requireNonNull(ingestionConfiguration);
Expand Down Expand Up @@ -522,13 +530,16 @@ private void configureContainerResolvers(final RuntimeWiring.Builder builder) {
private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
builder.type("Query", typeWiring -> typeWiring
.dataFetcher("appConfig",
new AppConfigResolver(gitVersion, analyticsService != null, this.ingestionConfiguration))
new AppConfigResolver(gitVersion, analyticsService != null, this.ingestionConfiguration,
supportsImpactAnalysis))
.dataFetcher("me", new AuthenticatedResolver<>(
new MeResolver(this.entityClient)))
.dataFetcher("search", new AuthenticatedResolver<>(
new SearchResolver(this.entityClient)))
.dataFetcher("searchAcrossEntities",
new SearchAcrossEntitiesResolver(this.entityClient))
.dataFetcher("searchAcrossLineage",
new SearchAcrossLineageResolver(this.entityClient))
.dataFetcher("autoComplete", new AuthenticatedResolver<>(
new AutoCompleteResolver(searchableTypes)))
.dataFetcher("autoCompleteForMultiple", new AuthenticatedResolver<>(
Expand Down Expand Up @@ -668,6 +679,13 @@ private void configureGenericEntityResolvers(final RuntimeWiring.Builder builder
(env) -> ((SearchResult) env.getSource()).getEntity()))
)
)
.type("SearchAcrossLineageResult", typeWiring -> typeWiring
.dataFetcher("entity", new AuthenticatedResolver<>(
new EntityTypeResolver(
entityTypes.stream().collect(Collectors.toList()),
(env) -> ((SearchAcrossLineageResult) env.getSource()).getEntity()))
)
)
.type("AggregationMetadata", typeWiring -> typeWiring
.dataFetcher("entity", new EntityTypeResolver(
entityTypes.stream().collect(Collectors.toList()),
Expand Down Expand Up @@ -699,6 +717,13 @@ private void configureGenericEntityResolvers(final RuntimeWiring.Builder builder
(env) -> ((EntityRelationship) env.getSource()).getEntity()))
)
)
.type("LineageRelationship", typeWiring -> typeWiring
.dataFetcher("entity", new AuthenticatedResolver<>(
new EntityTypeResolver(
new ArrayList<>(entityTypes),
(env) -> ((LineageRelationship) env.getSource()).getEntity()))
)
)
.type("ListDomainsResult", typeWiring -> typeWiring
.dataFetcher("domains",
new LoadableTypeBatchResolver<>(domainType,
Expand All @@ -717,6 +742,9 @@ private void configureDatasetResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("domain",
new LoadableTypeResolver<>(
domainType,
Expand Down Expand Up @@ -864,6 +892,9 @@ private void configureDashboardResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((Dashboard) env.getSource()).getPlatform().getUrn()))
Expand Down Expand Up @@ -903,6 +934,9 @@ private void configureChartResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((Chart) env.getSource()).getPlatform().getUrn()))
Expand Down Expand Up @@ -985,6 +1019,9 @@ private void configureDataJobResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("dataFlow", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataFlowType,
(env) -> ((DataJob) env.getSource()).getDataFlow().getUrn()))
Expand Down Expand Up @@ -1029,6 +1066,9 @@ private void configureDataFlowResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((DataFlow) env.getSource()).getPlatform().getUrn()))
Expand All @@ -1053,6 +1093,9 @@ private void configureMLFeatureTableResolvers(final RuntimeWiring.Builder builde
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((MLFeatureTable) env.getSource()).getPlatform().getUrn()))
Expand Down Expand Up @@ -1092,6 +1135,9 @@ private void configureMLFeatureTableResolvers(final RuntimeWiring.Builder builde
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((MLModel) env.getSource()).getPlatform().getUrn()))
Expand All @@ -1115,6 +1161,9 @@ private void configureMLFeatureTableResolvers(final RuntimeWiring.Builder builde
.dataFetcher("relationships", new AuthenticatedResolver<>(
new EntityRelationshipsResultResolver(graphClient)
))
.dataFetcher("lineage", new AuthenticatedResolver<>(
new EntityLineageResultResolver(graphClient)
))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType,
(env) -> ((MLModelGroup) env.getSource()).getPlatform().getUrn()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static Map<String, String> buildFacetFilters(@Nullable List<FacetFilterIn

@Nullable
public static Filter buildFilter(@Nullable List<FacetFilterInput> facetFilterInputs) {
if (facetFilterInputs == null) {
if (facetFilterInputs == null || facetFilterInputs.isEmpty()) {
return null;
}
return new Filter().setOr(new ConjunctiveCriterionArray(new ConjunctiveCriterion().setAnd(new CriterionArray(facetFilterInputs.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.graphql.generated.AppConfig;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.IdentityManagementConfig;
import com.linkedin.datahub.graphql.generated.LineageConfig;
import com.linkedin.datahub.graphql.generated.ManagedIngestionConfig;
import com.linkedin.datahub.graphql.generated.PoliciesConfig;
import com.linkedin.datahub.graphql.generated.Privilege;
Expand All @@ -26,11 +27,17 @@ public class AppConfigResolver implements DataFetcher<CompletableFuture<AppConfi
private final GitVersion _gitVersion;
private final boolean _isAnalyticsEnabled;
private final IngestionConfiguration _ingestionConfiguration;
private final boolean _supportsImpactAnalysis;

public AppConfigResolver(final GitVersion gitVersion, final boolean isAnalyticsEnabled, final IngestionConfiguration ingestionConfiguration) {
public AppConfigResolver(
final GitVersion gitVersion,
final boolean isAnalyticsEnabled,
final IngestionConfiguration ingestionConfiguration,
final boolean supportsImpactAnalysis) {
_gitVersion = gitVersion;
_isAnalyticsEnabled = isAnalyticsEnabled;
_ingestionConfiguration = ingestionConfiguration;
_supportsImpactAnalysis = supportsImpactAnalysis;
}

@Override
Expand All @@ -42,6 +49,10 @@ public CompletableFuture<AppConfig> get(final DataFetchingEnvironment environmen

appConfig.setAppVersion(_gitVersion.getVersion());

final LineageConfig lineageConfig = new LineageConfig();
lineageConfig.setSupportsImpactAnalysis(_supportsImpactAnalysis);
appConfig.setLineageConfig(lineageConfig);

final AnalyticsConfig analyticsConfig = new AnalyticsConfig();
analyticsConfig.setEnabled(_isAnalyticsEnabled);

Expand Down Expand Up @@ -122,4 +133,4 @@ private EntityType mapResourceTypeToEntityType(final String resourceType) {
return null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.datahub.graphql.resolvers.load;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityLineageResult;
import com.linkedin.datahub.graphql.generated.LineageDirection;
import com.linkedin.datahub.graphql.generated.LineageInput;
import com.linkedin.datahub.graphql.generated.LineageRelationship;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.metadata.graph.GraphClient;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;


/**
* GraphQL Resolver responsible for fetching lineage relationships between entities in the DataHub graph.
* Lineage relationship denotes whether an entity is directly upstream or downstream of another entity
*/
public class EntityLineageResultResolver implements DataFetcher<CompletableFuture<EntityLineageResult>> {

private final GraphClient _graphClient;

public EntityLineageResultResolver(final GraphClient graphClient) {
_graphClient = graphClient;
}

@Override
public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment environment) {
final QueryContext context = environment.getContext();
final String urn = ((Entity) environment.getSource()).getUrn();
final LineageInput input = bindArgument(environment.getArgument("input"), LineageInput.class);

final LineageDirection lineageDirection = input.getDirection();
@Nullable
final Integer start = input.getStart(); // Optional!
@Nullable
final Integer count = input.getCount(); // Optional!

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
return CompletableFuture.supplyAsync(() -> mapEntityRelationships(lineageDirection,
_graphClient.getLineageEntities(urn, resolvedDirection, start, count, 1, context.getActorUrn())));
}

private EntityLineageResult mapEntityRelationships(final LineageDirection lineageDirection,
final com.linkedin.metadata.graph.EntityLineageResult entityLineageResult) {
final EntityLineageResult result = new EntityLineageResult();
result.setStart(entityLineageResult.getStart());
result.setCount(entityLineageResult.getCount());
result.setTotal(entityLineageResult.getTotal());
result.setRelationships(entityLineageResult.getRelationships()
.stream()
.map(entityRelationship -> mapEntityRelationship(lineageDirection, entityRelationship))
.collect(Collectors.toList()));
return result;
}

private LineageRelationship mapEntityRelationship(final LineageDirection direction,
final com.linkedin.metadata.graph.LineageRelationship lineageRelationship) {
final LineageRelationship result = new LineageRelationship();
final Entity partialEntity = UrnToEntityMapper.map(lineageRelationship.getEntity());
if (partialEntity != null) {
result.setEntity(partialEntity);
}
result.setType(lineageRelationship.getType());
result.setDegree(lineageRelationship.getDegree());
return result;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.datahub.graphql.resolvers.search;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.SearchAcrossEntitiesInput;
import com.linkedin.datahub.graphql.generated.SearchResults;
Expand All @@ -17,6 +16,7 @@
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.SEARCHABLE_ENTITY_TYPES;


/**
Expand All @@ -29,11 +29,6 @@ public class SearchAcrossEntitiesResolver implements DataFetcher<CompletableFutu
private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

private static final List<EntityType> SEARCHABLE_ENTITY_TYPES =
ImmutableList.of(EntityType.DATASET, EntityType.DASHBOARD, EntityType.CHART, EntityType.MLMODEL,
EntityType.MLMODEL_GROUP, EntityType.MLFEATURE_TABLE, EntityType.DATA_FLOW, EntityType.DATA_JOB,
EntityType.GLOSSARY_TERM, EntityType.TAG, EntityType.CORP_USER, EntityType.CORP_GROUP, EntityType.CONTAINER, EntityType.DOMAIN);

private final EntityClient _entityClient;

@Override
Expand Down
Loading

0 comments on commit 42067d7

Please sign in to comment.