diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java index 9fba4b8ca7712..d0874b21fb106 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/group/EntityCountsResolver.java @@ -38,7 +38,7 @@ public CompletableFuture get(final DataFetchingEnvironment e try { // First, get all counts Map gmsResult = _entityClient.batchGetTotalEntityCount( - input.getTypes().stream().map(type -> EntityTypeMapper.getName(type)).collect(Collectors.toList()), context.getAuthentication()); + input.getTypes().stream().map(EntityTypeMapper::getName).collect(Collectors.toList()), context.getAuthentication()); // bind to a result. List resultList = gmsResult.entrySet().stream().map(entry -> { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java index 8457ad478e243..f3ba33a5f4081 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java @@ -122,6 +122,7 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi offset = Math.max(0, offset - nextEntityLineage.getTotal()); count = Math.max(0, count - nextEntityLineage.getCount() - entityLineage.getCount()); + entityLineage.setFiltered(getFiltered(entityLineage) + getFiltered(nextEntityLineage)); entityLineage = nextEntityLineage; }; } @@ -129,6 +130,10 @@ public EntityLineageResult getLineage(@Nonnull Urn entityUrn, @Nonnull LineageDi return ValidationUtils.validateEntityLineageResult(entityLineage, _entityService); } + private int getFiltered(@Nullable EntityLineageResult entityLineageResult) { + return (entityLineageResult != null && entityLineageResult.getFiltered() != null ? entityLineageResult.getFiltered() : 0); + } + // takes a lineage result and removes any nodes that are siblings of some other node already in the result private EntityLineageResult filterLineageResultFromSiblings( @Nonnull final Urn urn, @@ -136,43 +141,50 @@ private EntityLineageResult filterLineageResultFromSiblings( @Nonnull final EntityLineageResult entityLineageResult, @Nullable final EntityLineageResult existingResult ) { + int numFiltered = 0; + // 1) remove the source entities siblings from this entity's downstreams - final List filteredRelationships = entityLineageResult.getRelationships() - .stream() - .filter(lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity()) - || lineageRelationship.getEntity().equals(urn)) - .collect(Collectors.toList()); + final Map> partitionedFilteredRelationships = entityLineageResult.getRelationships() + .stream().collect(Collectors.partitioningBy( + lineageRelationship -> !allSiblingsInGroup.contains(lineageRelationship.getEntity()) + || lineageRelationship.getEntity().equals(urn))); + numFiltered += partitionedFilteredRelationships.get(Boolean.FALSE).size(); + + final List filteredRelationships = partitionedFilteredRelationships.get(Boolean.TRUE); // 2) filter out existing lineage to avoid duplicates in our combined result final Set existingUrns = existingResult != null ? existingResult.getRelationships().stream().map(LineageRelationship::getEntity).collect(Collectors.toSet()) : new HashSet<>(); - List uniqueFilteredRelationships = filteredRelationships.stream().filter( - lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity())).collect(Collectors.toList()); - // 3) combine this entity's lineage with the lineage we've already seen and remove duplicates + Map> partitionedUniqueFilteredRelationships = filteredRelationships.stream().collect( + Collectors.partitioningBy(lineageRelationship -> !existingUrns.contains(lineageRelationship.getEntity()))); + numFiltered += partitionedUniqueFilteredRelationships.get(Boolean.FALSE).size(); + + List uniqueFilteredRelationships = partitionedUniqueFilteredRelationships.get(Boolean.TRUE); + + // 3) combine this entity's lineage with the lineage we've already seen final List combinedResults = Stream.concat( uniqueFilteredRelationships.stream(), existingResult != null ? existingResult.getRelationships().stream() : ImmutableList.of().stream()) .collect(Collectors.toList()); // 4) fetch the siblings of each lineage result - final Set combinedResultUrns = combinedResults.stream().map(result -> result.getEntity()).collect(Collectors.toSet()); + final Set combinedResultUrns = combinedResults.stream().map(LineageRelationship::getEntity).collect(Collectors.toSet()); final Map> siblingAspects = _entityService.getLatestAspects(combinedResultUrns, ImmutableSet.of(SIBLINGS_ASPECT_NAME)); // 5) if you are not primary & your sibling is in the results, filter yourself out of the return set - uniqueFilteredRelationships = combinedResults.stream().filter(result -> { + Map> partitionedFilteredSiblings = combinedResults.stream().collect(Collectors.partitioningBy(result -> { Optional optionalSiblingsAspect = siblingAspects.get(result.getEntity()).stream().filter( aspect -> aspect instanceof Siblings ).findAny(); - if (!optionalSiblingsAspect.isPresent()) { + if (optionalSiblingsAspect.isEmpty()) { return true; } - final Siblings siblingsAspect = (Siblings) optionalSiblingsAspect.get(); if (siblingsAspect.isPrimary()) { @@ -180,19 +192,18 @@ private EntityLineageResult filterLineageResultFromSiblings( } // if you are not primary and your sibling exists in the result set, filter yourself out - if (siblingsAspect.getSiblings().stream().anyMatch( - sibling -> combinedResultUrns.contains(sibling) - )) { - return false; - } - - return true; - }).collect(Collectors.toList()); - - entityLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships)); - entityLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0)); - entityLineageResult.setCount(uniqueFilteredRelationships.size()); - return ValidationUtils.validateEntityLineageResult(entityLineageResult, _entityService); + return siblingsAspect.getSiblings().stream().noneMatch(combinedResultUrns::contains); + })); + + numFiltered += partitionedFilteredSiblings.get(Boolean.FALSE).size(); + uniqueFilteredRelationships = partitionedFilteredSiblings.get(Boolean.TRUE); + + EntityLineageResult combinedLineageResult = new EntityLineageResult(); + combinedLineageResult.setStart(entityLineageResult.getStart()); + combinedLineageResult.setRelationships(new LineageRelationshipArray(uniqueFilteredRelationships)); + combinedLineageResult.setTotal(entityLineageResult.getTotal() + (existingResult != null ? existingResult.getTotal() : 0)); + combinedLineageResult.setCount(uniqueFilteredRelationships.size()); + combinedLineageResult.setFiltered(numFiltered + getFiltered(existingResult) + getFiltered(entityLineageResult)); + return ValidationUtils.validateEntityLineageResult(combinedLineageResult, _entityService); } - } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/shared/ValidationUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/shared/ValidationUtils.java index 92e68d805d664..928c70a7b3de1 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/shared/ValidationUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/shared/ValidationUtils.java @@ -150,7 +150,8 @@ public static EntityLineageResult validateEntityLineageResult(@Nullable final En .collect(Collectors.toCollection(LineageRelationshipArray::new)); validatedEntityLineageResult.setFiltered( - entityLineageResult.getRelationships().size() - validatedRelationships.size()); + (entityLineageResult.hasFiltered() && entityLineageResult.getFiltered() != null ? entityLineageResult.getFiltered() : 0) + + entityLineageResult.getRelationships().size() - validatedRelationships.size()); validatedEntityLineageResult.setRelationships(validatedRelationships); return validatedEntityLineageResult; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/sibling/SiblingGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/sibling/SiblingGraphServiceTest.java index 388763789fb8f..c6677c171b30e 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/sibling/SiblingGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/sibling/SiblingGraphServiceTest.java @@ -14,8 +14,11 @@ import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.SiblingGraphService; import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nonnull; import org.mockito.Mockito; import org.testng.annotations.BeforeClass; @@ -54,6 +57,7 @@ public class SiblingGraphServiceTest { * Some test relationships. */ protected static String downstreamOf = "DownstreamOf"; + protected static String upstreamOf = "UpstreamOf"; private GraphService _graphService; private SiblingGraphService _client; @@ -263,7 +267,7 @@ public void testSiblingInResult() throws Exception { EntityLineageResult expectedResult = mockResult.clone(); expectedResult.setTotal(3); expectedResult.setCount(2); - expectedResult.setFiltered(0); + expectedResult.setFiltered(1); expectedResult.setRelationships(new LineageRelationshipArray(relationship1, relationship2)); EntityLineageResult upstreamLineage = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 100, 1); @@ -311,8 +315,8 @@ public void testCombineSiblingResult() { expectedResult.setCount(3); expectedResult.setStart(0); - expectedResult.setTotal(3); - expectedResult.setFiltered(0); + expectedResult.setTotal(4); + expectedResult.setFiltered(1); expectedResult.setRelationships(expectedRelationships); mockResult.setStart(0); @@ -324,18 +328,19 @@ public void testCombineSiblingResult() { siblingRelationships.add(relationship4); siblingRelationships.add(relationship1); // duplicate from sibling's lineage, we should not see duplicates in result siblingMockResult.setStart(0); - siblingMockResult.setTotal(2); + siblingMockResult.setTotal(3); siblingMockResult.setCount(2); siblingMockResult.setRelationships(siblingRelationships); when(_graphService.getLineage( - datasetThreeUrn, LineageDirection.UPSTREAM, 0, 99, 1, null, null - )).thenReturn(siblingMockResult); - + Mockito.eq(datasetThreeUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> siblingMockResult.clone()); when(_graphService.getLineage( - datasetFourUrn, LineageDirection.UPSTREAM, 0, 100, 1, null, null - )).thenReturn(mockResult); + Mockito.eq(datasetFourUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockResult.clone()); Siblings siblingInSearchResult = new Siblings(); siblingInSearchResult.setPrimary(true); @@ -397,20 +402,20 @@ public void testUpstreamOfSiblings() { relationship3.setType(downstreamOf); relationship3.setEntity(datasetThreeUrn); - LineageRelationship relationship4 = new LineageRelationship(); - relationship4.setDegree(0); - relationship4.setType(downstreamOf); - relationship4.setEntity(datasetFiveUrn); + LineageRelationship relationship5 = new LineageRelationship(); + relationship5.setDegree(0); + relationship5.setType(downstreamOf); + relationship5.setEntity(datasetFiveUrn); relationships.add(relationship1); - expectedRelationships.add(relationship4); + expectedRelationships.add(relationship5); expectedRelationships.add(relationship1); expectedResult.setCount(2); expectedResult.setStart(0); expectedResult.setTotal(3); - expectedResult.setFiltered(0); + expectedResult.setFiltered(1); expectedResult.setRelationships(expectedRelationships); mockResult.setStart(0); @@ -419,7 +424,7 @@ public void testUpstreamOfSiblings() { mockResult.setRelationships(relationships); siblingRelationships.add(relationship2); - siblingRelationships.add(relationship4); + siblingRelationships.add(relationship5); siblingMockResult.setStart(0); siblingMockResult.setTotal(2); siblingMockResult.setCount(2); @@ -450,7 +455,11 @@ public void testUpstreamOfSiblings() { Siblings dataset3Siblings = new Siblings(); dataset3Siblings.setPrimary(false); - dataset3Siblings.setSiblings(new UrnArray(ImmutableList.of())); + dataset3Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetFourUrn))); + + Siblings dataset4Siblings = new Siblings(); + dataset4Siblings.setPrimary(true); + dataset4Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetThreeUrn))); Siblings dataset5Siblings = new Siblings(); dataset5Siblings.setPrimary(true); @@ -460,6 +469,7 @@ public void testUpstreamOfSiblings() { datasetOneUrn, ImmutableList.of(dataset1Siblings), datasetTwoUrn, ImmutableList.of(dataset2Siblings), datasetThreeUrn, ImmutableList.of(dataset3Siblings), + datasetFourUrn, ImmutableList.of(dataset4Siblings), datasetFiveUrn, ImmutableList.of(dataset5Siblings) ); @@ -471,6 +481,32 @@ public void testUpstreamOfSiblings() { // assert your lineage will not contain two siblings assertEquals(upstreamLineage, expectedResult); + + when(_graphService.getLineage( + datasetThreeUrn, LineageDirection.UPSTREAM, 0, 100, 1, null, null + )).thenReturn(siblingMockResult); + + + when(_graphService.getLineage( + datasetFourUrn, LineageDirection.UPSTREAM, 0, 99, 1, null, null + )).thenReturn(mockResult); + + siblingInSearchResult = new Siblings(); + siblingInSearchResult.setPrimary(false); + siblingInSearchResult.setSiblings(new UrnArray(ImmutableList.of(datasetFourUrn))); + + when(_mockEntityService.getLatestAspect(datasetThreeUrn, SIBLINGS_ASPECT_NAME)).thenReturn(siblingInSearchResult); + + upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 100, 1); + + LineageRelationshipArray siblingExpectedRelationships = new LineageRelationshipArray(); + siblingExpectedRelationships.add(relationship1); + siblingExpectedRelationships.add(relationship5); + + expectedResult.setRelationships(siblingExpectedRelationships); + + // assert your lineage will not contain two siblings + assertEquals(upstreamLineage, expectedResult); } // we should be combining lineage of siblings of siblings @@ -560,11 +596,403 @@ public void testUpstreamOfSiblingSiblings() { SiblingGraphService service = _client; - EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 100, 1); + for (Urn urn : List.of(datasetOneUrn, datasetTwoUrn, datasetThreeUrn)) { + EntityLineageResult upstreamLineage = service.getLineage(datasetOneUrn, LineageDirection.UPSTREAM, 0, 100, 1); + assertEquals(upstreamLineage, expectedResult); + } + } + + @Test + public void testRelationshipWithSibling() throws CloneNotSupportedException { + EntityLineageResult mockResult = new EntityLineageResult(); + EntityLineageResult siblingMockResult = new EntityLineageResult(); + EntityLineageResult expectedResult = new EntityLineageResult(); + + LineageRelationshipArray relationships = new LineageRelationshipArray(); + LineageRelationshipArray siblingRelationships = new LineageRelationshipArray(); + LineageRelationshipArray expectedRelationships = new LineageRelationshipArray(); + + LineageRelationship relationship1 = new LineageRelationship(); + relationship1.setDegree(0); + relationship1.setType(downstreamOf); + relationship1.setEntity(datasetOneUrn); + + LineageRelationship relationship2 = new LineageRelationship(); + relationship2.setDegree(0); + relationship2.setType(downstreamOf); + relationship2.setEntity(datasetTwoUrn); + + LineageRelationship relationship3 = new LineageRelationship(); + relationship3.setDegree(0); + relationship3.setType(downstreamOf); + relationship3.setEntity(datasetThreeUrn); + + LineageRelationship relationship5 = new LineageRelationship(); + relationship5.setDegree(0); + relationship5.setType(downstreamOf); + relationship5.setEntity(datasetFiveUrn); + + relationships.add(relationship1); + // relationship between entity and its sibling + relationships.add(relationship3); + + expectedRelationships.add(relationship5); + expectedRelationships.add(relationship1); + + expectedResult.setCount(2); + expectedResult.setStart(0); + expectedResult.setTotal(4); + expectedResult.setFiltered(2); + expectedResult.setRelationships(expectedRelationships); + + mockResult.setStart(0); + mockResult.setTotal(2); + mockResult.setCount(2); + mockResult.setRelationships(relationships); + + siblingRelationships.add(relationship2); + siblingRelationships.add(relationship5); + siblingMockResult.setStart(0); + siblingMockResult.setTotal(2); + siblingMockResult.setCount(2); + siblingMockResult.setRelationships(siblingRelationships); + + when(_graphService.getLineage( + Mockito.eq(datasetThreeUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> siblingMockResult.clone()); + + when(_graphService.getLineage( + Mockito.eq(datasetFourUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockResult.clone()); + + Siblings primarySibling = new Siblings(); + primarySibling.setPrimary(true); + primarySibling.setSiblings(new UrnArray(ImmutableList.of(datasetThreeUrn))); + + when(_mockEntityService.getLatestAspect(datasetFourUrn, SIBLINGS_ASPECT_NAME)).thenReturn(primarySibling); + + Siblings siblingInSearchResult = new Siblings(); + siblingInSearchResult.setPrimary(false); + siblingInSearchResult.setSiblings(new UrnArray(ImmutableList.of(datasetFourUrn))); + + when(_mockEntityService.getLatestAspect(datasetThreeUrn, SIBLINGS_ASPECT_NAME)).thenReturn(siblingInSearchResult); + + Siblings dataset1Siblings = new Siblings(); + dataset1Siblings.setPrimary(false); + dataset1Siblings.setSiblings(new UrnArray(ImmutableList.of())); + + Siblings dataset2Siblings = new Siblings(); + dataset2Siblings.setPrimary(false); + dataset2Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetFiveUrn))); + + Siblings dataset3Siblings = new Siblings(); + dataset3Siblings.setPrimary(false); + dataset3Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetFourUrn))); + + Siblings dataset4Siblings = new Siblings(); + dataset4Siblings.setPrimary(true); + dataset4Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetThreeUrn))); + + Siblings dataset5Siblings = new Siblings(); + dataset5Siblings.setPrimary(true); + dataset5Siblings.setSiblings(new UrnArray(ImmutableList.of(datasetTwoUrn))); + + Map> siblingsMap = ImmutableMap.of( + datasetOneUrn, ImmutableList.of(dataset1Siblings), + datasetTwoUrn, ImmutableList.of(dataset2Siblings), + datasetThreeUrn, ImmutableList.of(dataset3Siblings), + datasetFourUrn, ImmutableList.of(dataset4Siblings), + datasetFiveUrn, ImmutableList.of(dataset5Siblings) + ); + + when(_mockEntityService.getLatestAspects(any(), any())).thenReturn(siblingsMap); + + SiblingGraphService service = _client; + + EntityLineageResult upstreamLineage = service.getLineage(datasetFourUrn, LineageDirection.UPSTREAM, 0, 100, 1); + + // assert your lineage will not contain two siblings + assertEquals(upstreamLineage, expectedResult); + + // Now test for starting from the other sibling + + upstreamLineage = service.getLineage(datasetThreeUrn, LineageDirection.UPSTREAM, 0, 100, 1); + + LineageRelationshipArray siblingExpectedRelationships = new LineageRelationshipArray(); + siblingExpectedRelationships.add(relationship1); + siblingExpectedRelationships.add(relationship5); + + expectedResult.setRelationships(siblingExpectedRelationships); + + // assert your lineage will not contain two siblings assertEquals(upstreamLineage, expectedResult); } + @Test + public void testSiblingCombinations() throws URISyntaxException { + Urn primarySiblingUrn = Urn.createFromString("urn:li:" + datasetType + ":(urn:li:dataPlatform:dbt,PrimarySibling,PROD)"); + Urn alternateSiblingUrn = Urn.createFromString("urn:li:" + datasetType + ":(urn:li:dataPlatform:snowflake,SecondarySibling,PROD)"); + + Urn upstreamUrn1 = Urn.createFromString("urn:li:" + datasetType + ":(urn:li:dataPlatform:snowflake,Upstream1,PROD)"); + Urn upstreamUrn2 = Urn.createFromString("urn:li:" + datasetType + ":(urn:li:dataPlatform:snowflake,Upstream2,PROD)"); + + LineageRelationshipArray alternateDownstreamRelationships = new LineageRelationshipArray(); + // Populate sibling service + Siblings primarySiblings = new Siblings(); + primarySiblings.setPrimary(true); + primarySiblings.setSiblings(new UrnArray(ImmutableList.of(alternateSiblingUrn))); + + when(_mockEntityService.getLatestAspect(primarySiblingUrn, SIBLINGS_ASPECT_NAME)).thenReturn(primarySiblings); + + Siblings secondarySiblings = new Siblings(); + secondarySiblings.setPrimary(false); + secondarySiblings.setSiblings(new UrnArray(ImmutableList.of(primarySiblingUrn))); + + when(_mockEntityService.getLatestAspect(alternateSiblingUrn, SIBLINGS_ASPECT_NAME)).thenReturn(secondarySiblings); + + Map> siblingsMap = new HashMap<>(); + siblingsMap.put(primarySiblingUrn, ImmutableList.of(primarySiblings)); + siblingsMap.put(alternateSiblingUrn, ImmutableList.of(secondarySiblings)); + + // Create many downstreams of the alternate URN string + final int numDownstreams = 42; + for (int i = 0; i < numDownstreams; i++) { + Urn downstreamUrn = Urn.createFromString("urn:li:" + datasetType + ":(urn:li:dataPlatform:snowflake,Downstream" + i + ",PROD)"); + LineageRelationship relationship = new LineageRelationship(); + relationship.setDegree(0); + relationship.setType(upstreamOf); + relationship.setEntity(downstreamUrn); + alternateDownstreamRelationships.add(relationship); + siblingsMap.put(downstreamUrn, ImmutableList.of()); + } + + LineageRelationshipArray alternateUpstreamRelationships = new LineageRelationshipArray(); + for (Urn upstreamUrn : List.of(upstreamUrn1, upstreamUrn2, primarySiblingUrn)) { + LineageRelationship relationship = new LineageRelationship(); + relationship.setDegree(0); + relationship.setType(downstreamOf); + relationship.setEntity(upstreamUrn); + alternateUpstreamRelationships.add(relationship); + } + + EntityLineageResult mockAlternateUpstreamResult = new EntityLineageResult(); + mockAlternateUpstreamResult.setRelationships(alternateUpstreamRelationships); + mockAlternateUpstreamResult.setStart(0); + mockAlternateUpstreamResult.setTotal(3); + mockAlternateUpstreamResult.setCount(3); + + when(_graphService.getLineage( + Mockito.eq(alternateSiblingUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockAlternateUpstreamResult.clone()); + + EntityLineageResult mockAlternateDownstreamResult = new EntityLineageResult(); + mockAlternateDownstreamResult.setRelationships(alternateDownstreamRelationships); + mockAlternateDownstreamResult.setStart(0); + mockAlternateDownstreamResult.setTotal(numDownstreams); + mockAlternateDownstreamResult.setCount(numDownstreams); + + when(_graphService.getLineage( + Mockito.eq(alternateSiblingUrn), Mockito.eq(LineageDirection.DOWNSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockAlternateDownstreamResult.clone()); + + // Set up mocks for primary sibling + LineageRelationshipArray primaryUpstreamRelationships = new LineageRelationshipArray(); + for (Urn upstreamUrn : List.of(upstreamUrn1, upstreamUrn2)) { + LineageRelationship relationship = new LineageRelationship(); + relationship.setDegree(0); + relationship.setType(downstreamOf); + relationship.setEntity(upstreamUrn); + primaryUpstreamRelationships.add(relationship); + siblingsMap.put(upstreamUrn, ImmutableList.of()); + } + + EntityLineageResult mockPrimaryUpstreamResult = new EntityLineageResult(); + mockPrimaryUpstreamResult.setRelationships(primaryUpstreamRelationships); + mockPrimaryUpstreamResult.setStart(0); + mockPrimaryUpstreamResult.setTotal(2); + mockPrimaryUpstreamResult.setCount(2); + + when(_graphService.getLineage( + Mockito.eq(primarySiblingUrn), Mockito.eq(LineageDirection.UPSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockPrimaryUpstreamResult.clone()); + + LineageRelationshipArray primaryDowntreamRelationships = new LineageRelationshipArray(); + LineageRelationship relationship = new LineageRelationship(); + relationship.setDegree(0); + relationship.setType(upstreamOf); + relationship.setEntity(alternateSiblingUrn); + primaryDowntreamRelationships.add(relationship); + + EntityLineageResult mockPrimaryDownstreamResult = new EntityLineageResult(); + mockPrimaryDownstreamResult.setRelationships(primaryDowntreamRelationships); + mockPrimaryDownstreamResult.setStart(0); + mockPrimaryDownstreamResult.setTotal(1); + mockPrimaryDownstreamResult.setCount(1); + + when(_graphService.getLineage( + Mockito.eq(primarySiblingUrn), Mockito.eq(LineageDirection.DOWNSTREAM), Mockito.anyInt(), Mockito.anyInt(), + Mockito.eq(1), Mockito.eq(null), Mockito.eq(null) + )).then(invocation -> mockPrimaryDownstreamResult.clone()); + + + when(_mockEntityService.getLatestAspects(any(), any())).thenReturn(siblingsMap); + + SiblingGraphService service = _client; + + // Tests for separateSiblings = true: primary sibling + EntityLineageResult primaryDownstreamSeparated = service.getLineage( + primarySiblingUrn, + LineageDirection.DOWNSTREAM, + 0, + 100, + 1, + true, + Set.of(), + null, + null); + + LineageRelationshipArray expectedRelationships = new LineageRelationshipArray(); + expectedRelationships.add(relationship); + + EntityLineageResult expectedResultPrimarySeparated = new EntityLineageResult(); + expectedResultPrimarySeparated.setCount(1); + expectedResultPrimarySeparated.setStart(0); + expectedResultPrimarySeparated.setTotal(1); + expectedResultPrimarySeparated.setFiltered(0); + expectedResultPrimarySeparated.setRelationships(expectedRelationships); + + assertEquals(primaryDownstreamSeparated, expectedResultPrimarySeparated); + + EntityLineageResult primaryUpstreamSeparated = service.getLineage( + primarySiblingUrn, + LineageDirection.UPSTREAM, + 0, + 100, + 1, + true, + Set.of(), + null, + null); + EntityLineageResult expectedResultPrimaryUpstreamSeparated = new EntityLineageResult(); + expectedResultPrimaryUpstreamSeparated.setCount(2); + expectedResultPrimaryUpstreamSeparated.setStart(0); + expectedResultPrimaryUpstreamSeparated.setTotal(2); + expectedResultPrimaryUpstreamSeparated.setFiltered(0); + expectedResultPrimaryUpstreamSeparated.setRelationships(primaryUpstreamRelationships); + + assertEquals(primaryUpstreamSeparated, expectedResultPrimaryUpstreamSeparated); + + // Test for separateSiblings = true, secondary sibling + EntityLineageResult secondarySiblingSeparated = service.getLineage( + alternateSiblingUrn, + LineageDirection.DOWNSTREAM, + 0, + 100, + 1, + true, + Set.of(), + null, + null); + + EntityLineageResult expectedResultSecondarySeparated = new EntityLineageResult(); + expectedResultSecondarySeparated.setCount(numDownstreams); + expectedResultSecondarySeparated.setStart(0); + expectedResultSecondarySeparated.setTotal(42); + expectedResultSecondarySeparated.setFiltered(0); + expectedResultSecondarySeparated.setRelationships(alternateDownstreamRelationships); + + assertEquals(secondarySiblingSeparated, expectedResultSecondarySeparated); + + EntityLineageResult secondaryUpstreamSeparated = service.getLineage( + alternateSiblingUrn, + LineageDirection.UPSTREAM, + 0, + 100, + 1, + true, + Set.of(), + null, + null); + EntityLineageResult expectedResultSecondaryUpstreamSeparated = new EntityLineageResult(); + expectedResultSecondaryUpstreamSeparated.setCount(3); + expectedResultSecondaryUpstreamSeparated.setStart(0); + expectedResultSecondaryUpstreamSeparated.setTotal(3); + expectedResultSecondaryUpstreamSeparated.setFiltered(0); + expectedResultSecondaryUpstreamSeparated.setRelationships(alternateUpstreamRelationships); + + assertEquals(secondaryUpstreamSeparated, expectedResultSecondaryUpstreamSeparated); + + // Test for separateSiblings = false, primary sibling + EntityLineageResult primarySiblingNonSeparated = service.getLineage( + primarySiblingUrn, + LineageDirection.DOWNSTREAM, + 0, + 100, + 1, + false, + new HashSet<>(), + null, + null); + EntityLineageResult expectedResultPrimaryNonSeparated = new EntityLineageResult(); + expectedResultPrimaryNonSeparated.setCount(numDownstreams); + expectedResultPrimaryNonSeparated.setStart(0); + expectedResultPrimaryNonSeparated.setTotal(43); + expectedResultPrimaryNonSeparated.setFiltered(1); + expectedResultPrimaryNonSeparated.setRelationships(alternateDownstreamRelationships); + assertEquals(primarySiblingNonSeparated, expectedResultPrimaryNonSeparated); + + EntityLineageResult primarySiblingNonSeparatedUpstream = service.getLineage( + primarySiblingUrn, + LineageDirection.UPSTREAM, + 0, + 100, + 1, + false, + new HashSet<>(), + null, + null + ); + EntityLineageResult expectedResultPrimaryUpstreamNonSeparated = new EntityLineageResult(); + expectedResultPrimaryUpstreamNonSeparated.setCount(2); + expectedResultPrimaryUpstreamNonSeparated.setStart(0); + expectedResultPrimaryUpstreamNonSeparated.setTotal(5); + expectedResultPrimaryUpstreamNonSeparated.setFiltered(3); + expectedResultPrimaryUpstreamNonSeparated.setRelationships(primaryUpstreamRelationships); + assertEquals(primarySiblingNonSeparatedUpstream, expectedResultPrimaryUpstreamNonSeparated); + + // Test for separateSiblings = false, secondary sibling + EntityLineageResult secondarySiblingNonSeparated = service.getLineage( + alternateSiblingUrn, + LineageDirection.DOWNSTREAM, + 0, + 100, + 1, + false, + new HashSet<>(), + null, + null); + assertEquals(secondarySiblingNonSeparated, expectedResultPrimaryNonSeparated); + + EntityLineageResult secondarySiblingNonSeparatedUpstream = service.getLineage( + alternateSiblingUrn, + LineageDirection.UPSTREAM, + 0, + 100, + 1, + false, + new HashSet<>(), + null, + null + ); + assertEquals(secondarySiblingNonSeparatedUpstream, expectedResultPrimaryUpstreamNonSeparated); + } + static Urn createFromString(@Nonnull String rawUrn) { try { return Urn.createFromString(rawUrn);