Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Avoid compaction task stuck when the last message to compact is a marker #21718

Merged
merged 5 commits into from
Dec 20, 2023

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Dec 13, 2023

Motivation

If the last message to compact is a marker, then the compaction task will stuck until timeout fails. The root cause is that the marker has been filtered out by the dispatcher, and the compaction reader will not read the last message.

 WARN  - [broker-client-shared-internal-executor-18-1:PersistentTopic@3432] - [persistent://tnx/ns-prechecks/test_transaction_topic211ca13d-f8a4-4648-a7b9-bd2f679e0f3b] Compaction failure.
java.util.concurrent.CompletionException: org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: Timeout
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.pulsar.common.util.FutureUtil.lambda$addTimeoutHandling$9(FutureUtil.java:236) ~[classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: Timeout
	at org.apache.pulsar.compaction.TwoPhaseCompactor.phaseOneLoop(...)(Unknown Source) ~[classes/:?]

Modifications

For __compaction internal cursor, we don't need to filter out marker messages and delete marker messages during compaction.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 13, 2023
@coderzc coderzc added type/bug The PR fixed a bug or issue reported a bug area/broker labels Dec 13, 2023
@coderzc coderzc added this to the 3.2.0 milestone Dec 13, 2023
@Technoboy-
Copy link
Contributor

We need to cherry to 2.9 ~ 3.1 right ?

@coderzc
Copy link
Member Author

coderzc commented Dec 14, 2023

We need to cherry to 2.9 ~ 3.1 right ?

Yes.

@codecov-commenter
Copy link

Codecov Report

Merging #21718 (bbef1e6) into master (50007c3) will decrease coverage by 0.17%.
Report is 10 commits behind head on master.
The diff coverage is 93.33%.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #21718      +/-   ##
============================================
- Coverage     73.50%   73.33%   -0.17%     
+ Complexity    32818    32754      -64     
============================================
  Files          1893     1893              
  Lines        140721   140768      +47     
  Branches      15502    15512      +10     
============================================
- Hits         103432   103228     -204     
- Misses        29200    29427     +227     
- Partials       8089     8113      +24     
Flag Coverage Δ
inttests 24.14% <40.00%> (-0.02%) ⬇️
systests 24.72% <6.66%> (-0.12%) ⬇️
unittests 72.62% <93.33%> (-0.16%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...rg/apache/pulsar/compaction/TwoPhaseCompactor.java 74.59% <100.00%> (+0.64%) ⬆️
.../pulsar/broker/service/AbstractBaseDispatcher.java 93.84% <85.71%> (+0.03%) ⬆️

... and 80 files with indirect coverage changes

@@ -192,7 +195,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}
}

if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata) && !Markers.isTxnMarker(msgMetadata))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should pass all the markers and filter them at the client side when doing compaction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@@ -262,7 +266,10 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (Markers.isTxnMarker(metadata)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only check the txn marker here? Should we use Markers.isServerOnlyMarker(metadata)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will fix it.

@@ -1849,4 +1850,62 @@ public void testReadCommittedWithReadCompacted() throws Exception{
Assert.assertEquals(messages, List.of("V2", "V3"));
}


@Test
public void testReadCommittedWithCompaction() throws Exception{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test to send a replicated subscription snapshot marker and test the compaction works?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add test testReplicatedSubscriptionWithCompaction, PLAT~

@coderzc coderzc merged commit fc393f6 into apache:master Dec 20, 2023
48 of 49 checks passed
@coderzc coderzc deleted the fix_compaction_marker branch December 20, 2023 01:39
Technoboy- pushed a commit that referenced this pull request Jan 3, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 4, 2024
…mpact is a marker (apache#21718)

(cherry picked from commit 1f99568)

 Conflicts:
	pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Fixed duplicate namespace created in transactionTest
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jan 8, 2024
…mpact is a marker (apache#21718)

(cherry picked from commit 1f99568)

 Conflicts:
	pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Fixed duplicate namespace created in transactionTest
Technoboy- pushed a commit that referenced this pull request Jan 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants