Skip to content

Commit

Permalink
change the design of APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed May 2, 2024
1 parent c46bd23 commit 5e7372b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,32 @@ static ClientBuilder builder() {
*
* <p>This can be used to discover the partitions and create {@link Reader}, {@link Consumer} or {@link Producer}
* instances directly on a particular partition.
*
* @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(String, boolean)}.
* @param topic
* the topic name
* @return a future that will yield a list of the topic partitions or {@link PulsarClientException} if there was any
* error in the operation.
*
* @since 2.3.0
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic);
@Deprecated
default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionsForTopic(topic, true);
}

/**
* 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a
* partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned
* topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older
* one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3. When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using
* the default topic auto-creation strategy you set for the broker), and the corresponding result is returned.
* For the result, see case 1.
* @version 3.3.0.
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled);

/**
* Close the PulsarClient and release all the resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,16 @@ public interface LookupService extends AutoCloseable {
CompletableFuture<LookupTopicResult> getBroker(TopicName topicName);

/**
* @param topicName topic-name
* @return {@link PartitionedTopicMetadata} for a given topic.
* @throws PulsarClientException.TopicDoesNotExistException, PulsarClientException.NotFoundException if the topic
* not exists and will not trigger a creation(in other words: disabled topic auto-creation).
* @Deprecated please call {@link #getPartitionedTopicMetadata(TopicName, boolean)}.
*/
@Deprecated
default CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
return getPartitionedTopicMetadata(topicName, true);
}

/**
* @param topicName topic-name
* @param createIfAutoCreationEnabled create partitioned topic metadata if auto-creation is enabled, this method
* will not create the partitions.
* @return {@link PartitionedTopicMetadata} for a given topic.
* @throws PulsarClientException.TopicDoesNotExistException, PulsarClientException.NotFoundException if the topic
* not exists and will not trigger a creation(in other words: {@param createIfAutoCreationEnabled} is false
* or disabled topic auto-creation).
* 1.Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists;
* return "{partition: 0}" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic
* does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older
* one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3.When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using
* the default topic auto-creation strategy you set for the broker), and the corresponding result is returned.
* For the result, see case 1.
* @version 3.3.0.
*/
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName,
boolean createIfAutoCreationEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,8 @@ private void getPartitionedTopicMetadata(TopicName topicName,
}

@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic, false).thenApply(metadata -> {
public CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled) {
return getPartitionedTopicMetadata(topic, createIfAutoCreationEnabled).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private static void setFeatureFlags(FeatureFlags flags) {
flags.setSupportsAuthRefresh(true);
flags.setSupportsBrokerEntryMetadata(true);
flags.setSupportsPartialProducer(true);
flags.setSupportsBinaryApiGetPartitionedMetaWithParamCreatedFalse(true);
}

public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
Expand Down
1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ message FeatureFlags {
optional bool supports_broker_entry_metadata = 2 [default = false];
optional bool supports_partial_producer = 3 [default = false];
optional bool supports_topic_watchers = 4 [default = false];
optional bool supports_binary_api_get_partitioned_meta_with_param_created_false = 5 [default = false];
}

message CommandConnected {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId,
partitionMetadata.isCreateIfAutoCreationEnabled());
partitionMetadata.isMetadataAutoCreationEnabled());
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
Expand Down

0 comments on commit 5e7372b

Please sign in to comment.