[fix] [broker] Fix broker deadlock by making blocking bk-create async during ml-creation #22842

Closed
wants to merge 1 commit into from

Conversation

rdhabalia
Contributor

Fixes #22840

Main Issue: #22840

Motivation

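This PR addresses the second issue discussed in #22840.
The BookKeeper client creation path contains a blocking call that stalls managed-ledger creation and eventually causes a deadlock. In the thread dump below, "pulsar-io-4-32" holds the ConcurrentHashMap reservation node <0x00001000f4a02bc0> inside ManagedLedgerFactoryImpl.asyncOpen while it waits on a CompletableFuture in BookieRackAffinityMapping.setConf, and "metadata-store-10-1" is blocked waiting for that same node. Therefore, managed-ledger creation must create the bk-client asynchronously.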

"metadata-store-10-1" #25 prio=5 os_prio=0 cpu=7574.38ms elapsed=703.54s tid=0x00007fafdd884ed0 nid=0x37eb waiting for monitor entry  [0x00007fae8f9d3000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1723)
        - waiting to lock <0x00001000f4a02bc0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
        at org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
        at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
        at org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
        at org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
        at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
        at org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
        at org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$acquireLock$1(LockManagerImpl.java:105)
        at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl$$Lambda$613/0x00007facbf1d9d08.run(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquire$2(ResourceLockImpl.java:128)
        at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$611/0x00007facbdc40040.run(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniRun.tryFire([email protected]/CompletableFuture.java:783)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquireWithNoRevalidation$6(ResourceLockImpl.java:167)
        at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$609/0x00007facbdc42908.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire([email protected]/CompletableFuture.java:714)
        at java.util.concurrent.CompletableFuture.postComplete([email protected]/CompletableFuture.java:506)
        at java.util.concurrent.CompletableFuture.complete([email protected]/CompletableFuture.java:2073)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:225)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:182)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$160/0x00007fae8db950b0.run(Unknown Source)
        at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
        at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:304)
        at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run([email protected]/Thread.java:829)




"pulsar-io-4-32" #166 prio=5 os_prio=0 cpu=429.42ms elapsed=701.77s tid=0x00007facc80314b0 nid=0x3881 waiting on condition  [0x00007facc73f4000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
        - parking to wait for  <0x00001000f838ac60> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
        at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
        at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
        at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
        at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
        at org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.setConf(BookieRackAffinityMapping.java:121)
        - locked <0x00001000f8389698> (a org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping)
        at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.initialize(RackawareEnsemblePlacementPolicyImpl.java:265)
        at org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:105)
        at org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy.initialize(IsolatedBookieEnsemblePlacementPolicy.java:51)
        at org.apache.bookkeeper.client.BookKeeper.initializeEnsemblePlacementPolicy(BookKeeper.java:581)
        at org.apache.bookkeeper.client.BookKeeper.<init>(BookKeeper.java:505)
        at org.apache.bookkeeper.client.BookKeeper$Builder.build(BookKeeper.java:306)
        at org.apache.pulsar.broker.BookKeeperClientFactoryImpl.create(BookKeeperClientFactoryImpl.java:87)
        at org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$0(ManagedLedgerClientFactory.java:95)
        at org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$1006/0x00007facb41b1440.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
        - locked <0x00001000f3c03a28> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
        at org.apache.pulsar.broker.ManagedLedgerClientFactory.lambda$initialize$1(ManagedLedgerClientFactory.java:93)
        at org.apache.pulsar.broker.ManagedLedgerClientFactory$$Lambda$202/0x00007fae1e73e9b0.get(Unknown Source)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$6(ManagedLedgerFactoryImpl.java:369)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$660/0x00007facb91ecd60.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent([email protected]/ConcurrentHashMap.java:1705)
        - locked <0x00001000f4a02bc0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.asyncOpen(ManagedLedgerFactoryImpl.java:365)
        at org.apache.pulsar.broker.service.BrokerService.lambda$createPersistentTopic$66(BrokerService.java:1533)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$644/0x00007facb91c8040.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
        at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
        at org.apache.pulsar.broker.service.BrokerService.createPersistentTopic(BrokerService.java:1514)
        at org.apache.pulsar.broker.service.BrokerService.lambda$checkOwnershipAndCreatePersistentTopic$60(BrokerService.java:1472)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$634/0x00007facb91c6968.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAcceptNow([email protected]/CompletableFuture.java:753)
        at java.util.concurrent.CompletableFuture.uniAcceptStage([email protected]/CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenAccept([email protected]/CompletableFuture.java:2108)
        at org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1470)
        at org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$57(BrokerService.java:1437)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$492/0x00007facc317a960.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRunNow([email protected]/CompletableFuture.java:815)
        at java.util.concurrent.CompletableFuture.uniRunStage([email protected]/CompletableFuture.java:799)
        at java.util.concurrent.CompletableFuture.thenRun([email protected]/CompletableFuture.java:2121)
        at org.apache.pulsar.broker.service.BrokerService.loadOrCreatePersistentTopic(BrokerService.java:1433)
        at org.apache.pulsar.broker.service.BrokerService.lambda$getTopic$31(BrokerService.java:1014)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$476/0x00007facc3057108.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:404)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:238)
        at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:1013)
        at org.apache.pulsar.broker.service.BrokerService.getTopic(BrokerService.java:978)
        at org.apache.pulsar.broker.service.BrokerService.lambda$getOrCreateTopic$29(BrokerService.java:973)
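
The general shape of the fix described above can be sketched as follows. This is a minimal illustration, not the code in this PR: `AsyncClientCache`, `getOrCreate`, and `blockingCreate` are hypothetical names, and the client type is left generic. The point is that the mapping function passed to `computeIfAbsent` only schedules the blocking creation and returns a future, so the map's internal lock is held briefly instead of for the duration of the blocking call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical illustration, not Pulsar code: all names here are made up.
class AsyncClientCache<V> {
    private final ConcurrentHashMap<String, CompletableFuture<V>> clients = new ConcurrentHashMap<>();
    private final Executor creationExecutor;

    AsyncClientCache(Executor creationExecutor) {
        this.creationExecutor = creationExecutor;
    }

    // The mapping function returns immediately with a pending future.
    // The potentially blocking creation runs later on creationExecutor,
    // outside the map's per-bin lock.
    CompletableFuture<V> getOrCreate(String name, Supplier<V> blockingCreate) {
        return clients.computeIfAbsent(name,
                key -> CompletableFuture.supplyAsync(blockingCreate, creationExecutor));
    }
}
```

One caveat of this sketch: a future that completes exceptionally stays cached, so a real implementation would probably need to evict failed entries.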

This PR includes a unit test that fails with a single callback thread and passes when multiple threads are provided for callback processing.

Modifications

Allow the metadata store to use multiple callback threads to avoid the deadlock in the broker.
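
Why the number of callback threads matters can be shown with a simplified stand-in. This is not the unit test from this PR, and it does not reproduce the exact lock interaction in the thread dump; `CallbackPoolDemo` and `completes` are hypothetical names. A callback blocks on a result that must be produced by the same pool: with one thread the result is queued behind the blocked callback and never runs, while with two threads it completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; a simplification of the situation described above.
class CallbackPoolDemo {
    // Returns true if a callback that blocks on work scheduled on the
    // same pool finishes within timeoutMs, false if it times out.
    static boolean completes(int threads, long timeoutMs) {
        ExecutorService callbacks = Executors.newFixedThreadPool(threads);
        try {
            CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
                // The inner task needs a free thread from the same pool.
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "rack-info", callbacks);
                try {
                    return inner.get(); // blocking wait inside a callback
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }, callbacks);
            try {
                outer.get(timeoutMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            // Interrupts the blocked callback so the pool threads can exit.
            callbacks.shutdownNow();
        }
    }
}
```

Under these assumptions, `CallbackPoolDemo.completes(1, 300)` times out and `CallbackPoolDemo.completes(2, 5000)` finishes. Adding threads removes this particular stall, but the blocking wait inside the callback is still there.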

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

lhotari
lhotari previously approved these changes Jun 4, 2024
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
return null;
});
}

private CompletableFuture<BookKeeper> createBookKeeperClient(ManagedLedgerConfig config) {
CompletableFuture<BookKeeper> future = new CompletableFuture<>();
scheduledExecutor.execute(() -> {
Contributor

I think the culprit of the issue is in the BookkeeperFactoryForCustomEnsemblePlacementPolicy interface, because it effectively masks a potentially blocking operation behind a method that does not look blocking, and doesn't even declare an exception.

I think a better solution here would be to make the BookkeeperFactoryForCustomEnsemblePlacementPolicy interface return a CompletableFuture of a BK client instead of using an executor here.
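
To make the suggestion concrete, here is a rough sketch of what a future-returning factory could look like. It deliberately does not use the real interface or its signature: `AsyncFactory`, `get`, and `fromBlocking` are hypothetical names, and the config and client types are generic placeholders. The return type makes the asynchronous nature visible to callers, and a failure surfaces as an exceptionally completed future instead of an undeclared exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical interface for illustration only; not the Pulsar/BookKeeper API.
interface AsyncFactory<C, T> {
    // Callers can see from the signature that creation may complete later.
    CompletableFuture<T> get(C config);

    // Adapter for an existing blocking factory: the blocking call runs on
    // the given executor, and any exception it throws completes the
    // returned future exceptionally.
    static <C, T> AsyncFactory<C, T> fromBlocking(Function<C, T> blocking, Executor executor) {
        return config -> CompletableFuture.supplyAsync(() -> blocking.apply(config), executor);
    }
}
```

The adapter still needs an executor for legacy blocking implementations, but implementations that are natively asynchronous could return their own future directly.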

Contributor Author

I think the culprit of the issue is in the BookkeeperFactoryForCustomEnsemblePlacementPolicy interface

Yes, but that depends on another system, BookKeeper in this case, and upgrading the bk version will not be possible for all stable Pulsar versions.
Let's merge this patch, and I can make a BookKeeper enhancement to fix this issue for future versions.

@merlimat
Contributor

merlimat commented Jun 5, 2024

@rdhabalia I posted a fix in #22846. PTAL

@rdhabalia
Contributor Author

I added a comment on #22846: that fix may not work when we want strong bookie affinity.

@rdhabalia
Contributor Author

@merlimat @lhotari PR #22846 should not have been merged; as I explained in that PR, this PR actually solves the problem.
I think what we did was wrong, and I didn't agree with it.

@rdhabalia
Contributor Author

Also, please let us know if you want us to just create an issue with the info so we don't waste our time creating a PR. I don't think reviewing and merging something without any discussion is the correct way. I feel it's not good practice.

Labels
area/broker, doc-not-needed, ready-to-test
Development

Successfully merging this pull request may close these issues.

[Bug] Broker became irresponsive due to deadlock during race-condition in metadatastore callback
3 participants