Skip to content

Commit

Permalink
Merge branch 'master' into feature/ing-553
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Mar 27, 2024
2 parents 2176891 + f0bdc24 commit 0f6426d
Show file tree
Hide file tree
Showing 84 changed files with 21,547 additions and 1,104 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
./gradlew build \
-x :metadata-ingestion:build \
-x :metadata-ingestion:check \
-x docs-website:build \
-x :docs-website:build \
-x :metadata-integration:java:spark-lineage:test \
-x :metadata-io:test \
-x :metadata-ingestion-modules:airflow-plugin:build \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.datahub.authorization.AuthorizationConfiguration;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.generated.Entity;
Expand All @@ -16,6 +17,7 @@
import com.linkedin.datahub.graphql.generated.Restricted;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.metadata.graph.SiblingGraphService;
import com.linkedin.metadata.query.LineageFlags;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.RestrictedService;
Expand Down Expand Up @@ -77,8 +79,9 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
1,
separateSiblings != null ? input.getSeparateSiblings() : false,
new HashSet<>(),
startTimeMillis,
endTimeMillis);
new LineageFlags()
.setStartTimeMillis(startTimeMillis, SetMode.REMOVE_IF_NULL)
.setEndTimeMillis(endTimeMillis, SetMode.REMOVE_IF_NULL));

Set<Urn> restrictedUrns = new HashSet<>();
entityLineageResult
Expand All @@ -96,7 +99,7 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
} catch (Exception e) {
log.error("Failed to fetch lineage for {}", finalUrn);
throw new RuntimeException(
String.format("Failed to fetch lineage for {}", finalUrn), e);
String.format("Failed to fetch lineage for %s", finalUrn), e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import com.linkedin.datahub.graphql.generated.ScrollAcrossLineageInput;
import com.linkedin.datahub.graphql.generated.ScrollAcrossLineageResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.common.mappers.LineageFlagsInputMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnScrollAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.query.LineageFlags;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
Expand Down Expand Up @@ -73,10 +75,19 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
String keepAlive = input.getKeepAlive() != null ? input.getKeepAlive() : "5m";

@Nullable
final Long startTimeMillis =
input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
final Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
lineageFlags.setStartTimeMillis(startTimeMillis);
}

if (lineageFlags.getEndTimeMillis() == null && endTimeMillis != null) {
lineageFlags.setEndTimeMillis(endTimeMillis);
}
;

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
Expand Down Expand Up @@ -110,7 +121,8 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
_entityClient.scrollAcrossLineage(
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags),
.withSearchFlags(flags -> searchFlags != null ? searchFlags : flags)
.withLineageFlags(flags -> lineageFlags != null ? lineageFlags : flags),
urn,
resolvedDirection,
entityNames,
Expand All @@ -120,9 +132,7 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
null,
scrollId,
keepAlive,
count,
startTimeMillis,
endTimeMillis));
count));
} catch (RemoteInvocationException e) {
log.error(
"Failed to execute scroll across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageInput;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResults;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.common.mappers.LineageFlagsInputMapper;
import com.linkedin.datahub.graphql.types.common.mappers.SearchFlagsInputMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.LineageFlags;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.LineageSearchResult;
Expand Down Expand Up @@ -106,10 +108,18 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
final Integer maxHops = getMaxHops(facetFilters);

@Nullable
final Long startTimeMillis =
input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
final Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
lineageFlags.setStartTimeMillis(startTimeMillis);
}

if (lineageFlags.getEndTimeMillis() == null && endTimeMillis != null) {
lineageFlags.setEndTimeMillis(endTimeMillis);
}

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
Expand Down Expand Up @@ -140,7 +150,10 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
}
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
context.getOperationContext().withSearchFlags(flags -> searchFlags),
context
.getOperationContext()
.withSearchFlags(flags -> searchFlags)
.withLineageFlags(flags -> lineageFlags),
urn,
resolvedDirection,
entityNames,
Expand All @@ -149,9 +162,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
filter,
null,
start,
count,
startTimeMillis,
endTimeMillis);
count);

