[ManagedLedger] Pin executor and scheduled executor threads for ManagedLedgerImpl #11387

Closed
@@ -235,7 +235,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
}, ml.getPinnedExecutor()).exceptionally(exception->{
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
@@ -313,7 +313,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
}

checkNotNull(ml.getName());
checkNotNull(ml.getExecutor());
checkNotNull(ml.getPinnedExecutor());

try {
// We got the entries, we need to transform them to a List<> type
@@ -334,7 +334,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
}, ml.getPinnedExecutor()).exceptionally(exception->{
if (exception instanceof BKException
&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
@@ -247,7 +247,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}, ml.getPinnedExecutor());
}

@Override
@@ -776,7 +776,7 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie
// Check again for new entries after the configured time, then if still no entries are available register
// to be notified
if (config.getNewEntriesCheckDelayInMillis() > 0) {
ledger.getScheduledExecutor()
ledger.getPinnedScheduledExecutor()
.schedule(() -> checkForNewEntries(op, callback, ctx),
config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
} else {
@@ -1137,7 +1137,7 @@ public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback
final PositionImpl newPosition = (PositionImpl) newPos;

// order trim and reset operations on a ledger
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
ledger.getPinnedExecutor().execute(safeRun(() -> {
PositionImpl actualPosition = newPosition;

if (!ledger.isValidPosition(actualPosition) &&
@@ -2286,7 +2286,7 @@ private boolean shouldPersistUnackRangesToLedger() {
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
if (state == State.Closed) {
ledger.getExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException(
ledger.getPinnedExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException(
new CursorAlreadyClosedException(name + " cursor already closed")))));
return;
}
@@ -2444,7 +2444,7 @@ void createNewMetadataLedger(final VoidCallback callback) {
return;
}

ledger.getExecutor().execute(safeRun(() -> {
ledger.getPinnedExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
@@ -2785,7 +2785,7 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)),
ledger.getPinnedScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
@@ -2820,7 +2820,7 @@ private void asyncDeleteCursorLedger(int retry) {
log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)),
ledger.getPinnedScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
}
@@ -57,6 +57,8 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -78,7 +80,6 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Retries;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -252,7 +253,8 @@ public enum PositionBound {
protected volatile State state = null;

private final OrderedScheduler scheduledExecutor;
private final OrderedExecutor executor;
private final ScheduledExecutorService pinnedScheduledExecutor;
private final Executor pinnedExecutor;
Contributor:

If we are using 2 threads, one with the regular executor (which is more efficient) and the other for the pinnedScheduledExecutor, wouldn't that mean that we still have more than 1 thread accessing some of the objects?

Would it make sense to use the generic scheduledExecutor (just for deferring purposes) and then jump back into the same pinnedExecutor?

Member Author:

That's true.

Perhaps a more optimal solution would be to have the capability for scheduling tasks on the pinned scheduler. I don't know why this solution isn't available in the underlying Bookkeeper libraries that are used. The benefit of that is that there isn't an additional thread switch when the scheduled task triggers.

Contributor:

Would it make sense to use the generic scheduledExecutor (just for deferring purposes) and then jump back into the same pinnedExecutor?

@merlimat Do you mean scheduledExecutor.schedule(pinnedExecutor.execute() ...) ?
Seems to be a feasible way right now :)

Contributor:

Perhaps a more optimal solution would be to have the capability for scheduling tasks on the pinned scheduler

@lhotari The scheduled executor is less efficient than the normal executor because it has to maintain the delayed tasks. For that reason, it's preferable not to use it directly in the critical data path, but only when we want to defer actions or for background tasks.
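
For illustration, a minimal standalone sketch of the "defer on the shared scheduler, then hop back onto the pinned thread" pattern discussed above; `sharedScheduler` and `pinnedExecutor` here are placeholder names, not the actual ManagedLedgerImpl fields:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DeferThenPinExample {
    public static void main(String[] args) throws InterruptedException {
        // Shared scheduler: used only to implement the delay, off the critical path.
        ScheduledExecutorService sharedScheduler = Executors.newSingleThreadScheduledExecutor();
        // Pinned executor: the single thread that owns the ledger state.
        ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();

        Runnable task = () ->
                System.out.println("task ran on " + Thread.currentThread().getName());

        // Defer on the shared scheduler, then jump back into the pinned executor,
        // so the task body always runs on the owning thread.
        sharedScheduler.schedule(() -> pinnedExecutor.execute(task), 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        sharedScheduler.shutdown();
        pinnedExecutor.shutdown();
    }
}
```

The trade-off raised earlier in the thread still applies: when the scheduled task fires there is one extra thread switch before it lands on the pinned thread.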

final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;
@@ -298,7 +300,8 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = bookKeeper.getMainWorkerPool();
this.pinnedScheduledExecutor = scheduledExecutor.chooseThread(name);
this.pinnedExecutor = bookKeeper.getMainWorkerPool().chooseThread(name);
Contributor:

I don't know why we never did this, but it saves a lot of string hashing too :)
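
As a rough sketch of why this helps, here is a hypothetical stand-in for an ordered executor pool (class and method names are illustrative, not the BookKeeper API): the key is hashed once when the thread is chosen, and the returned Executor is reused directly, instead of re-hashing the ledger name on every executeOrdered(name, task) submission.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PinnedThreadPool {
    private final ExecutorService[] threads;

    public PinnedThreadPool(int numThreads) {
        threads = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Similar in spirit to chooseThread(key): hash the key exactly once.
    public Executor chooseThread(String key) {
        int idx = (key.hashCode() & Integer.MAX_VALUE) % threads.length;
        return threads[idx];
    }

    public void shutdown() {
        for (ExecutorService e : threads) {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        PinnedThreadPool pool = new PinnedThreadPool(4);
        // Resolve the pinned executor once, e.g. at construction time...
        Executor pinned = pool.chooseThread("my-ledger-name");
        // ...then submit without touching the name (or its hash) again.
        pinned.execute(() -> System.out.println("on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```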

TOTAL_SIZE_UPDATER.set(this, 0);
NUMBER_OF_ENTRIES_UPDATER.set(this, 0);
ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0);
@@ -353,7 +356,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (ledgers.size() > 0) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
executor.executeOrdered(name, safeRun(() -> {
pinnedExecutor.execute(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc));
@@ -462,7 +465,7 @@ public void operationFailed(MetaStoreException e) {
return;
}

executor.executeOrdered(name, safeRun(() -> {
pinnedExecutor.execute(safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
@@ -701,7 +704,7 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
pinnedExecutor.execute(safeRun(() -> internalAsyncAddEntry(addOperation)));
}

@Override
@@ -713,7 +716,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
pinnedExecutor.execute(safeRun(() -> internalAsyncAddEntry(addOperation)));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
@@ -1481,7 +1484,7 @@ public void operationFailed(MetaStoreException e) {
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
pinnedScheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
return;
}

@@ -1778,7 +1781,7 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
}
promise.complete(res);
}
}, executor.chooseThread(name));
}, pinnedExecutor);
return promise;
});
}
@@ -2159,7 +2162,7 @@ void notifyCursors() {
break;
}

executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
Contributor:

Is this required to be on the same executor?

We're notifying multiple cursors that entries are available; this should be able to progress in parallel.

Contributor:

There are 2 places that call the notifyCursors() method. One is OpAddEntry.safeRun(), which already runs on the pinnedExecutor, so there's no need to jump again.

The other is when the ledger is closed, so it looks like only this place needs to change.
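
A hypothetical helper sketching the "no need to jump again" point above: run the task inline when the caller is already on the pinned thread, otherwise submit it. This is an illustration of the idea only, not code from this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class PinnedDispatcher {
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    private final AtomicReference<Thread> pinnedThread = new AtomicReference<>();

    public PinnedDispatcher() {
        // Capture the single worker thread once so callers can detect it later.
        pinned.execute(() -> pinnedThread.set(Thread.currentThread()));
    }

    public void executeOnPinned(Runnable task) {
        if (Thread.currentThread() == pinnedThread.get()) {
            task.run();               // already on the owning thread: no extra hop
        } else {
            pinned.execute(task);     // hop onto the owning thread
        }
    }

    public void shutdown() {
        pinned.shutdown();
    }

    public static void main(String[] args) {
        PinnedDispatcher dispatcher = new PinnedDispatcher();
        dispatcher.executeOnPinned(() ->
                System.out.println("notified on " + Thread.currentThread().getName()));
        dispatcher.shutdown();
    }
}
```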

}
}

@@ -2170,7 +2173,7 @@ void notifyWaitingEntryCallBacks() {
break;
}

executor.execute(safeRun(cb::entriesAvailable));
pinnedExecutor.execute(safeRun(cb::entriesAvailable));
Contributor:

Same for this one; it should be fine to spread this across multiple threads.

}
}

@@ -2217,15 +2220,16 @@ private void trimConsumedLedgersInBackground() {

@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
pinnedExecutor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
}

public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
pinnedExecutor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise)));
}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
pinnedScheduledExecutor
Contributor:

Since trimConsumedLedgersInBackground() is already jumping on the pinnedExecutor, we shouldn't need to use a specific thread for the scheduled executor.

.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
Expand All @@ -2234,13 +2238,13 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
&& config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
pinnedExecutor.execute(safeRun(() -> maybeOffload(promise)));
}
}

private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
pinnedScheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
100, TimeUnit.MILLISECONDS);
} else {
CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
@@ -2660,7 +2664,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
pinnedScheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Deleted ledger {}", name, ledgerId);
@@ -2932,7 +2936,7 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran
synchronized (this) {
if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
pinnedScheduledExecutor.schedule(
safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
TimeUnit.MILLISECONDS);
} else { // lock acquired
@@ -3412,12 +3416,12 @@ public NavigableMap<Long, LedgerInfo> getLedgersInfo() {
return ledgers;
}

OrderedScheduler getScheduledExecutor() {
return scheduledExecutor;
ScheduledExecutorService getPinnedScheduledExecutor() {
return pinnedScheduledExecutor;
}

OrderedExecutor getExecutor() {
return executor;
Executor getPinnedExecutor() {
return pinnedExecutor;
}

private ManagedLedgerInfo getManagedLedgerInfo() {
@@ -3618,7 +3622,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
return;
}
scheduledExecutor.schedule(() -> {
pinnedScheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Timeout creating ledger", name);
@@ -3667,7 +3671,7 @@ private void scheduleTimeoutTask() {
timeoutSec = timeoutSec <= 0
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
: timeoutSec;
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
this.timeoutTask = this.pinnedScheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
checkAddTimeout();
checkReadTimeout();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
@@ -3808,7 +3812,7 @@ private void asyncUpdateProperties(Map<String, String> properties, boolean isDel
String deleteKey, final UpdatePropertiesCallback callback, Object ctx) {
if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
pinnedScheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
callback, ctx), 100, TimeUnit.MILLISECONDS);
return;
}
@@ -3955,7 +3959,7 @@ private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
this.checkLedgerRollTask = this.pinnedScheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
}
}
@@ -154,7 +154,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());

if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread (timeout task thread).. so do nothing
return;
@@ -170,7 +170,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
handleAddFailure(lh);
} else {
// Trigger addComplete callback in a thread hashed on the managed ledger name
ml.getExecutor().executeOrdered(ml.getName(), this);
ml.getPinnedExecutor().execute(this);
}
}

@@ -248,7 +248,7 @@ private void updateLatency() {

/**
* Checks if add-operation is completed
*
*
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
Expand All @@ -269,7 +269,7 @@ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {

/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
*
*
* @param ledger
*/
void handleAddFailure(final LedgerHandle ledger) {
Expand All @@ -278,7 +278,7 @@ void handleAddFailure(final LedgerHandle ledger) {
// be marked as failed.
ml.mbean.recordAddEntryError();

ml.getExecutor().executeOrdered(ml.getName(), SafeRun.safeRun(() -> {
ml.getPinnedExecutor().execute(SafeRun.safeRun(() -> {
// Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
// from a BK callback.
ml.ledgerClosed(ledger);
@@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
cursor.ledger.getExecutor().execute(safeRun(() -> {
cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Contributor:

I think we should be careful not to serialize every cursor onto the managed ledger pinned thread, as it could become a bottleneck when there are many cursors on a topic.

Member Author:

Yes that's true.

The reason to use the pinned executor is to adhere to Java Memory Model rules of correct synchronization. There's a generic problem in OpReadEntry since it's sharing an array that is mutated by multiple threads. JLS 17.4 explains that "Incorrectly Synchronized Programs May Exhibit Surprising Behavior".

I would assume that "entries" would have to be copied to a new list before sharing if we want to use multiple threads. Is that right?

Contributor:

I think the entry reading happens one by one. If we get a read-entries failure here, doesn't that mean we won't get a chance to add more elements to the list (all the previous read operations are done)?
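
A minimal sketch of the "copy before sharing" idea mentioned above, assuming the entries list is only mutated on the producer side: the snapshot handed to the callback executor is unmodifiable, so later mutations of the original list cannot race with the callback thread. The names here are placeholders, not the OpReadEntry fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class PublishCopyExample {
    public static void main(String[] args) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();

        List<String> entries = new ArrayList<>();
        entries.add("entry-1");
        entries.add("entry-2");

        Consumer<List<String>> callback = list -> System.out.println("callback got " + list);

        // Defensive copy: the snapshot given to the other thread is immutable,
        // so the producer can keep mutating 'entries' without affecting the callback.
        List<String> snapshot = List.copyOf(entries);
        callbackExecutor.execute(() -> callback.accept(snapshot));

        callbackExecutor.shutdown();
    }
}
```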

callback.readEntriesComplete(entries, ctx);
recycle();
}));
@@ -141,8 +141,8 @@ void checkReadCompletion() {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}

// Schedule next read in a different thread
Contributor:

This is a behaviour change.
How can we verify that we are not breaking something or reducing overall performance?

Member Author:

How can we verify that we are not breaking something or reducing overall performance?

testing, testing, testing. we need more Fallout tests. :)

cursor.ledger.getExecutor().execute(safeRun(() -> {
// Schedule next read
cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Contributor:

Other than the consideration that different cursors shouldn't be pinned on a single thread, the reason for jumping to a different thread here is to avoid a stack overflow.

When the read is being served from the ML cache, it comes back on the same thread. There are some conditions in which we ask for the next read.

e.g. if you ask to read 100 entries and we only got 20 entries from the current ledger, we'll schedule a read for the remaining 80 on the next ledger. In some cases there could be abnormal distributions, like 1 entry per ledger, and that would chain all the reads and callbacks within the same stack.

Therefore, the "jump to a random thread" was introduced to break that chain.

Member Author (@lhotari, Aug 24, 2021):

Wouldn't the usage of the pinned executor achieve the same result? It prevents the stack from going deeper and deeper.
Why would it have to jump to a random thread to break the chain?

The only reason that comes to mind is the case where a completable future gets triggered as part of the call flow and is being waited on to complete in the same thread where the result should be executed. That would never complete and would deadlock. Would that be the reason to use a different executor here?

Contributor:

It looks like the stack should be checkReadCompletion -> entryCache.asyncReadEntry0 -> checkReadCompletion -> entryCache.asyncReadEntry0 -> checkReadCompletion -> entryCache.asyncReadEntry0 and so on, if we have entries in the cache.

Contributor:

Wouldn't the usage of the pinned executor achieve the same result? It prevents the stack from going deeper and deeper.
Why would it have to jump to a random thread to break the chain?

@lhotari Uhm, I think that some executors short-circuit the queue if they detect that you're trying to add a task from the same executor thread. That is the case for the Netty IO thread, though I just checked, and it shouldn't happen with the ThreadPoolExecutor that the OrderedExecutor is based upon.
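
A small standalone sketch of the stack-depth point being discussed: queuing the continuation on an executor (even a single pinned thread) gives each step a fresh stack frame, as long as the executor really queues rather than running same-thread submissions inline. This is an illustration, not the OpReadEntry code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StackBreakExample {
    private static final ExecutorService pinned = Executors.newSingleThreadExecutor();

    // Direct recursion: each "cache hit" calls back synchronously, so a long
    // chain of hits keeps growing the stack and can eventually overflow.
    static void readNextDirect(int remaining) {
        if (remaining == 0) {
            return;
        }
        readNextDirect(remaining - 1);
    }

    // Executor-based continuation: the next step is queued instead of called,
    // so each step starts from a fresh stack frame even on a single thread,
    // provided the executor does not short-circuit same-thread submissions.
    static void readNextQueued(int remaining) {
        if (remaining == 0) {
            pinned.shutdown();
            return;
        }
        pinned.execute(() -> readNextQueued(remaining - 1));
    }

    public static void main(String[] args) {
        readNextQueued(1_000_000);   // completes without StackOverflowError
        // readNextDirect(1_000_000); // would likely throw StackOverflowError
    }
}
```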

readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
@@ -152,7 +152,7 @@ void checkReadCompletion() {
cursor.readOperationCompleted();

} finally {
cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> {
cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));