
Commit

Merge pull request #1731 from julianhowarth/new-distinct-variant
Additional overload of distinct using a key extractor
jponge authored Nov 4, 2024
2 parents 93587f2 + be44288 commit be39120
Showing 5 changed files with 432 additions and 4 deletions.
@@ -29,6 +29,11 @@ If you have a stream emitting the `{1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4}` items, the
By default, `select().distinct()` uses the `hashCode` method from the item's class.
You can pass a custom comparator for more advanced checks.

If you have a stream emitting items of type `T`, where duplicates can be identified through an attribute of `T` of type `K`,
then you can define an `extractor` of type `Function<T, K>`. Applying `.select().distinct(extractor)` on such a stream
eliminates duplicates with a lower memory overhead, as only references to the extracted keys need to be kept rather than the whole items.
A typical use case is a stream of records where uniqueness is determined by a UUID assigned to every record.
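
The following is a minimal sketch of the idea, assuming a hypothetical `Payment` record whose `uuid()` accessor provides the key:

```java
record Payment(UUID uuid, String label) { }

UUID firstId = UUID.randomUUID();
UUID secondId = UUID.randomUUID();

Multi<Payment> payments = Multi.createFrom().items(
        new Payment(firstId, "coffee"),
        new Payment(firstId, "coffee (same key, dropped)"),
        new Payment(secondId, "tea"));

// Emits the first "coffee" payment and the "tea" payment;
// the operator remembers the extracted UUID keys, not the Payment instances.
Multi<Payment> distinctPayments = payments.select().distinct(Payment::uuid);
```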

## Skipping repetitions

The `.skip().repetitions()` operator removes subsequent repetitions of an item:
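
A sketch of the outcome for the same `{1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4}` stream as above (only adjacent repetitions are dropped, so `1` and `4` can appear again later):

```java
Multi<Integer> items = Multi.createFrom().items(1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4);

// Emits 1, 2, 3, 4, 5, 6, 1, 4
Multi<Integer> deduplicated = items.skip().repetitions();
```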
@@ -212,7 +212,7 @@ public Multi<T> when(Function<? super T, Uni<Boolean>> predicate) {

/**
* Selects all the distinct items from the upstream.
* This methods uses {@link Object#hashCode()} to compare items.
* This method uses {@link Object#hashCode()} to compare items.
* <p>
* Do NOT call this method on an unbounded upstream, as it would lead to an {@link OutOfMemoryError}.
* <p>
@@ -222,6 +222,7 @@ public Multi<T> when(Function<? super T, Uni<Boolean>> predicate) {
* @return the resulting {@link Multi}.
* @see MultiSkip#repetitions()
* @see #distinct(Comparator)
* @see #distinct(Function)
*/
@CheckReturnValue
public Multi<T> distinct() {
@@ -230,7 +231,7 @@ public Multi<T> distinct() {

/**
* Selects all the distinct items from the upstream.
* This methods uses the given comparator to compare the items.
* This method uses the given comparator to compare the items.
* <p>
* Do NOT call this method on an unbounded upstream, as it would lead to an {@link OutOfMemoryError}.
* <p>
@@ -241,7 +242,7 @@ public Multi<T> distinct() {
* {@link java.util.TreeSet} initialized with the given comparator. If the comparator is {@code null}, it uses a
* {@link java.util.HashSet} as backend.
*
* @param comparator the comparator used to compare items. If {@code null}, it will uses the item's {@code hashCode}
* @param comparator the comparator used to compare items. If {@code null}, it will use the item's {@code hashCode}
* method.
* @return the resulting {@link Multi}.
* @see MultiSkip#repetitions()
@@ -251,4 +252,27 @@ public Multi<T> distinct(Comparator<? super T> comparator) {
return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream, comparator));
}

/**
* Selects all the distinct items from the upstream.
* This method uses the given key extractor to extract an object from each item which is then
* used to compare the items. This method allows for a smaller memory footprint than {@link #distinct()}
* and {@link #distinct(Comparator)} as only the extracted keys are held in memory rather than the items
* themselves.
* <p>
* Do NOT call this method on an unbounded upstream, as it would lead to an {@link OutOfMemoryError}.
* <p>
* If the key extraction throws an exception, the produced {@link Multi} fails.
* The produced {@link Multi} completes when the upstream sends the completion event.
*
* @param keyExtractor the function used to extract the key from each item, must not be {@code null} and must not return {@code null}.
* @return the resulting {@link Multi}.
* @see MultiSkip#repetitions()
*/
@CheckReturnValue
public <K> Multi<T> distinct(Function<T, K> keyExtractor) {
return Infrastructure.onMultiCreation(
new MultiDistinctByKeyOp<>(
upstream, Infrastructure.decorate(nonNull(keyExtractor, "keyExtractor"))));
}

}
@@ -0,0 +1,83 @@
package io.smallrye.mutiny.operators.multi;

import java.util.*;
import java.util.function.Function;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
* Eliminates the duplicated items from the upstream.
*
* @param <T> the type of items
* @param <K> the type of key, used to identify duplicates
*/
public final class MultiDistinctByKeyOp<T, K> extends AbstractMultiOperator<T, T> {

private final Function<T, K> keyExtractor;

public MultiDistinctByKeyOp(Multi<? extends T> upstream, Function<T, K> keyExtractor) {
super(upstream);
this.keyExtractor = keyExtractor;
}

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
upstream.subscribe(
new DistinctByKeyProcessor<>(ParameterValidation.nonNullNpe(subscriber, "subscriber"), keyExtractor));
}

static final class DistinctByKeyProcessor<T, K> extends MultiOperatorProcessor<T, T> {

private final Collection<K> foundKeys = new HashSet<>();
private final Function<T, K> keyExtractor;

DistinctByKeyProcessor(MultiSubscriber<? super T> downstream,
Function<T, K> keyExtractor) {
super(downstream);
this.keyExtractor = keyExtractor;
}

@Override
public void onItem(T t) {
if (isDone()) {
return;
}

boolean added;
try {
added = foundKeys.add(keyExtractor.apply(t));
} catch (Throwable e) {
// catch an exception thrown by the key extractor or by the key's hashCode / equals
failAndCancel(e);
return;
}

if (added) {
downstream.onItem(t);
} else {
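// duplicate key: drop the item and request one more from upstream so the downstream demand is still met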
request(1);
}
}

@Override
public void onFailure(Throwable t) {
super.onFailure(t);
foundKeys.clear();
}

@Override
public void onCompletion() {
super.onCompletion();
foundKeys.clear();
}

@Override
public void cancel() {
super.cancel();
foundKeys.clear();
}
}

}
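
For an end-to-end view of the new operator, here is a minimal, self-contained sketch (not part of the change set; the `Event` record and the identifiers are made up for illustration):

```java
import java.util.List;
import java.util.UUID;

import io.smallrye.mutiny.Multi;

public class DistinctByKeyExample {

    // Illustrative record: uniqueness is decided by the uuid field alone.
    record Event(UUID uuid, String payload) { }

    public static void main(String[] args) {
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();

        List<Event> events = Multi.createFrom().items(
                new Event(first, "created"),
                new Event(first, "created again (same key)"),
                new Event(second, "updated"))
                // keep only the first item seen for each uuid;
                // the operator stores the UUID keys, not the Event instances
                .select().distinct(Event::uuid)
                .collect().asList()
                .await().indefinitely();

        // Prints 2: the duplicate "first" event was dropped.
        System.out.println(events.size());
    }
}
```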

