Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Fix getting last message ID when there are ongoing transactions #21466

Merged
merged 11 commits into from
Dec 13, 2023

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Oct 30, 2023

Motivation

  1. Consumer/Reader API getLastMessageIds should return the max read position instead of lastConfirmedEntry.
    if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
    Consumer consumer = consumerFuture.getNow(null);
    long requestId = getLastMessageId.getRequestId();
    Topic topic = consumer.getSubscription().getTopic();
    Position lastPosition = topic.getLastPosition();
  2. When the transaction buffer is recovering, the MaxReadPosition of the topic is unknown. Wait it recovers completely is needed.
  3. The MaxReadPosition should be the previous position of the first ongoing transaction position instead of EntryID -1. Because compacted decide whether the topic's managedLedger is null by EntryID == -1
    compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
    if (ex != null) {
    log.error("Failed to get compactionHorizon.", ex);
    writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, ex.getMessage()));
    return;
    }
    if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
    && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) {
    handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
    markDeletePosition);
    return;
    }
    // For a valid position, we read the entry out and parse the batch size from its metadata.
    CompletableFuture<Entry> entryFuture = new CompletableFuture<>();

Modifications

  1. Return max read position when calling getLastMessageIds.
  2. Call checkIfTBRecoverCompletely before getting the max read position.
  3. If transaction buffer recovery fails, return Unknown error to the client.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@codelipenghui
Copy link
Contributor

@coderzc

Could you please help review

The MaxReadPosition should be the previous position of the first ongoing transaction position instead of EntryID -1.
Compacted decide whether the topic's managedLedger is null by EntryID == -1

I'm not sure what does it actually means.

@liangyepianzhou liangyepianzhou changed the title [fix][txn] Wait TransactionBuffer recover completely when get last message ID [fix][txn] Fix getting last message ID when there are ongoing transactions Nov 6, 2023
@coderzc
Copy link
Member

coderzc commented Nov 7, 2023

@coderzc

Could you please help review

The MaxReadPosition should be the previous position of the first ongoing transaction position instead of EntryID -1.
Compacted decide whether the topic's managedLedger is null by EntryID == -1

I'm not sure what does it actually means.

@coderzc

Could you please help review

The MaxReadPosition should be the previous position of the first ongoing transaction position instead of EntryID -1.
Compacted decide whether the topic's managedLedger is null by EntryID == -1

I'm not sure what does it actually means.

I think this change is correct since the first entryID of ongoing maybe 0, so its previous entry can't be directly by entryID-1.

@coderzc
Copy link
Member

coderzc commented Nov 7, 2023

@liangyepianzhou I think this PR also fixes #21456.

Modification:
1. Wait transaction buffer ready
2. Use `getPreviousPosition` replace `EntryID - 1`
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@coderzc
Copy link
Member

coderzc commented Dec 12, 2023

/pulsarbot run-failure-checks

@codecov-commenter
Copy link

Codecov Report

Merging #21466 (f229294) into master (517961e) will increase coverage by 41.55%.
Report is 111 commits behind head on master.
The diff coverage is 75.00%.

Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #21466       +/-   ##
=============================================
+ Coverage     31.82%   73.37%   +41.55%     
- Complexity    11870    32661    +20791     
=============================================
  Files          1511     1893      +382     
  Lines        116133   140721    +24588     
  Branches      12585    15502     +2917     
=============================================
+ Hits          36963   103260    +66297     
+ Misses        74228    29344    -44884     
- Partials       4942     8117     +3175     
Flag Coverage Δ
inttests 24.11% <66.66%> (?)
systests 24.72% <62.50%> (?)
unittests 72.68% <75.00%> (+40.86%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...ransaction/buffer/impl/TopicTransactionBuffer.java 87.74% <100.00%> (+67.23%) ⬆️
...nsaction/buffer/impl/TransactionBufferDisable.java 56.52% <100.00%> (+8.90%) ⬆️
...sar/broker/service/persistent/PersistentTopic.java 78.49% <50.00%> (+32.61%) ⬆️
...ransaction/buffer/impl/InMemTransactionBuffer.java 57.57% <50.00%> (+57.57%) ⬆️
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.10% <73.33%> (+35.24%) ⬆️

... and 1575 files with indirect coverage changes

@coderzc coderzc merged commit 50007c3 into apache:master Dec 13, 2023
48 checks passed
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 1, 2024
lhotari pushed a commit that referenced this pull request Mar 4, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 6, 2024
codelipenghui pushed a commit that referenced this pull request Apr 28, 2024
@liangyepianzhou liangyepianzhou deleted the getmaxposition branch July 23, 2024 08:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants