From 86928f05f92d36bc9657b33367f3463bf107838c Mon Sep 17 00:00:00 2001 From: Julian Howarth <3205453+julianhowarth@users.noreply.github.com> Date: Mon, 4 Nov 2024 08:52:26 +0000 Subject: [PATCH 1/3] feat: Add initial implementation and tests for distinct by key --- .../smallrye/mutiny/groups/MultiSelect.java | 30 ++- .../operators/multi/MultiDistinctByKeyOp.java | 83 +++++++++ .../mutiny/operators/MultiDistinctTest.java | 175 +++++++++++++++++- .../MultiSelectDistinctByKeyTckTest.java | 94 ++++++++++ 4 files changed, 378 insertions(+), 4 deletions(-) create mode 100644 implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctByKeyOp.java create mode 100644 reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java index 26252e2fa..a1eac8590 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java @@ -212,7 +212,7 @@ public Multi when(Function> predicate) { /** * Selects all the distinct items from the upstream. - * This methods uses {@link Object#hashCode()} to compare items. + * This method uses {@link Object#hashCode()} to compare items. *

* Do NOT call this method on unbounded upstream, as it would lead to an {@link OutOfMemoryError}. *

@@ -222,6 +222,7 @@ public Multi when(Function> predicate) { * @return the resulting {@link Multi}. * @see MultiSkip#repetitions() * @see #distinct(Comparator) + * @see #distinct(Function) */ @CheckReturnValue public Multi distinct() { @@ -230,7 +231,7 @@ public Multi distinct() { /** * Selects all the distinct items from the upstream. - * This methods uses the given comparator to compare the items. + * This method uses the given comparator to compare the items. *

* Do NOT call this method on unbounded upstream, as it would lead to an {@link OutOfMemoryError}. *

@@ -241,7 +242,7 @@ public Multi distinct() { * {@link java.util.TreeSet} initialized with the given comparator. If the comparator is {@code null}, it uses a * {@link java.util.HashSet} as backend. * - * @param comparator the comparator used to compare items. If {@code null}, it will uses the item's {@code hashCode} + * @param comparator the comparator used to compare items. If {@code null}, it will use the item's {@code hashCode} * method. * @return the resulting {@link Multi}. * @see MultiSkip#repetitions() @@ -251,4 +252,27 @@ public Multi distinct(Comparator comparator) { return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream, comparator)); } + /** + * Selects all the distinct items from the upstream. + * This method uses the given key extractor to extract an object from each item which is then + * used to compare the items. This method allows for a smaller memory footprint than {@link #distinct()} + * and {@link #distinct(Comparator)} as only the extracted keys are held in memory rather than the items + * themselves. + *

+ * Do NOT call this method on unbounded upstream, as it would lead to an {@link OutOfMemoryError}. + *

+ * If the comparison throws an exception, the produced {@link Multi} fails. + * The produced {@link Multi} completes when the upstream sends the completion event. + * + * @param keyExtractor the function used to extract keys from items, must not be null, must not produce null. + * @return the resulting {@link Multi}. + * @see MultiSkip#repetitions() + */ + @CheckReturnValue + public Multi distinct(Function keyExtractor) { + return Infrastructure.onMultiCreation( + new MultiDistinctByKeyOp<>( + upstream, Infrastructure.decorate(nonNull(keyExtractor, "keyExtractor")))); + } + } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctByKeyOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctByKeyOp.java new file mode 100644 index 000000000..21510cf04 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctByKeyOp.java @@ -0,0 +1,83 @@ +package io.smallrye.mutiny.operators.multi; + +import java.util.*; +import java.util.function.Function; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Eliminates the duplicated items from the upstream. + * + * @param the type of items + * @param the type of key, used to identify duplicates + */ +public final class MultiDistinctByKeyOp extends AbstractMultiOperator { + + private final Function keyExtractor; + + public MultiDistinctByKeyOp(Multi upstream, Function keyExtractor) { + super(upstream); + this.keyExtractor = keyExtractor; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + upstream.subscribe( + new DistinctByKeyProcessor<>(ParameterValidation.nonNullNpe(subscriber, "subscriber"), keyExtractor)); + } + + static final class DistinctByKeyProcessor extends MultiOperatorProcessor { + + private final Collection foundKeys = new HashSet<>(); + private final Function keyExtractor; + + DistinctByKeyProcessor(MultiSubscriber downstream, + Function keyExtractor) { + super(downstream); + this.keyExtractor = keyExtractor; + } + + @Override + public void onItem(T t) { + if (isDone()) { + return; + } + + boolean added; + try { + added = foundKeys.add(keyExtractor.apply(t)); + } catch (Throwable e) { + // catch exception thrown by the equals / comparator + failAndCancel(e); + return; + } + + if (added) { + downstream.onItem(t); + } else { + request(1); + } + } + + @Override + public void onFailure(Throwable t) { + super.onFailure(t); + foundKeys.clear(); + } + + @Override + public void onCompletion() { + super.onCompletion(); + foundKeys.clear(); + } + + @Override + public void cancel() { + super.cancel(); + foundKeys.clear(); + } + } + +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java index 9a05cdeb3..f2bdafdf8 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java @@ -1,17 +1,21 @@ package io.smallrye.mutiny.operators; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import java.io.IOException; import java.time.Duration; +import java.util.Comparator; import java.util.Objects; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.ResourceAccessMode; @@ -49,7 +53,7 @@ public void testDistinctWithComparator() { @Test public void testDistinctWithNullComparator() { Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) - .select().distinct(null) + .select().distinct((Comparator) null) .subscribe().withSubscriber(AssertSubscriber.create(10)) .assertCompleted() .assertItems(1, 2, 3, 4); @@ -74,6 +78,95 @@ public void testDistinctWithComparatorReturningAlways1() { .assertItems(1, 2, 3, 4, 2, 4, 2, 4); } + @Test + public void testDistinctByKeyWithIdentityExtractor() { + + KeyTester kt1 = new KeyTester(1, "foo"); + KeyTester kt2 = new KeyTester(2, "bar"); + KeyTester kt3 = new KeyTester(3, "baz"); + KeyTester kt4 = new KeyTester(4, "foo-foo"); + KeyTester kt5 = new KeyTester(2, "foo-bar"); + KeyTester kt6 = new KeyTester(4, "foo-baz"); + KeyTester kt7 = new KeyTester(2, "bar-bar"); + KeyTester kt8 = new KeyTester(4, "bar-baz"); + + Multi.createFrom().items(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8) + .select().distinct(Function.identity()) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8); + } + + @Test + public void testDistinctByKeyWithExtractor() { + + KeyTester kt1 = new KeyTester(1, "foo"); + KeyTester kt2 = new KeyTester(2, "bar"); + KeyTester kt3 = new KeyTester(3, "baz"); + KeyTester kt4 = new KeyTester(4, "foo-foo"); + KeyTester kt5 = new KeyTester(2, "foo-bar"); + KeyTester kt6 = new KeyTester(4, "foo-baz"); + KeyTester kt7 = new KeyTester(2, "bar-bar"); + KeyTester kt8 = new KeyTester(4, "bar-baz"); + + Multi.createFrom().items(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8) + .select().distinct(kt -> kt.id) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(kt1, kt2, kt3, kt4); + } + + @Test + public void testDistinctWithNullExtractor() { + + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct((Function) null) + .subscribe() + .withSubscriber(AssertSubscriber.create(10))) + .withMessageContaining("`keyExtractor` must not be `null`"); + } + + @Test + public void testDistinctByKeyReturningSameKey() { + + KeyTester kt1 = new KeyTester(1, "foo"); + KeyTester kt2 = new KeyTester(2, "bar"); + KeyTester kt3 = new KeyTester(3, "baz"); + KeyTester kt4 = new KeyTester(4, "foo-foo"); + KeyTester kt5 = new KeyTester(2, "foo-bar"); + KeyTester kt6 = new KeyTester(4, "foo-baz"); + KeyTester kt7 = new KeyTester(2, "bar-bar"); + KeyTester kt8 = new KeyTester(4, "bar-baz"); + + Multi.createFrom().items(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8) + .select().distinct(a -> 0) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(kt1); + } + + @Test + public void testDistinctByKeyReturningDifferentKey() { + + AtomicInteger counter = new AtomicInteger(); + + KeyTester kt1 = new KeyTester(1, "foo"); + KeyTester kt2 = new KeyTester(2, "bar"); + KeyTester kt3 = new KeyTester(3, "baz"); + KeyTester kt4 = new KeyTester(4, "foo-foo"); + KeyTester kt5 = new KeyTester(2, "foo-bar"); + KeyTester kt6 = new KeyTester(4, "foo-baz"); + KeyTester kt7 = new KeyTester(2, "bar-bar"); + KeyTester kt8 = new KeyTester(4, "bar-baz"); + + Multi.createFrom().items(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8) + .select().distinct(a -> counter.getAndIncrement()) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(kt1, kt2, kt3, kt4, kt5, kt6, kt7, kt8); + } + @Test public void testDistinctWithUpstreamFailure() { Multi.createFrom(). failure(new IOException("boom")) @@ -90,6 +183,14 @@ public void testDistinctWithComparatorWithUpstreamFailure() { .assertFailedWith(IOException.class, "boom"); } + @Test + public void testDistinctByKeyWithUpstreamFailure() { + Multi.createFrom(). failure(new IOException("boom")) + .select().distinct(Function.identity()) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(IOException.class, "boom"); + } + @Test public void testThatNullSubscriberAreRejectedDistinct() { assertThrows(NullPointerException.class, () -> Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) @@ -97,6 +198,13 @@ public void testThatNullSubscriberAreRejectedDistinct() { .subscribe(null)); } + @Test + public void testThatNullSubscriberAreRejectedDistinctByKey() { + assertThrows(NullPointerException.class, () -> Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct(Function.identity()) + .subscribe(null)); + } + @Test public void testThatNullSubscriberAreRejectedSkipRepetitions() { assertThrows(NullPointerException.class, () -> Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) @@ -113,6 +221,15 @@ public void testDistinctOnAStreamWithoutDuplicates() { .assertItems(1, 2, 3, 4); } + @Test + public void testDistinctByKeyOnAStreamWithoutDuplicates() { + Multi.createFrom().range(1, 5) + .select().distinct(Function.identity()) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + } + @Test public void testSkipRepetitionsWithUpstreamFailure() { Multi.createFrom(). failure(new IOException("boom")) @@ -270,6 +387,23 @@ public void testDistinctExceptionInComparator() { subscriber.assertFailedWith(TestException.class, "boom"); } + @Test + public void testDistinctByKeyExceptionInExtractor() { + AtomicReference> emitter = new AtomicReference<>(); + AssertSubscriber subscriber = Multi.createFrom().emitter( + (Consumer>) emitter::set) + .select().distinct(a -> { + throw new TestException("boom"); + }) + .subscribe().withSubscriber(AssertSubscriber.create(10)); + + subscriber.assertSubscribed() + .assertNotTerminated(); + + emitter.get().emit(1).emit(2).complete(); + subscriber.assertFailedWith(TestException.class, "boom"); + } + @Test public void testSkipRepetitionsExceptionInEquals() { AtomicReference> emitter = new AtomicReference<>(); @@ -334,6 +468,19 @@ public void subscribe(Flow.Subscriber subscriber) { .run(() -> ref.get().onNext(4)) .assertItems(1, 3); + upstream + .select().distinct(Function.identity()) + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .run(() -> ref.get().onNext(1)) + .assertItems(1) + .request(1) + .run(() -> ref.get().onNext(1)) + .run(() -> ref.get().onNext(3)) + .assertItems(1, 3) + .cancel() + .run(() -> ref.get().onNext(4)) + .assertItems(1, 3); + upstream .skip().repetitions() .subscribe().withSubscriber(AssertSubscriber.create(1)) @@ -370,4 +517,30 @@ public boolean equals(Object obj) { } } + private static class KeyTester { + + private final int id; + private final String text; + + private KeyTester(int id, String text) { + this.id = id; + this.text = text; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + KeyTester keyTester = (KeyTester) o; + return id == keyTester.id && Objects.equals(text, keyTester.text); + } + + @Override + public int hashCode() { + return Objects.hash(id, text); + } + } + } diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java new file mode 100644 index 000000000..fa9891432 --- /dev/null +++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java @@ -0,0 +1,94 @@ +package io.smallrye.mutiny.tcktests; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import java.util.function.Function; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; + +public class MultiSelectDistinctByKeyTckTest extends AbstractPublisherTck { + @Test + public void distinctStageShouldReturnDistinctElements() { + Assert.assertEquals( + Await.await( + Multi.createFrom().items(1, 2, 2, 3, 2, 1, 3) + .select().distinct(Function.identity()) + .collect().asList() + .subscribeAsCompletionStage()), + Arrays.asList(1, 2, 3)); + } + + @Test + public void distinctStageShouldReturnAnEmptyStreamWhenCalledOnEmptyStreams() { + Assert.assertEquals( + Await.await(Multi.createFrom().empty() + .select().distinct(Function.identity()) + .collect().asList() + .subscribeAsCompletionStage()), + Collections.emptyList()); + } + + @Test + public void distinctStageShouldPropagateUpstreamExceptions() { + Assert.assertThrows(QuietRuntimeException.class, + () -> Await.await(Multi.createFrom().failure(new QuietRuntimeException("failed")) + .select().distinct(Function.identity()) + .collect().asList() + .subscribeAsCompletionStage())); + } + + @Test + public void distinctStageShouldPropagateExceptionsThrownByEquals() { + Assert.assertThrows(QuietRuntimeException.class, () -> { + CompletableFuture cancelled = new CompletableFuture<>(); + class ObjectThatThrowsFromEquals { + @Override + public int hashCode() { + return 1; + } + + @Override + public boolean equals(Object obj) { + throw new QuietRuntimeException("failed"); + } + } + CompletionStage> result = Multi.createFrom().items( + new ObjectThatThrowsFromEquals(), new ObjectThatThrowsFromEquals()) + .onTermination().invoke(() -> cancelled.complete(null)) + .select().distinct(Function.identity()) + .collect().asList().subscribeAsCompletionStage(); + Await.await(cancelled); + Await.await(result); + }); + } + + @Test + public void distinctStageShouldPropagateCancel() { + CompletableFuture cancelled = new CompletableFuture<>(); + infiniteStream() + .onTermination().invoke(() -> cancelled.complete(null)) + .select().distinct(Function.identity()).subscribe() + .withSubscriber(new Subscriptions.CancelledSubscriber<>()); + Await.await(cancelled); + } + + @Override + public Flow.Publisher createFlowPublisher(long elements) { + return upstream(elements) + .select().distinct(Function.identity()); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + return failedUpstream() + .select().distinct(Function.identity()); + } +} From 2b52fef6e0be2f381521868a60511ede5560c10a Mon Sep 17 00:00:00 2001 From: Julian Howarth <3205453+julianhowarth@users.noreply.github.com> Date: Mon, 4 Nov 2024 18:56:50 +0000 Subject: [PATCH 2/3] test: address review comments --- .../mutiny/operators/MultiDistinctTest.java | 6 +- .../MultiSelectDistinctByKeyTckTest.java | 129 ++++++++++++------ 2 files changed, 92 insertions(+), 43 deletions(-) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java index f2bdafdf8..0faf81198 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java @@ -529,10 +529,12 @@ private KeyTester(int id, String text) { @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) + } + if (o == null || getClass() != o.getClass()) { return false; + } KeyTester keyTester = (KeyTester) o; return id == keyTester.id && Objects.equals(text, keyTester.text); } diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java index fa9891432..8e770eff7 100644 --- a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java +++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSelectDistinctByKeyTckTest.java @@ -1,36 +1,39 @@ package io.smallrye.mutiny.tcktests; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; -import java.util.function.Function; - import org.testng.Assert; import org.testng.annotations.Test; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.Subscriptions; - -public class MultiSelectDistinctByKeyTckTest extends AbstractPublisherTck { +public class MultiSelectDistinctByKeyTckTest extends AbstractPublisherTck { @Test public void distinctStageShouldReturnDistinctElements() { + + KeyTester kt1 = new KeyTester(1, "foo"); + KeyTester kt2 = new KeyTester(2, "bar"); + KeyTester kt3 = new KeyTester(3, "baz"); + Assert.assertEquals( Await.await( - Multi.createFrom().items(1, 2, 2, 3, 2, 1, 3) - .select().distinct(Function.identity()) + Multi.createFrom().items(kt1, kt2, kt2, kt3, kt2, kt1, kt3) + .select().distinct(kt -> kt.id) .collect().asList() .subscribeAsCompletionStage()), - Arrays.asList(1, 2, 3)); + Arrays.asList(kt1, kt2, kt3)); } @Test public void distinctStageShouldReturnAnEmptyStreamWhenCalledOnEmptyStreams() { Assert.assertEquals( - Await.await(Multi.createFrom().empty() - .select().distinct(Function.identity()) + Await.await(Multi.createFrom().empty() + .select().distinct(kt -> kt.id) .collect().asList() .subscribeAsCompletionStage()), Collections.emptyList()); @@ -39,34 +42,47 @@ public void distinctStageShouldReturnAnEmptyStreamWhenCalledOnEmptyStreams() { @Test public void distinctStageShouldPropagateUpstreamExceptions() { Assert.assertThrows(QuietRuntimeException.class, - () -> Await.await(Multi.createFrom().failure(new QuietRuntimeException("failed")) - .select().distinct(Function.identity()) - .collect().asList() - .subscribeAsCompletionStage())); + () -> Await.await( + Multi.createFrom().failure(new QuietRuntimeException("failed")) + .select().distinct(kt -> kt.id) + .collect().asList() + .subscribeAsCompletionStage())); } @Test - public void distinctStageShouldPropagateExceptionsThrownByEquals() { - Assert.assertThrows(QuietRuntimeException.class, () -> { - CompletableFuture cancelled = new CompletableFuture<>(); - class ObjectThatThrowsFromEquals { - @Override - public int hashCode() { - return 1; - } - - @Override - public boolean equals(Object obj) { - throw new QuietRuntimeException("failed"); - } + public void distinctStageShouldPropagateExceptionsThrownByKeyEquals() { + Assert.assertThrows( + QuietRuntimeException.class, + () -> { + CompletableFuture cancelled = new CompletableFuture<>(); + class ObjectThatThrowsFromKeyEquals { + + Key key = new Key(); + + class Key { + @Override + public int hashCode() { + return 1; + } + + @Override + public boolean equals(Object obj) { + throw new QuietRuntimeException("failed"); + } } - CompletionStage> result = Multi.createFrom().items( - new ObjectThatThrowsFromEquals(), new ObjectThatThrowsFromEquals()) - .onTermination().invoke(() -> cancelled.complete(null)) - .select().distinct(Function.identity()) - .collect().asList().subscribeAsCompletionStage(); - Await.await(cancelled); - Await.await(result); + } + CompletionStage> result = + Multi.createFrom() + .items(new ObjectThatThrowsFromKeyEquals(), new ObjectThatThrowsFromKeyEquals()) + .onTermination() + .invoke(() -> cancelled.complete(null)) + .select() + .distinct(o -> o.key) + .collect() + .asList() + .subscribeAsCompletionStage(); + Await.await(cancelled); + Await.await(result); }); } @@ -75,20 +91,51 @@ public void distinctStageShouldPropagateCancel() { CompletableFuture cancelled = new CompletableFuture<>(); infiniteStream() .onTermination().invoke(() -> cancelled.complete(null)) - .select().distinct(Function.identity()).subscribe() + .map(id -> new KeyTester(id, "text-" + id)) + .select().distinct(kt -> kt.id).subscribe() .withSubscriber(new Subscriptions.CancelledSubscriber<>()); Await.await(cancelled); } @Override - public Flow.Publisher createFlowPublisher(long elements) { - return upstream(elements) - .select().distinct(Function.identity()); + public Flow.Publisher createFlowPublisher(long elements) { + return upstream(elements) + .map(id -> new KeyTester(id, "text-" + id)) + .select().distinct(kt -> kt.id); } @Override - public Flow.Publisher createFailedFlowPublisher() { + public Flow.Publisher createFailedFlowPublisher() { return failedUpstream() - .select().distinct(Function.identity()); + .map(id -> new KeyTester(id, "text-" + id)) + .select().distinct(kt -> kt.id); + } + + public static final class KeyTester { + + private final long id; + private final String text; + + private KeyTester(long id, String text) { + this.id = id; + this.text = text; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KeyTester keyTester = (KeyTester) o; + return id == keyTester.id && Objects.equals(text, keyTester.text); + } + + @Override + public int hashCode() { + return Objects.hash(id, text); + } } } From be442881e14c8a940959e9806cf34104ad45a59c Mon Sep 17 00:00:00 2001 From: Julian Howarth <3205453+julianhowarth@users.noreply.github.com> Date: Mon, 4 Nov 2024 18:57:29 +0000 Subject: [PATCH 3/3] docs: update distinct in guide --- .../docs/guides/eliminate-duplicates-and-repetitions.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/documentation/docs/guides/eliminate-duplicates-and-repetitions.md b/documentation/docs/guides/eliminate-duplicates-and-repetitions.md index 211dbc315..0035f144b 100644 --- a/documentation/docs/guides/eliminate-duplicates-and-repetitions.md +++ b/documentation/docs/guides/eliminate-duplicates-and-repetitions.md @@ -29,6 +29,11 @@ If you have a stream emitting the `{1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4}` items, the By default, `select().distinct()` uses the `hashCode` method from the item's class. You can pass a custom comparator for more advanced checks. +If you have a stream emitting items of type `T`, where duplicates can be identified through an attribute of `T` of type `K`, +then an `extractor` of type `Function` can be defined. Applying `.select().distinct(extractor)` on such a stream will +eliminate duplicates but have a lesser memory overhead as only the references to the extracted keys need to be kept, not the whole object. +A typical usage of this might be for a stream of records where uniqueness is determined by a UUID assigned to every record. + ## Skipping repetitions The `.skip().repetitions()` operator removes subsequent repetitions of an item: