Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ManagedLedger] Pin executor and scheduled executor threads for ManagedLedgerImpl #11387

Closed

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented Jul 20, 2021

Motivation

OpReadEntry is not multi-thread safe. OpReadEntry.entries is an ArrayList without any synchronization.
However it is accessed from multiple threads.

Here's an example of such code:

OpReadEntry.entries mutated:

Mutation triggered from multiple threads:

if (entries.size() < count && cursor.hasMoreEntries() &&
((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}
// Schedule next read in a different thread
cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));

The operations seem to happen sequentially when OpReadEntry.entries is mutated or accessed, but this is not enough for ensuring thread safety in Java.

The goal of this change is to improve Managed Ledger operations by running operations in the pinned executor thread where the thread is picked by the hash of the managed ledger name. This is how most of the code is already written, but there are some exceptions in the current code. The goal of this PR change is to ensure that the pinned executor is used in all cases where work is scheduled to run using the ledger's executor.

Modifications

  • Don't expose OrderedExecutor from ManagedLedgerImpl.getExecutor

    • instead, return executor that is pinned to a single thread with .chooseThread(getName())
      • most usages of ManagedLedgerImpl.getExecutor were already calling .chooseThread(ml.getName()), however
        some locations were omitting it. It's better to always pin the ManagedLedgerImpl.getExecutor
        to a single thread.
  • Don't expose OrderedScheduler from ManagedLedgerImpl.getScheduledExecutor

  • instead return scheduled executor service that is pinned to a single thread with .chooseThread(getName()).

  • Pin executor and scheduled executor usage inside ManagedLedgerImpl class

  • this improves thread safety of Managed Ledger code base since more operations will happen in a single thread

    • some classes such as OpReadEntry are not multi-thread safe. OpReadEntry.entries is a ArrayList without any synchronization.

Known gaps

  • ManagedLedgerImpl uses two separate executors: the scheduled executor and a "normal" executor. This leads to multi-thread access. It would be better to combine the execution of both scheduled and "normal" execution to a single thread in some upcoming PRs.

@lhotari lhotari changed the title [ManagedLedger] Don't expose OrderedExecutor from ManagedLedgerImpl.getExecutor [ManagedLedger] Pin executor thread for usages of ManagedLedgerImpl.getExecutor Jul 20, 2021
@lhotari lhotari added this to the 2.9.0 milestone Jul 20, 2021
@lhotari lhotari self-assigned this Jul 20, 2021
@BewareMyPower
Copy link
Contributor

Great find!

But I have a question about it. entries is exposed by followed public method:

public int getNumberOfEntriesToRead() {
return count - entries.size();
}

And the method's call stack could be:

ManagedCursor#asyncReadEntries
  ManagedCursorImpl#asyncReadEntries
    ManagedLedgerImpl#asyncReadEntries
      ManagedLedgerImpl#internalReadFromLedger

Could ManagedCursor#asyncReadEntries be called in a different thread?

@lhotari lhotari added the doc-not-needed Your PR changes do not impact docs label Jul 20, 2021
@lhotari
Copy link
Member Author

lhotari commented Jul 20, 2021

Could ManagedCursor#asyncReadEntries be called in a different thread?

@BewareMyPower This is one example in the current code base where a different thread might be used:

if (entries.size() < count && cursor.hasMoreEntries() &&
((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}
// Schedule next read in a different thread
cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));

@BewareMyPower
Copy link
Contributor

@lhotari I know. What I want to ask is when ManagedCursor#asyncReadEntries is called, the OpReadEntry#getNumberOfEntriesToRead will be called eventually even after your change. Is it called in another thread? If yes, there could still be a thread safety problem.

@lhotari lhotari changed the title [ManagedLedger] Pin executor thread for usages of ManagedLedgerImpl.getExecutor [ManagedLedger] Pin executor and scheduled executor threads for ManagedLedgerImpl Jul 20, 2021
@lhotari
Copy link
Member Author

lhotari commented Jul 20, 2021

@lhotari I know. What I want to ask is when ManagedCursor#asyncReadEntries is called, the OpReadEntry#getNumberOfEntriesToRead will be called eventually even after your change. Is it called in another thread? If yes, there could still be a thread safety problem.

The concurrency design doesn't currently ensure single thread access. It's possible that there is multi-thread access at the moment. For example, the scheduler uses a different executor and therefore also a different thread. However, I believe that this PR improves the existing solution and can help reduce issues caused by thread safety issues.
I think it's possible to incrementally refactor the concurrency solution towards a solution where a single thread handles all accesses for a specific key (managed ledger name in this case). We can have a broader discussion about this on the mailing list or in some upcoming community meeting.

BewareMyPower
BewareMyPower previously approved these changes Jul 20, 2021
@@ -141,8 +141,8 @@ void checkReadCompletion() {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}

// Schedule next read in a different thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a behaviour change
how can we verify that we are not breaking something or reducing overall performances ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how can we verify that we are not breaking something or reducing overall performances ?

testing, testing, testing. we need more Fallout tests. :)

@devinbost
Copy link
Contributor

Related to #6054

@lhotari lhotari closed this Jul 22, 2021
@lhotari lhotari reopened this Jul 22, 2021
@sijie sijie self-requested a review July 22, 2021 15:38
@lhotari lhotari force-pushed the lh-fix-managedledger-thread-safety branch from f8e2aae to 43a099f Compare August 11, 2021 06:13
eolivelli
eolivelli previously approved these changes Aug 16, 2021
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia @merlimat @codelipenghui can you please give your opinion on this patch ?

I believe it is a good fix, but we need more eyes

@lhotari lhotari requested a review from ivankelly August 17, 2021 14:49
…etExecutor

- instead, return executor that is pinned to a single thread
  - most usages of ManagedLedgerImpl.getExecutor were already calling chooseThread(ml.getName()), however
    some locations were omitting it. It's better to always pin the ManagedLedgerImpl.getExecutor
    to a single thread.

- this improves thread safety of Managed Ledger code base since more operations will happen in a single thread
  - some classes such as OpReadEntry are not multi-thread safe. OpReadEntry.entries is a ArrayList without any
    synchronization.
@lhotari lhotari force-pushed the lh-fix-managedledger-thread-safety branch from 43a099f to c869ec4 Compare August 17, 2021 14:53
@@ -252,7 +253,8 @@
protected volatile State state = null;

private final OrderedScheduler scheduledExecutor;
private final OrderedExecutor executor;
private final ScheduledExecutorService pinnedScheduledExecutor;
private final Executor pinnedExecutor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are using 2 threads, one with the regular executor (which is more efficient) and the other for the pinnedScheduledExecutor, wouldn't that mean that we still have more than 1 thread accessing some of the objects?

Would it make sense to use the generic scheduledExecutor (just for deferring purposes) and then jump back into the same pinnedExecutor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true.

Perhaps a more optimal solution would be to have the capability for scheduling tasks on the pinned scheduler. I don't know why this solution isn't available in the underlying Bookkeeper libraries that are used. The benefit of that is that there isn't an additional thread switch when the scheduled task triggers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to use the generic scheduledExecutor (just for deferring purposes) and then jump back into the same pinnedExecutor?

@merlimat Do you mean scheduledExecutor.schedule(pinnedExecutor.execute() ...) ?
Seems to be a feasible way right now :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a more optimal solution would be to have the capability for scheduling tasks on the pinned scheduler

@lhotari The scheduled executor is less efficient compared to the normal executor because it has to maintain the delayed tasks. For that it's preferable not to use it directly in the critical data path, but only when we want to defer actions or for background tasks.

@@ -298,7 +300,8 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = bookKeeper.getMainWorkerPool();
this.pinnedScheduledExecutor = scheduledExecutor.chooseThread(name);
this.pinnedExecutor = bookKeeper.getMainWorkerPool().chooseThread(name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why we never did this, but this saves a lot of string hashings too :)

@@ -2159,7 +2162,7 @@ void notifyCursors() {
break;
}

executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
pinnedExecutor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this required to be on the same executor?

We're notify multiple cursors that entries are available, this should be able to progress in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 2 places will call the notifyCursors() method, one is OpAddEntry.safeRun(), it already run the pinnedExecutor so don't need to jump again.

Another one is the ledger closed, looks only need to change here.

@@ -2170,7 +2173,7 @@ void notifyWaitingEntryCallBacks() {
break;
}

executor.execute(safeRun(cb::entriesAvailable));
pinnedExecutor.execute(safeRun(cb::entriesAvailable));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this one, it should be same to spread into multiple threads.

}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
pinnedScheduledExecutor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since trimConsumedLedgersInBackground() is already jumping on the pinnedExecutor, we shouldn't need to use a specific thread for the scheduled executor.

@@ -93,7 +93,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

if (!entries.isEmpty()) {
// There were already some entries that were read before, we can return them
cursor.ledger.getExecutor().execute(safeRun(() -> {
cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be careful in not serializing every cursor into the managed ledger pinned thread, as it could become a bottleneck where there are many cursors on a topic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's true.

The reason to use the pinned executor is to adhere to Java Memory Model rules of correct synchronization. There's a generic problem in OpReadEntry since it's sharing an array that is mutated by multiple threads. JLS 17.4 explains that "Incorrectly Synchronized Programs May Exhibit Surprising Behavior".

I would assume that "entries" would have to be copied to a new list before sharing if we want to use multiple threads. Is that right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the entry reading happens one by one, if we got the read entries failed here, this means we will not get a chance to add more elements to the list right(all the previous read operations are done)?

// Schedule next read in a different thread
cursor.ledger.getExecutor().execute(safeRun(() -> {
// Schedule next read
cursor.ledger.getPinnedExecutor().execute(safeRun(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than the consideration that different cursors shouldn't be pinned on a single thread, the reason for jumping to a different thread here is to avoid a stack overflow.

When the read is being served from the ML cache, it's coming back from same thread. There are some conditions in which we ask for next read.

eg. If you ask to read 100 entries and we only got 20 entries from current ledger, we'll schedule a read for the remaining 80 on next ledger. In some cases there could be abnormal distributions, like 1 entry per ledger and it would be chaining all the reads and callback within the same stack.

Therefore, the "jump to a random thread" was introduced to break that chain.

Copy link
Member Author

@lhotari lhotari Aug 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the usage of the pinned executor achieve the same result? It prevents the stack from going deeper and deeper.
Why would it have to jump to a random thread to break the chain?

The only reason that comes into my mind is the case where there's a completable future that gets triggered as part of the call flow and it is being waited to complete in the same thread as where the result should be executed in. That would never complete and would dead lock. Would that be the case here to use a different executor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the stack should be checkReadCompletion -> entryCache.asyncReadEntry0 -> checkReadCompletion -> entryCache.asyncReadEntry0 -> checkReadCompletion -> entryCache.asyncReadEntry0 and so on, if we have entries in the cache.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the usage of the pinned executor achieve the same result? It prevents the stack from going deeper and >deeper.
Why would it have to jump to a random thread to break the chain?

@lhotari Uhm, I think that some executors are short-circuiting the queue if they detect that you're trying to add a task from the same executor thread. That is the case for Netty IO thread, though I just check that it shouldn't happen on the ThreadPoolExecutor on which the OrderedExecutor is based upon.

@tisonkun
Copy link
Member

tisonkun commented Dec 9, 2022

Closed as stale and conflict. Please rebase and resubmit the patch if it's still relevant.

@tisonkun tisonkun closed this Dec 9, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants