Skip to content

Commit

Permalink
[fix][transaction] Contains UUID when auto topic creation (streamnati…
Browse files Browse the repository at this point in the history
…ve#1993)

### Motivation

In PR streamnative#1943 , it introduced
using UUID to distinguish deleted/created topics, but when the topic is
auto-created, the UUID is not contained in the properties.


### Modifications

Contains UUID when auto topic creation
  • Loading branch information
Demogorgon314 authored Aug 9, 2023
1 parent 0c9f946 commit 26980a3
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
Expand Down Expand Up @@ -136,7 +137,7 @@ public CompletableFuture<Map<String, ApiError>> createTopicsAsync(
return;
}
admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), numPartitions,
Map.of("kafkaTopicUUID", UUID.randomUUID().toString()))
Map.of(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME, UUID.randomUUID().toString()))
.whenComplete((ignored, e) -> {
if (e == null) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
Expand Down Expand Up @@ -539,7 +540,9 @@ private CompletableFuture<TopicAndMetadata> getTopicMetadataAsync(String topic,
return;
}
if (allowed) {
admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions)
Map<String, String> properties =
Map.of(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME, UUID.randomUUID().toString());
admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions, properties)
.whenComplete((__, createException) -> {
if (createException == null) {
future.complete(TopicAndMetadata.success(topic, defaultNumPartitions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class AnalyzeResult {
@Slf4j
public class PartitionLog {

public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID";
private static final String PID_PREFIX = "KOP-PID-PREFIX";

private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION =
Expand Down Expand Up @@ -212,7 +213,7 @@ private CompletableFuture<Void> loadTopicProperties() {
this.topicProperties = properties;
log.info("Topic properties for {} are {}", fullPartitionName, properties);
this.entryFormatter = buildEntryFormatter(topicProperties);
this.kafkaTopicUUID = properties.get("kafkaTopicUUID");
this.kafkaTopicUUID = properties.get(KAFKA_TOPIC_UUID_PROPERTY_NAME);
this.producerStateManager =
new ProducerStateManager(
fullPartitionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -38,6 +39,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils;
import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -1024,6 +1026,9 @@ public void testBrokerHandleTopicMetadataRequestAllowAutoTopicCreation(boolean b
expectedError = null;
assertEquals(1, metadataResponse.topicMetadata().size());
assertEquals(topicName, metadataResponse.topicMetadata().iterator().next().topic());
Map<String, String> properties = admin.topics().getProperties(topicName);
assertFalse(properties.isEmpty());
assertTrue(properties.containsKey(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME));
} else {
// topic does not exist and it is not created
expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
Expand Down

0 comments on commit 26980a3

Please sign in to comment.