Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lineagecounts) Include entities that are filtered out due to sibling logic in the filtered count of lineage counts #8152

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public CompletableFuture<EntityCountResults> get(final DataFetchingEnvironment e
try {
// First, get all counts
Map<String, Long> gmsResult = _entityClient.batchGetTotalEntityCount(
input.getTypes().stream().map(type -> EntityTypeMapper.getName(type)).collect(Collectors.toList()), context.getAuthentication());
input.getTypes().stream().map(EntityTypeMapper::getName).collect(Collectors.toList()), context.getAuthentication());

// bind to a result.
List<EntityCountResult> resultList = gmsResult.entrySet().stream().map(entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,77 +122,88 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi
offset = Math.max(0, offset - nextEntityLineage.getTotal());
count = Math.max(0, count - nextEntityLineage.getCount() - entityLineage.getCount());

entityLineage.setFiltered(getFiltered(entityLineage) + getFiltered(nextEntityLineage));
entityLineage = nextEntityLineage;
};
}

return ValidationUtils.validateEntityLineageResult(entityLineage, _entityService);
}

/**
 * Reads the filtered count off a lineage result in a null-safe way.
 *
 * @param entityLineageResult the lineage result to inspect; may be null
 * @return the result's filtered value, or 0 when the result or its filtered value is null
 */
private int getFiltered(@Nullable EntityLineageResult entityLineageResult) {
  // Either a missing result or a missing filtered value counts as "nothing filtered".
  if (entityLineageResult == null || entityLineageResult.getFiltered() == null) {
    return 0;
  }
  return entityLineageResult.getFiltered();
}

// Takes a lineage result and removes any nodes that are siblings of some other node already in the result,
// adding the number of removed nodes to the returned result's filtered count.
private EntityLineageResult filterLineageResultFromSiblings(
@Nonnull final Urn urn,
@Nonnull final Set<Urn> allSiblingsInGroup,
@Nonnull final EntityLineageResult entityLineageResult,
@Nullable final EntityLineageResult existingResult
) {
int numFiltered = 0;

// 1) remove the source entity's siblings from this entity's downstreams
final List<LineageRelationship> filteredRelationships = entityLineageResult.getRelationships()
.stream()
.filter(lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity())
|| lineageRelationship.getEntity().equals(urn))
.collect(Collectors.toList());
final Map<Boolean, List<LineageRelationship>> partitionedFilteredRelationships = entityLineageResult.getRelationships()
.stream().collect(Collectors.partitioningBy(
lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity())
|| lineageRelationship.getEntity().equals(urn)));
numFiltered += partitionedFilteredRelationships.get(Boolean.FALSE).size();

final List<LineageRelationship> filteredRelationships = partitionedFilteredRelationships.get(Boolean.TRUE);

// 2) filter out existing lineage to avoid duplicates in our combined result
final Set<Urn> existingUrns = existingResult != null
? existingResult.getRelationships().stream().map(LineageRelationship::getEntity).collect(Collectors.toSet())
: new HashSet<>();
List<LineageRelationship> uniqueFilteredRelationships = filteredRelationships.stream().filter(
lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity())).collect(Collectors.toList());

// 3) combine this entity's lineage with the lineage we've already seen and remove duplicates
Map<Boolean, List<LineageRelationship>> partitionedUniqueFilteredRelationships = filteredRelationships.stream().collect(
Collectors.partitioningBy(lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity())));
numFiltered += partitionedUniqueFilteredRelationships.get(Boolean.FALSE).size();

List<LineageRelationship> uniqueFilteredRelationships = partitionedUniqueFilteredRelationships.get(Boolean.TRUE);

// 3) combine this entity's lineage with the lineage we've already seen
final List<LineageRelationship> combinedResults = Stream.concat(
uniqueFilteredRelationships.stream(),
existingResult != null ? existingResult.getRelationships().stream() : ImmutableList.<LineageRelationship>of().stream())
.collect(Collectors.toList());

// 4) fetch the siblings of each lineage result
final Set<Urn> combinedResultUrns = combinedResults.stream().map(result -> result.getEntity()).collect(Collectors.toSet());
final Set<Urn> combinedResultUrns = combinedResults.stream().map(LineageRelationship::getEntity).collect(Collectors.toSet());

final Map<Urn, List<RecordTemplate>> siblingAspects =
_entityService.getLatestAspects(combinedResultUrns, ImmutableSet.of(SIBLINGS_ASPECT_NAME));

// 5) if you are not primary & your sibling is in the results, filter yourself out of the return set
uniqueFilteredRelationships = combinedResults.stream().filter(result -> {
Map<Boolean, List<LineageRelationship>> partitionedFilteredSiblings = combinedResults.stream().collect(Collectors.partitioningBy(result -> {
Optional<RecordTemplate> optionalSiblingsAspect = siblingAspects.get(result.getEntity()).stream().filter(
aspect -> aspect instanceof Siblings
).findAny();

if (!optionalSiblingsAspect.isPresent()) {
if (optionalSiblingsAspect.isEmpty()) {
return true;
}


final Siblings siblingsAspect = (Siblings) optionalSiblingsAspect.get();

if (siblingsAspect.isPrimary()) {
return true;
}

// if you are not primary and your sibling exists in the result set, filter yourself out
if (siblingsAspect.getSiblings().stream().anyMatch(
sibling -> combinedResultUrns.contains(sibling)
)) {
return false;
}

return true;
}).collect(Collectors.toList());

entityLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships));
entityLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0));
entityLineageResult.setCount(uniqueFilteredRelationships.size());
return ValidationUtils.validateEntityLineageResult(entityLineageResult, _entityService);
return siblingsAspect.getSiblings().stream().noneMatch(combinedResultUrns::contains);
}));

numFiltered += partitionedFilteredSiblings.get(Boolean.FALSE).size();
uniqueFilteredRelationships = partitionedFilteredSiblings.get(Boolean.TRUE);

EntityLineageResult combinedLineageResult = new EntityLineageResult();
combinedLineageResult.setStart(entityLineageResult.getStart());
combinedLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships));
combinedLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0));
combinedLineageResult.setCount(uniqueFilteredRelationships.size());
combinedLineageResult.setFiltered(numFiltered + getFiltered(existingResult) + getFiltered(entityLineageResult));
return ValidationUtils.validateEntityLineageResult(combinedLineageResult, _entityService);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public static EntityLineageResult validateEntityLineageResult(@Nullable final En
.collect(Collectors.toCollection(LineageRelationshipArray::new));

validatedEntityLineageResult.setFiltered(
entityLineageResult.getRelationships().size() - validatedRelationships.size());
(entityLineageResult.hasFiltered() && entityLineageResult.getFiltered() != null ? entityLineageResult.getFiltered() : 0)
+ entityLineageResult.getRelationships().size() - validatedRelationships.size());
validatedEntityLineageResult.setRelationships(validatedRelationships);

return validatedEntityLineageResult;
Expand Down
Loading