This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 25
Bump pulsar version to 3.0.0-SNAPSHOT #6042
Draft
streamnativebot
wants to merge
494
commits into
master
Choose a base branch
from
branch-3.0.0-SNAPSHOT
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…pache#21035) Motivation: After [PIP-118: reconnect broker when ZooKeeper session expires](apache#13341), the Broker will not shut down after losing the connection of the local metadata store in the default configuration. However, before the ZK client is reconnected, the events of BK online and offline are lost, resulting in incorrect BK info in the memory. You can reproduce the issue by the test `BkEnsemblesChaosTest. testBookieInfoIsCorrectEvenIfLostNotificationDueToZKClientReconnect`(90% probability of reproduce of the issue, run it again if the issue does not occur) Modifications: Refresh BK info in memory after the ZK client is reconnected. (cherry picked from commit db20035)
…he (apache#20763)" This reverts commit 111c14d.
…ageId read reaches lastReadId (apache#20988) (cherry picked from commit 9e2195c)
…letion of compaction (apache#21067) (cherry picked from commit bb9c9b4)
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db)
…n topic level (apache#21041) (cherry picked from commit d3a6df3)
…ata when doing lookup (apache#21063) Motivation: If we set `allowAutoTopicCreationType` to `PARTITIONED`, the flow of the create topic progress is like the below: 1. `Client-side`: Lookup topic to get partitioned topic metadata to create a producer. 1. `Broker-side`: Create partitioned topic metadata. 1. `Broker-side`: response `{"partitions":3}`. 1. `Client-side`: Create separate connections for each partition of the topic. 1. `Broker-side`: Receive 3 connect requests and create 3 partition-topics. In the `step 2` above, the flow of the progress is like the below: 1. Check the policy of topic auto-creation( the policy is `{allowAutoTopicCreationType=PARTITIONED, defaultNumPartitions=3}` ) 1. Check the partitioned topic metadata already exists. 1. Try to create the partitioned topic metadata if it does not exist. 1. If created failed by the partitioned topic metadata already exists( maybe another broker is also creating now), read partitioned topic metadata from the metadata store and respond to the client. There is a race condition that makes the client get non-partitioned metadata of the topic: | time | `broker-1` | `broker-2` | | --- | --- | --- | | 1 | get policy: `PARTITIONED, 3` | get policy: `PARTITIONED, 3` | | 2 | check the partitioned topic metadata already exists | Check the partitioned topic metadata already exists | | 3 | Partitioned topic metadata does not exist, the metadata cache will cache an empty optional for the path | Partitioned topic metadata does not exist, the metadata cache will cache an empty optional for the path | | 4 | | succeed create the partitioned topic metadata | | 5 | Receive a ZK node changed event to invalidate the cache of the partitioned topic metadata | | 6 | Creating the metadata failed due to it already exists | | 7 | Read the partitioned topic metadata again | If `step-5` is executed later than `step-7`, `broker-1` will get an empty optional from the cache of the partitioned topic metadata and respond non-partitioned metadata to the client. **What thing would make the `step-5` is executed later than `step-7`?** Provide a scenario: Such as the issue that the PR apache#20303 fixed, it makes `zk operation` and `zk node changed notifications` executed in different threads: `main-thread of ZK client` and `metadata store thread`. Therefore, the mechanism of the lookup partitioned topic metadata is fragile and we need to optimize it. Modifications: Before reading the partitioned topic metadata again, refresh the cache first. (cherry picked from commit d099ac4)
(cherry picked from commit 671cfb4)
(cherry picked from commit 6ff83b6)
…che#21070) ### Motivation Current, when the producer resend the chunked message like this: - M1: UUID: 0, ChunkID: 0 - M2: UUID: 0, ChunkID: 0 // Resend the first chunk - M3: UUID: 0, ChunkID: 1 When the consumer received the M2, it will find that it's already tracking the UUID:0 chunked messages, and will then discard the message M1 and M2. This will lead to unable to consume the whole chunked message even though it's already persisted in the Pulsar topic. Here is the code logic: https://github.com/apache/pulsar/blob/44a055b8a55078bcf93f4904991598541aa6c1ee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1436-L1482 The bug can be easily reproduced using the testcase `testResendChunkMessages` introduced by this PR. ### Modifications - When receiving the new duplicated first chunk of a chunked message, the consumer discard the current chunked message context and create a new context to track the following messages. For the case mentioned in Motivation, the M1 will be released and the consumer will assemble M2 and M3 as the chunked message. (cherry picked from commit eb2e3a2)
… was failed (apache#20935) The progress Persist mark deleted position is like this: - persist to BK - If failed to persist to BK, try to persist to ZK But in the current implementation: if the cursor ledger was created failed, Pulsar will not try to persist to ZK. It makes if the cursor ledger created fails, a lot of ack records can not be persisted, and we will get a lot of repeat consumption after the BK recover. Modifications: Try to persist the mark deleted position to ZK if the cursor ledger was created failed (cherry picked from commit 843b830)
…ache#20948) ## Motivation Make the chunk message function work properly when deduplication is enabled. ## Modification ### Only check and store the sequence ID of the last chunk in a chunk message. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2 Chunk-2 sequence ID: 0, chunk ID: 1 Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3 Chunk-4 sequence ID: 1, chunk ID: 1 Chunk-5 sequence ID: 1, chunk ID: 1 Chunk-6 sequence ID: 1, chunk ID: 2 ``` Only store check and store the sequence ID of Chunk-2 and Chunk-6. **Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.** ```java publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE); ``` ### Filter and ack duplicated chunks in a chunk message instead of discarding ctx. For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6 ``` We should filter and ack chunk-4 and chunk-5.
## Motivation Handle ack hole case: For example: ```markdown Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1 Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2 Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3 Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4 Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5 ``` Consumer ack chunk message via ChunkMessageIdImpl that consists of all the chunks in this chunk message(Chunk-3, Chunk-4, Chunk-5). The Chunk-1 and Chunk-2 are not included in the ChunkMessageIdImpl, so we should process it here. ## Modification Ack chunk-1 and chunk-2.
…and schema. (apache#21093) Fixes apache#21075 ### Motivation When the topic is loaded, it will delete the topic-level policy if it is enabled. But if the topic is not loaded, it will directly delete through managed ledger factory. But then we will leave the topic policy there. When the topic is created next time, it will use the old topic policy ### Modifications When deleting the topic, delete the schema and topic policies even if the topic is not loaded.
…ing broker deletion event (apache#21083)
…ickly (apache#21207) (cherry picked from commit e5b0f17)
…isabled allowAutoSubscriptionCreation (apache#22078)
…e-auth-data` flag to update function/sink/source (apache#19450) Co-authored-by: tison <[email protected]> (cherry picked from commit 2b92ed1)
…rect zip/bytecode access (apache#22122) (cherry picked from commit bbc6224)
…-io/solr (apache#22047) Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> (cherry picked from commit 7a90426)
(cherry picked from commit d9c55b4)
(cherry picked from commit 0fc9f44)
…apache#22101) (cherry picked from commit 48c7e32)
… exist and do not expect to create a new one. apache#21995 (apache#22004) Co-authored-by: Jiwe Guo <[email protected]> (cherry picked from commit 1c652f5)
apache#22099) (cherry picked from commit 8607905)
…ack and delivery concurrency (apache#22090) (cherry picked from commit 1b1cfb5)
(cherry picked from commit 31ed115)
(cherry picked from commit 91de98a)
1. Change to None state before invoking the recovery. 2. Improve the method `checkTopicTransactionBufferState` to see the test result easier. ``` org.awaitility.core.ConditionTimeoutException: Condition with org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest was not fulfilled within 10 seconds. at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167) at org.awaitility.core.CallableCondition.await(CallableCondition.java:78) at org.awaitility.core.CallableCondition.await(CallableCondition.java:26) at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985) at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:954) at org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest.checkTopicTransactionBufferState(TransactionStablePositionTest.java:239) at org.apache.pulsar.broker.transaction.buffer.TransactionStablePositionTest.testSyncNormalPositionWhenTBRecover(TransactionStablePositionTest.java:229) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139) at org.testng.internal.invokers.TestInvoker.invokeMethod(TestInvoker.java:677) at org.testng.internal.invokers.TestInvoker.invokeTestMethod(TestInvoker.java:221) at org.testng.internal.invokers.MethodRunner.runInSequence(MethodRunner.java:50) at org.testng.internal.invokers.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:969) at org.testng.internal.invokers.TestInvoker.invokeTestMethods(TestInvoker.java:194) at org.testng.internal.invokers.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:148) at org.testng.internal.invokers.TestMethodWorker.run(TestMethodWorker.java:128) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.testng.TestRunner.privateRun(TestRunner.java:829) at org.testng.TestRunner.run(TestRunner.java:602) at org.testng.SuiteRunner.runTest(SuiteRunner.java:437) at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:431) at org.testng.SuiteRunner.privateRun(SuiteRunner.java:391) at org.testng.SuiteRunner.run(SuiteRunner.java:330) at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:95) at org.testng.TestNG.runSuitesSequentially(TestNG.java:1256) at org.testng.TestNG.runSuitesLocally(TestNG.java:1176) at org.testng.TestNG.runSuites(TestNG.java:1099) at org.testng.TestNG.run(TestNG.java:1067) at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:65) at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:105) ``` 1. Change to None state before invoking the recovery. 2. Improve the method `checkTopicTransactionBufferState` to see the test result easier. (cherry picked from commit 3013496)
…ck/deadlock threads (apache#21333) (cherry picked from commit eb9fa63)
…rting broker registry. (apache#22065) (cherry picked from commit baddda5)
…n threshold explicitly, improve getOwnerAsync, and fix other bugs (apache#22064) (apache#22154)
…rce broker upon Assigning and Releasing and handle role change during role init (apache#22112) (apache#22156)
…he amdin (apache#22101)" This reverts commit 6e988ef.
…pache#22149) (cherry picked from commit 74be3fd)
…apache#22163) (cherry picked from commit 6ec473e)
… sources (apache#22184) (cherry picked from commit 207335a)
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This is a PR created by snbot to trigger the check suite in each repository.