return UrnSearchAcrossLineageResultsMapper.map(context, salResults);
} catch (RemoteInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.common.UrnArray;
import com.linkedin.common.UrnArrayMap;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.EntityTypeToPlatforms;
import com.linkedin.datahub.graphql.generated.LineageFlags;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Maps GraphQL LineageFlags input to the Pegasus {@link
 * com.linkedin.metadata.query.LineageFlags} record.
 *
 * <p>To be replaced by auto-generated mappers implementations
 */
public class LineageFlagsInputMapper
    implements ModelMapper<LineageFlags, com.linkedin.metadata.query.LineageFlags> {

  public static final LineageFlagsInputMapper INSTANCE = new LineageFlagsInputMapper();

  /**
   * Convenience entry point delegating to {@link #apply}.
   *
   * @param queryContext caller context (currently unused by the mapping itself)
   * @param lineageFlags GraphQL input; may be null — callers pass the optional {@code
   *     input.getLineageFlags()} field directly
   * @return a Pegasus {@code LineageFlags}, never null (empty when the input is null)
   */
  @Nonnull
  public static com.linkedin.metadata.query.LineageFlags map(
      QueryContext queryContext, @Nullable final LineageFlags lineageFlags) {
    return INSTANCE.apply(queryContext, lineageFlags);
  }

  /**
   * Copies each non-null field of the GraphQL input onto a fresh Pegasus record.
   *
   * @param context caller context (unused)
   * @param lineageFlags GraphQL input; null yields an empty result
   * @return populated Pegasus flags, never null
   */
  @Override
  public com.linkedin.metadata.query.LineageFlags apply(
      QueryContext context, @Nullable final LineageFlags lineageFlags) {
    com.linkedin.metadata.query.LineageFlags result =
        new com.linkedin.metadata.query.LineageFlags();
    if (lineageFlags == null) {
      return result;
    }
    if (lineageFlags.getIgnoreAsHops() != null) {
      result.setIgnoreAsHops(mapIgnoreAsHops(lineageFlags.getIgnoreAsHops()));
    }
    if (lineageFlags.getEndTimeMillis() != null) {
      result.setEndTimeMillis(lineageFlags.getEndTimeMillis());
    }
    if (lineageFlags.getStartTimeMillis() != null) {
      result.setStartTimeMillis(lineageFlags.getStartTimeMillis());
    }
    if (lineageFlags.getEntitiesExploredPerHopLimit() != null) {
      result.setEntitiesExploredPerHopLimit(lineageFlags.getEntitiesExploredPerHopLimit());
    }
    return result;
  }

  /**
   * Converts the GraphQL entity-type/platform pairs into a Pegasus {@link UrnArrayMap} keyed by
   * entity-type name. A null platform list is treated as empty (i.e. "all platforms").
   */
  private static UrnArrayMap mapIgnoreAsHops(List<EntityTypeToPlatforms> ignoreAsHops) {
    UrnArrayMap result = new UrnArrayMap();
    ignoreAsHops.forEach(
        ignoreAsHop ->
            result.put(
                EntityTypeMapper.getName(ignoreAsHop.getEntityType()),
                new UrnArray(
                    Optional.ofNullable(ignoreAsHop.getPlatforms())
                        .orElse(Collections.emptyList())
                        .stream()
                        .map(UrnUtils::getUrn)
                        .collect(Collectors.toList()))));
    return result;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public com.linkedin.datahub.graphql.generated.DataProcessRunEvent apply(
if (runEvent.hasResult()) {
result.setResult(DataProcessInstanceRunResultMapper.map(context, runEvent.getResult()));
}
if (runEvent.hasDurationMillis()) {
result.setDurationMillis(runEvent.getDurationMillis());
}

return result;
}
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -6471,6 +6471,11 @@ type DataProcessRunEvent implements TimeSeriesAspect {
The timestamp associated with the run event in milliseconds
"""
timestampMillis: Long!

"""
The duration of the run in milliseconds
"""
durationMillis: Long
}

"""
Expand Down
51 changes: 49 additions & 2 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,43 @@ input SearchFlags {
includeRestricted: Boolean
}

"""
Flags to control lineage behavior
"""
input LineageFlags {
"""
Limits the number of results explored per hop, still gets all edges each time a hop happens
"""
entitiesExploredPerHopLimit: Int

"""
An optional starting time to filter on
"""
startTimeMillis: Long
"""
An optional ending time to filter on
"""
endTimeMillis: Long

"""
Map of entity types to platforms to ignore when counting hops during graph walk. Note: this can potentially cause
a large amount of additional hops to occur and should be used with caution.
"""
ignoreAsHops: [EntityTypeToPlatforms!]
}

input EntityTypeToPlatforms {
"""
Entity type to ignore as hops; if no platform is specified, this applies to all entities of this type.
"""
entityType: EntityType!

"""
List of platforms to ignore as hops, empty implies all. Must be a valid platform urn
"""
platforms: [String!]
}

"""
Input arguments for a full text search query across entities
"""
Expand Down Expand Up @@ -345,16 +382,21 @@ input SearchAcrossLineageInput {
"""
An optional starting time to filter on
"""
startTimeMillis: Long
startTimeMillis: Long @deprecated(reason: "Use LineageFlags instead")
"""
An optional ending time to filter on
"""
endTimeMillis: Long
endTimeMillis: Long @deprecated(reason: "Use LineageFlags instead")

"""
Flags controlling search options
"""
searchFlags: SearchFlags

"""
Flags controlling the lineage query
"""
lineageFlags: LineageFlags
}

"""
Expand Down Expand Up @@ -415,6 +457,11 @@ input ScrollAcrossLineageInput {
Flags controlling search options
"""
searchFlags: SearchFlags

"""
Flags controlling the lineage query
"""
lineageFlags: LineageFlags
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import com.linkedin.metadata.search.MatchedFieldArray;
import com.linkedin.metadata.search.SearchResultMetadata;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -103,9 +105,10 @@ public void testSearchAcrossLineage() throws Exception {
lineageSearchEntity.setMatchedFields(new MatchedFieldArray());
lineageSearchEntity.setPaths(new UrnArrayArray());
lineageSearchResult.setEntities(new LineageSearchEntityArray(lineageSearchEntity));
ArgumentCaptor<OperationContext> opContext = ArgumentCaptor.forClass(OperationContext.class);

when(_entityClient.searchAcrossLineage(
any(),
opContext.capture(),
eq(UrnUtils.getUrn(SOURCE_URN_STRING)),
eq(com.linkedin.metadata.graph.LineageDirection.DOWNSTREAM),
anyList(),
Expand All @@ -114,14 +117,18 @@ public void testSearchAcrossLineage() throws Exception {
any(),
eq(null),
eq(START),
eq(COUNT),
eq(START_TIMESTAMP_MILLIS),
eq(END_TIMESTAMP_MILLIS)))
eq(COUNT)))
.thenReturn(lineageSearchResult);

final SearchAcrossLineageResults results = _resolver.get(_dataFetchingEnvironment).join();
assertEquals(results.getCount(), 10);
assertEquals(results.getTotal(), 1);
assertEquals(
opContext.getValue().getSearchContext().getLineageFlags().getStartTimeMillis(),
START_TIMESTAMP_MILLIS);
assertEquals(
opContext.getValue().getSearchContext().getLineageFlags().getEndTimeMillis(),
END_TIMESTAMP_MILLIS);

final List<SearchAcrossLineageResult> entities = results.getSearchResults();
assertEquals(entities.size(), 1);
Expand Down
3 changes: 2 additions & 1 deletion datahub-web-react/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module.exports = {
'plugin:vitest/recommended',
'prettier',
],
plugins: ['@typescript-eslint'],
plugins: ['@typescript-eslint', 'react-refresh'],
parserOptions: {
ecmaVersion: 2020, // Allows for the parsing of modern ECMAScript features
sourceType: 'module', // Allows for the use of imports
Expand Down Expand Up @@ -48,6 +48,7 @@ module.exports = {
],
'vitest/prefer-to-be': 'off',
'@typescript-eslint/no-use-before-define': ['error', { functions: false, classes: false }],
'react-refresh/only-export-components': ['warn', { 'allowConstantExport': true }],
},
settings: {
react: {
Expand Down
Loading

0 comments on commit 0f6426d

Please sign in to comment.