-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Publisher.replay #2684
Add Publisher.replay #2684
Conversation
4d5db14
to
c7dc329
Compare
System.arraycopy(currSubs, 0, newSubs, 0, currSubs.length); | ||
newSubs[currSubs.length] = multiSubscriber; | ||
if (newSubscribersUpdater.compareAndSet(this, currSubs, newSubs)) { | ||
addSubscriber(multiSubscriber); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note this code was moved to addSubscriber
bcz replay requires that we acquire the lock before making the subscriber visible in the array (via newSubscribersUpdater
) and this is preferred approach for multicast too.
This operator can be used to replace |
fd9369c
to
cac808a
Compare
dda1047
to
8d3d602
Compare
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategy.java
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java
Show resolved
Hide resolved
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java
Outdated
Show resolved
Hide resolved
8d3d602
to
0ce443b
Compare
0ce443b
to
1963359
Compare
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java
Outdated
Show resolved
Hide resolved
d47fad3
to
3d6cc9a
Compare
7f08898
to
758c2d0
Compare
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java
Show resolved
Hide resolved
758c2d0
to
632f2e0
Compare
632f2e0
to
35e2494
Compare
Motivation: Publisher.replay provides the ability to keep state that is preserved for multiple subscribers and across resubscribes.
7a7f6fe
to
b8c6237
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good other than some minor optimizations.
Thank you for your patience.
servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ReplayStrategies.java
Outdated
Show resolved
Hide resolved
public void accumulate(@Nullable final T t) { | ||
final TimeStampSignal<T> signal = new TimeStampSignal<>(executor.currentTime(NANOSECONDS), t); | ||
for (;;) { | ||
final int qSize = queueSize; | ||
if (qSize < maxItems) { | ||
if (queueSizeUpdater.compareAndSet(this, qSize, qSize + 1)) { | ||
items.add(signal); | ||
break; | ||
} | ||
} else if (queueSizeUpdater.compareAndSet(this, qSize, qSize)) { | ||
items.poll(); | ||
items.add(signal); | ||
break; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also an opportunity to evict stale elements if we'd like. We can have a simple private evictStale()
method and call it at the start of these public methods. Then the rest looks almost exactly like MostRecentReplayAccumulator
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets defer this bcz LazyTimeLimitedReplayAccumulator
can do the iteration and removal at the same time which is a bit harder to generalize.
Motivation:
Publisher.replay provides the ability to keep state that is preserved for multiple subscribers and across resubscribes.