[ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry #12606
@lhotari: Thanks for providing doc info!
I'll need to revisit the changes.
This change makes sense to me.
Direct evidence in the form of a failing test is hard to produce for many thread safety issues. In many cases, it is worth checking that the code follows the rules of the Java Memory Model (JMM), specified in the Java Language Specification (JLS). In Java on 64-bit JVMs, memory pointers don't get corrupted, since "word tearing" problems are prevented. The main risks are data consistency problems and data races. "Incorrectly Synchronized Programs May Exhibit Surprising Behavior" is how the JLS explains it. I guess the verification activity is like a model checking exercise, but at a different level.

My assumption is that the use of recycled objects and sharing them between multiple threads leads to "incorrectly synchronized programs" within the Pulsar code base. It doesn't corrupt memory, but we might see "surprising behavior". One such behavior is that a field value can appear to change between two subsequent reads even though nothing has written to the field in between. This is explained in Aleksey Shipilëv's Java Memory Model Unlearning Experience presentation. At about 21 minutes 50 seconds, there's an interesting example. In this example, the first thread makes a single change to field x and sets it to 1. The second thread reads this field twice. On the first read, the value is 1 and on the second read, the value is 0. This is something that one would assume "is impossible". It is one example of the surprising behavior of incorrectly synchronized programs.

There are multiple ways to mitigate the issue. The safest general approach is to ensure that recycled objects are exposed only in a single thread. The main motivation of this PR is to reduce the likelihood of "surprising behavior" related to the use of a Netty recycled object, OpAddEntry. The Netty Recycler pool is thread local.
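The two-read example described above can be sketched roughly as follows. This is only an illustration: `RacyHolder` and `RacyReadSketch` are hypothetical names, not Pulsar code. The racy variant has no guaranteed outcome, so only the correctly synchronized variant (using `Thread.join()`) has a predictable result.

```java
/** Hypothetical holder with a plain (non-volatile) field, like field x in the example. */
class RacyHolder {
    int x; // plain field: unsynchronized access from several threads is a data race
}

class RacyReadSketch {
    /**
     * Reads the field twice. If another thread writes x = 1 concurrently and
     * nothing orders the accesses, the JMM does not forbid any combination of
     * 0 and 1 here, including the counter-intuitive (1, 0).
     */
    static int[] readTwice(RacyHolder h) {
        int r1 = h.x;
        int r2 = h.x;
        return new int[] {r1, r2};
    }

    /**
     * Correctly synchronized variant: Thread.join() establishes a
     * happens-before edge, so both reads must observe the write.
     */
    static int[] readTwiceAfterJoin() {
        RacyHolder h = new RacyHolder();
        Thread writer = new Thread(() -> h.x = 1);
        writer.start();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return readTwice(h);
    }

    public static void main(String[] args) {
        int[] r = readTwiceAfterJoin();
        System.out.println(r[0] + ", " + r[1]); // prints "1, 1"
    }
}
```

Reproducing the (1, 0) outcome on demand is not something this sketch attempts; whether it shows up depends on the JIT compiler and the hardware, which is why a failing test is hard to provide.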
When the creation (and lookup from the Netty Recycler) of OpAddEntry is moved to the same thread where the OpAddEntry is mainly mutated and read, it reduces the likelihood of data races caused by sharing objects across threads without proper synchronization. This isn't perfect, but it's an improvement. Currently ManagedLedgerImpl uses a single thread per ledger. There are gaps in the solution. The goal of my draft PR #11387 would be to continue the improvements and eventually fix the current thread safety issues that the use of recycled OpAddEntry and OpReadEntry instances introduces in Pulsar.
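A minimal sketch of the idea, assuming a simplified thread-local pool: `PooledOp` stands in for a recycled object such as OpAddEntry, and the `ThreadLocal<ArrayDeque>` pool is a hypothetical simplification, not the Netty Recycler API. The point is only that the object is taken from the pool, used, and recycled on the one executor thread, and never touched by the calling thread.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a recycled operation object. */
class PooledOp {
    // Hypothetical per-thread pool; each thread only ever sees its own instances.
    private static final ThreadLocal<ArrayDeque<PooledOp>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);
    static final AtomicInteger ALLOCATIONS = new AtomicInteger();

    String payload;   // plain mutable state, reset on recycle
    Thread createdOn; // thread that took this instance from the pool

    static PooledOp create(String payload) {
        PooledOp op = POOL.get().poll();
        if (op == null) {
            op = new PooledOp();
            ALLOCATIONS.incrementAndGet();
        }
        op.payload = payload;
        op.createdOn = Thread.currentThread();
        return op;
    }

    void recycle() {
        payload = null;
        createdOn = null;
        POOL.get().push(this);
    }
}

class ConfinedOpSketch {
    /**
     * Creates, uses and recycles the op inside the executor task.
     * Returns true if the op was confined to the executor thread.
     */
    static boolean addEntryConfined(ExecutorService ledgerExecutor, String payload) {
        Thread caller = Thread.currentThread();
        Future<Boolean> result = ledgerExecutor.submit(() -> {
            PooledOp op = PooledOp.create(payload);
            boolean confined = op.createdOn == Thread.currentThread()
                    && op.createdOn != caller;
            op.recycle();
            return confined;
        });
        try {
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        System.out.println(addEntryConfined(executor, "entry-1"));
        System.out.println(addEntryConfined(executor, "entry-2"));
        executor.shutdown();
    }
}
```

With a single-threaded executor, the second call reuses the instance recycled by the first one, so pooling still pays off while the object stays on one thread.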
@@ -699,10 +699,14 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
             log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
         }

-        OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
+        // retain buffer in this thread
+        buffer.retain();
Is it necessary to retain the buffer in this thread?
Yes. It also leads to test failures unless it's taken care of in the current thread.
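The thread does not spell out why the retain has to happen in the calling thread. One plausible reason, stated here as an assumption, is that the caller may release its own reference as soon as asyncAddEntry returns, before the executor task has run. The sketch below uses a hypothetical `CountedBuffer` (loosely modelled on a retain()/release() reference-counting contract, not Netty's ByteBuf itself) and a latch that forces this unlucky ordering.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted buffer; starts with one reference held by the caller. */
class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    void retain() {
        // Retaining a buffer whose count already dropped to zero is an error.
        int prev = refCnt.getAndUpdate(c -> c == 0 ? 0 : c + 1);
        if (prev == 0) {
            throw new IllegalStateException("buffer already released");
        }
    }

    void release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("released too often");
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}

class RetainHandOffSketch {
    /**
     * Hands the buffer to another thread; retainInCaller selects where retain() runs.
     * Returns true if the task could still use the buffer, false if it was already freed.
     */
    static boolean handOff(boolean retainInCaller) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch callerReleased = new CountDownLatch(1);
        CountedBuffer buffer = new CountedBuffer();
        try {
            if (retainInCaller) {
                buffer.retain(); // retain before crossing the thread boundary
            }
            Future<Boolean> usable = executor.submit(() -> {
                callerReleased.await(); // force the ordering: caller releases first
                try {
                    if (!retainInCaller) {
                        buffer.retain(); // too late if the caller already released
                    }
                } catch (IllegalStateException alreadyFreed) {
                    return false;
                }
                boolean ok = buffer.refCnt() > 0;
                buffer.release();
                return ok;
            });
            buffer.release(); // caller drops its reference right after the hand-off
            callerReleased.countDown();
            return usable.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff(true));  // prints "true"
        System.out.println(handOff(false)); // prints "false"
    }
}
```

In real code the ordering is not forced by a latch, so a late retain would fail only some of the time, which would fit with the test failures mentioned above.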
@merlimat Please help review this PR, thanks.
👍
* up/master: (55 commits) [broker] remove useless method "PersistentTopic#getPersistentTopic" (apache#12655) [Python Schema] Python schema support custom Avro configurations for Enum type (apache#12642) Allow to configure different implementations for Pulsar functions state store (apache#12646) Remove replicator global test from the quarantine group (apache#12648) [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (apache#12652) k8s runtime: force deletion to avoid hung function worker during connector restart (apache#12504) [Broker] Optimize exception information for schemas (apache#12647) Close Zk database on unit tests (apache#12649) Fix call sync method in an async callback when enabling geo replicator. (apache#12590) [pulsar-broker] Add git branch information for PulsarVersion (apache#12541) PulsarAdmin: Fix last exit code storage (apache#12581) Add @test annotation to test methods (apache#12640) Upgrade debezium to 1.7.1 (apache#12644) [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (apache#12606) [Functions] Prevent NPE while stopping a non started Pulsar LogAppender (apache#12643) Update io-debezium-source.md (apache#12638) Add missing cmds on pulsar-admin document page (apache#12634) Clean up the metadata of the non-persistent partitioned topics. (apache#12550) modify check waitingForPingResponse with volatile (apache#12615) [pulsar-admin] Check backlog quota policy for namespace (apache#12512) ...
…ry (apache#12606) * [ML] Avoid passing OpAddEntry across a thread boundary * Retain buffer in current thread (cherry picked from commit 6af747f)
Motivation
There's a concern that passing OpAddEntry across thread boundaries causes thread safety issues when combined with the use of recycled OpAddEntry object instances.
Modifications
To avoid any thread safety issues caused by passing OpAddEntry, the OpAddEntry instance can be created in the same thread where it is handled.