[improve][broker] Make read compacted entries support maxReadSizeBytes limitation #21065

Merged
merged 4 commits into from
Sep 1, 2023

Conversation

coderzc
Member

@coderzc coderzc commented Aug 25, 2023

Motivation

In #7060, we allowed the managed cursor to specify a maximum read batch size in bytes, to avoid reading too much data at a time. However, reading compacted entries does not consider maxReadSizeBytes. Reading compacted entries should also honor the maxReadSizeBytes limit.

Modifications

  • Use the applyMaxSizeCap method to recalculate numberOfEntriesToRead before reading compacted entries.
  • Update the read stats after reading compacted entries completes.
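
For readers unfamiliar with the idea, the following is a minimal sketch of capping an entry count by a byte budget. It is not the actual applyMaxSizeCap implementation: the class name, the method name capEntriesBySize, the NO_MAX_SIZE_LIMIT sentinel, and passing the average entry size as a parameter are all assumptions made for illustration.

```java
final class ReadSizeCapSketch {
    /** Sentinel meaning "no byte limit" (an assumption of this sketch). */
    static final long NO_MAX_SIZE_LIMIT = -1L;

    /**
     * Returns how many entries to request so that the estimated read size
     * (entries * average entry size) stays within maxSizeBytes.
     * At least one entry is always allowed so that reads can make progress.
     */
    static int capEntriesBySize(int maxEntries, long maxSizeBytes, double avgEntrySizeBytes) {
        if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
            // No byte budget configured: keep the caller's entry count.
            return maxEntries;
        }
        if (!Double.isFinite(avgEntrySizeBytes) || avgEntrySizeBytes <= 0) {
            // No usable size statistics yet: read a single entry to start collecting them.
            return 1;
        }
        long entriesBySize = (long) (maxSizeBytes / avgEntrySizeBytes);
        if (entriesBySize < 1) {
            // The budget is smaller than one average entry: still read one entry.
            return 1;
        }
        return (int) Math.min(entriesBySize, (long) maxEntries);
    }
}
```

Under these assumptions, a budget of 1024 bytes with an average entry size of 256 bytes reduces a request for 100 entries to 4. This also suggests why the read stats matter: a size estimate of this kind is only as good as the statistics gathered from completed reads.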

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 25, 2023
Contributor

@gaoran10 gaoran10 left a comment


LGTM

@coderzc coderzc self-assigned this Aug 28, 2023
@coderzc coderzc added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker labels Aug 28, 2023
@coderzc coderzc added this to the 3.2.0 milestone Aug 28, 2023
@coderzc coderzc closed this Aug 28, 2023
@coderzc coderzc reopened this Aug 28, 2023
@coderzc
Member Author

coderzc commented Aug 28, 2023

org.apache.pulsar.compaction.StrategicCompactionTest.testCompactCompressedBatching always fails after applying this change. I noticed that StrategicTwoPhaseCompactor may write messages from the same batch into different entries, which results in some messages being skipped when reading compacted entries. Is that right?
Refer to:

Entry lastEntry = entries.get(entries.size() - 1);
// The compaction task depends on the last snapshot and the incremental
// entries to build the new snapshot. So for the compaction cursor, we
// need to force seek the read position to ensure the compactor can read
// the complete last snapshot because of the compactor will read the data
// before the compaction cursor mark delete position
cursor.seek(lastEntry.getPosition().getNext(), true);

<T> CompletableFuture<Boolean> addToCompactedLedger(
        LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
    CompletableFuture<Boolean> bkf = new CompletableFuture<>();
    if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
        if (batchMessageContainer.getNumMessagesInBatch() > 0) {
            try {
                ByteBuf serialized = batchMessageContainer.toByteBuf();
                outstanding.acquire();
                mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
                long start = System.nanoTime();
                lh.asyncAddEntry(serialized,
                        (rc, ledger, eid, ctx) -> {

/cc @heesung-sn @Demogorgon314
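
One way to picture the suspected problem is the toy model below. It is not Pulsar code. It assumes that each compacted entry reports the id of the original entry its messages came from, and that after each capped read the reader seeks to the position after the last entry it received, in the spirit of the cursor.seek(lastEntry.getPosition().getNext(), true) call above. All class, field, and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitBatchSkipSketch {
    /** Hypothetical compacted entry: the original entry id it reports, plus its messages. */
    static final class CompactedEntry {
        final long originalEntryId;
        final List<String> messages;

        CompactedEntry(long originalEntryId, String... messages) {
            this.originalEntryId = originalEntryId;
            this.messages = Arrays.asList(messages);
        }
    }

    /**
     * Reads the compacted entries in pages of at most maxEntriesPerRead,
     * then seeks to (last original entry id + 1) before the next page.
     */
    static List<String> readAll(List<CompactedEntry> compacted, int maxEntriesPerRead) {
        List<String> delivered = new ArrayList<>();
        long readPosition = 0;
        while (true) {
            List<CompactedEntry> page = new ArrayList<>();
            for (CompactedEntry e : compacted) {
                if (e.originalEntryId >= readPosition && page.size() < maxEntriesPerRead) {
                    page.add(e);
                }
            }
            if (page.isEmpty()) {
                return delivered;
            }
            for (CompactedEntry e : page) {
                delivered.addAll(e.messages);
            }
            // Seek past the last entry read. Any other compacted entry that
            // reports the same original entry id is never visited again.
            readPosition = page.get(page.size() - 1).originalEntryId + 1;
        }
    }
}
```

In this model, suppose one original batch (entry id 0) is split into two compacted entries holding "a", "b" and "c", followed by an entry with id 1 holding "d". Reading with a cap of one entry per read delivers "a", "b", "d" and skips "c", while a cap large enough to cover both halves in one read delivers all four messages. That would be consistent with the test failing only once the read size is capped.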

@Demogorgon314
Member

@coderzc Yes, this is a bug. We will fix this issue.

@Demogorgon314
Member

@heesung-sn Should we limit batchingMaxMessages to 200 (the default is 1000) so that ServiceUnitStateChannelImpl does not hit this issue? StrategicTwoPhaseCompactor uses 1000 as the maximum number of messages in a batch.

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java#L301-L307

@heesung-sn
Contributor

public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
    private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
    private static final int MAX_OUTSTANDING = 500;
    private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
    private static final int MAX_BYTES_IN_BATCH = 128 * 1024;

Yes, if the large batch size matters, we could limit batching on both the producer side and the compactor side in BSC.

@coderzc coderzc force-pushed the improve_compacted_max_read_size branch from de6e18a to da0afc0 Compare August 30, 2023 03:49
@coderzc coderzc force-pushed the improve_compacted_max_read_size branch from da0afc0 to ce678d9 Compare August 30, 2023 07:03
@coderzc coderzc merged commit 835e9b6 into apache:master Sep 1, 2023
45 checks passed
Technoboy- pushed a commit that referenced this pull request Sep 5, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 11, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 11, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 11, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 13, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 14, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 14, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 18, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 18, 2023
coderzc added a commit to coderzc/pulsar that referenced this pull request Sep 20, 2023
coderzc added a commit that referenced this pull request Sep 22, 2023
@lhotari
Member

lhotari commented Oct 1, 2023

There's a related, very flaky test, #21284, which slows down CI. @coderzc, do you have a chance to check it? Thanks

@shibd
Member

shibd commented Oct 22, 2023

@coderzc Could you create a PR to cherry-pick this change to branch-2.11?
