Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Prevent StackOverFlowException in SHARED subscription #16968

Merged
merged 2 commits into from
Aug 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected boolean sendInProgress;
protected volatile boolean sendInProgress;
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
TOTAL_AVAILABLE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
Expand Down Expand Up @@ -244,6 +244,14 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
readMoreEntries();
}

/**
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
public void readMoreEntiresAsync() {
topic.getBrokerService().executor().execute(() -> readMoreEntries());
}

public synchronized void readMoreEntries() {
if (sendInProgress) {
// we cannot read more entries while sending the previous batch
Expand Down Expand Up @@ -287,9 +295,7 @@ public synchronized void readMoreEntries() {
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
// We should not call readMoreEntries() recursively in the same thread
// as there is a risk of StackOverflowError
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -544,24 +550,25 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
dispatchMessagesThread.execute(safeRun(() -> {
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntries();
}
}));
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
} else {
sendMessagesToConsumers(readType, entries);
if (sendMessagesToConsumers(readType, entries)) {
readMoreEntiresAsync();
}
}
}

protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
boolean readMoreEntries;
try {
readMoreEntries = trySendMessagesToConsumers(readType, entries);
return trySendMessagesToConsumers(readType, entries);
} finally {
sendInProgress = false;
}
if (readMoreEntries) {
readMoreEntries();
}
}

/**
Expand Down Expand Up @@ -916,7 +923,7 @@ public void addUnAckedMessages(int numberOfMessages) {
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -939,7 +946,7 @@ public void addUnAckedMessages(int numberOfMessages) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked", name);
topic.getBrokerService().executor().execute(() -> readMoreEntries());
readMoreEntiresAsync();
}
}
// increment broker-level count
Expand Down