Skip to content

Commit

Permalink
change method following PIP-12, add more methods for pattern sub in c…
Browse files Browse the repository at this point in the history
…lient
  • Loading branch information
jiazhai committed Feb 5, 2018
1 parent 6132aee commit ff4bc1a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
Expand All @@ -52,7 +49,9 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
@Override
@BeforeMethod
public void setup() throws Exception {

// set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
isTcpLookup = true;
super.internalSetup();
}

@Override
Expand All @@ -61,69 +60,64 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

// set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
// verify consumer create success, and works well.
@Test(timeOut = testTimeout)
public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
isTcpLookup = true;
super.internalSetup();

String key = "BinaryProtoToGetTopics";
final String subscriptionName = "my-ex-subscription-" + key;
final String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
final String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
final String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
Pattern pattern = Pattern.compile("pattern-topic.*");
String namespace = "prop/use/ns-abc";
List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);

// 0. mock NamespaceService.getListOfDestinations, it is used in BinaryProtoLookupService.getTopicsUnderNamespace
NamespaceService nss = pulsar.getNamespaceService();
doReturn(topicNames).when(nss).getListOfDestinations("prop", "use", "ns-abc");
String subscriptionName = "my-ex-subscription-" + key;
String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");

// 1. create partition
admin.properties().createProperty("prop", new PropertyAdmin());
admin.persistentTopics().createPartitionedTopic(topicName2, 2);
admin.persistentTopics().createPartitionedTopic(topicName3, 3);

// 2. Create consumer, this should success
// 2. create producer
ProducerConfiguration producerConfiguration = new ProducerConfiguration();
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
String messagePredicate = "my-message-" + key + "-";
int totalMessages = 30;

Producer producer1 = pulsarClient.createProducer(topicName1);
Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);

// 3. Create consumer, this should success
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(4);
conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
conf.setSubscriptionType(SubscriptionType.Shared);
Consumer consumer = pulsarClient.subscribeAsync(namespace, pattern, subscriptionName, conf).get();
Consumer consumer = pulsarClient.subscribeAsync(pattern, subscriptionName, conf).get();
assertTrue(consumer instanceof PatternTopicsConsumerImpl);

// 3. verify consumer get methods, to get right number of partitions and topics.
// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
List<String> topics = ((PatternTopicsConsumerImpl) consumer).getPartitionedTopics();
List<ConsumerImpl> consumers = ((PatternTopicsConsumerImpl) consumer).getConsumers();

assertEquals(topics.size(), 6);
assertEquals(consumers.size(), 6);
assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);

topics.forEach(topic -> log.info("topic: {}", topic));
consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));

IntStream.range(0, topics.size()).forEach(index ->
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));

assertTrue(((PatternTopicsConsumerImpl) consumer).getTopics().size() == 3);

// 4. producer publish messages
ProducerConfiguration producerConfiguration = new ProducerConfiguration();
producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
final String messagePredicate = "my-message-" + key + "-";
final int totalMessages = 30;

Producer producer1 = pulsarClient.createProducer(topicName1);
Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
((PatternTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.info("getTopics topic: {}", topic));

// 5. produce data
for (int i = 0; i < totalMessages / 3; i++) {
producer1.send((messagePredicate + "producer1-" + i).getBytes());
producer2.send((messagePredicate + "producer2-" + i).getBytes());
producer3.send((messagePredicate + "producer3-" + i).getBytes());
}

// 5. should receive all the message
// 6. should receive all the message
int messageSet = 0;
Message message = consumer.receive();
do {
Expand All @@ -141,4 +135,5 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
producer2.close();
producer3.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t


/**
* Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
* Subscribe to the given topics and subscription combination with default {@code ConsumerConfiguration}
*
* @param topics
* The collection of topic names, they should be under same namespace
Expand Down Expand Up @@ -287,9 +287,6 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf)
throws PulsarClientException;

CompletableFuture<Consumer> subscribeAsync(String namespace, Pattern topicsPattern,
String subscription, ConsumerConfiguration conf);

/**
* Asynchronously subscribe to the given topics and subscription combination using given
* {@code ConsumerConfiguration}
Expand All @@ -306,4 +303,60 @@ CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
String subscription,
ConsumerConfiguration conf);

/**
* Subscribe to the topics with given regex pattern and subscription combination with default
* {@code ConsumerConfiguration}
*
* @param topics
* The collection of topic names, they should be under same namespace
* @param subscription
* The name of the subscription
* @return The {@code Consumer} object
* @throws PulsarClientException
*/
Consumer subscribe(Pattern topicsPattern, String subscription) throws PulsarClientException;

/**
* Asynchronously subscribe to the topics with given regex pattern and subscription combination with
* default {@code ConsumerConfiguration}
*
* @param topics
* The collection of topic names, they should be under same namespace
* @param subscription
* The name of the subscription
* @return Future of the {@code Consumer} object
*/
CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern, String subscription);

/**
* Subscribe to the topics with given regex pattern and subscription combination using given
* {@code ConsumerConfiguration}
*
* @param topicsPattern
* The regex pattern that wanted to subscribe. e.g. "persistent://prop/cluster/ns/abc.*"
* @param subscription
* The name of the subscription
* @param conf
* The {@code ConsumerConfiguration} object
* @return Future of the {@code Consumer} object
*/
Consumer subscribe(Pattern topicsPattern, String subscription, ConsumerConfiguration conf)
throws PulsarClientException;

/**
* Asynchronously subscribe to the topics with given regex pattern and subscription combination using given
* {@code ConsumerConfiguration}
*
* @param topicsPattern
* The regex pattern that wanted to subscribe. e.g. "persistent://prop/cluster/ns/abc.*"
* @param subscription
* The name of the subscription
* @param conf
* The {@code ConsumerConfiguration} object
* @return Future of the {@code Consumer} object
*/
CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern,
String subscription,
ConsumerConfiguration conf);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.lang.String.format;

import com.google.common.collect.Lists;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
Expand Down Expand Up @@ -197,7 +198,17 @@ private CompletableFuture<List<String>> getTopicsUnderNamespace(InetSocketAddres
if (log.isDebugEnabled()) {
log.debug("[{}] Success get topics list in request: {}", namespace.toString(), requestId);
}
topicsFuture.complete(topicsList);

// do not keep partition part of topic name
List<String> result = Lists.newArrayList();
topicsList.forEach(topic -> {
String filtered = DestinationName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});

topicsFuture.complete(result);
}).exceptionally((e) -> {
log.warn("[{}] failed to get topics list: {}", namespace.toString(), e.getCause().getMessage(), e);
topicsFuture.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
*/
package org.apache.pulsar.client.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import java.net.InetAddress;
import com.google.common.collect.Lists;
import io.netty.channel.EventLoopGroup;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -38,8 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.EventLoopGroup;

class HttpLookupService implements LookupService {

private final HttpClient httpClient;
Expand Down Expand Up @@ -96,10 +92,25 @@ public String getServiceUrl() {

@Override
public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
CompletableFuture<String[]> result = httpClient
.get(String.format("admin/namespaces/%s/destinations", namespace.getLookupName()), String[].class);

return result.thenApply(Arrays::asList);
CompletableFuture<List<String>> future = new CompletableFuture<>();
httpClient
.get(String.format("admin/namespaces/%s/destinations", namespace.getLookupName()), String[].class)
.thenAccept(topics -> {
List<String> result = Lists.newArrayList();
// do not keep partition part of topic name
Arrays.asList(topics).forEach(topic -> {
String filtered = DestinationName.get(topic).getPartitionedTopicName();
if (!result.contains(filtered)) {
result.add(filtered);
}
});
future.complete(result);})
.exceptionally(ex -> {
log.warn("Failed to getTopicsUnderNamespace namespace: {}.", namespace, ex.getMessage());
future.completeExceptionally(ex);
return null;
});
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,39 @@ public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
}

@Override
public CompletableFuture<Consumer> subscribeAsync(String namespace, Pattern topicsPattern,
String subscription, ConsumerConfiguration conf) {
public Consumer subscribe(Pattern topicsPattern, final String subscription) throws PulsarClientException {
return subscribe(topicsPattern, subscription, new ConsumerConfiguration());
}

@Override
public Consumer subscribe(Pattern topicsPattern,
String subscription,
ConsumerConfiguration conf)
throws PulsarClientException {
try {
return subscribeAsync(topicsPattern, subscription, conf).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException) t;
} else {
throw new PulsarClientException(t);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException(e);
}
}

@Override
public CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern, String subscription) {
return subscribeAsync(topicsPattern, subscription, new ConsumerConfiguration());
}

@Override
public CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern,
String subscription,
ConsumerConfiguration conf) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
Expand All @@ -355,17 +386,18 @@ public CompletableFuture<Consumer> subscribeAsync(String namespace, Pattern topi
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}

NamespaceName namespaceName = NamespaceName.get(namespace);
String regex = topicsPattern.pattern();
DestinationName destination = DestinationName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();

CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();

lookup.getTopicsUnderNamespace(namespaceName)
.thenAccept(topics -> {
List<String> topicsList = topics.stream()
.filter(topic -> {
DestinationName destinationName = DestinationName.get(topic);
checkState(destinationName.getNamespaceObject().equals(namespaceName));
return topicsPattern.matcher(destinationName.getLocalName()).matches();
return topicsPattern.matcher(destinationName.toString()).matches();
})
.collect(Collectors.toList());

Expand All @@ -378,7 +410,7 @@ public CompletableFuture<Consumer> subscribeAsync(String namespace, Pattern topi
}
})
.exceptionally(ex -> {
log.warn("[{}] Failed to get topics under namespace", namespace);
log.warn("[{}] Failed to get topics under namespace", namespaceName);
consumerSubscribedFuture.completeExceptionally(ex);
return null;
});
Expand Down

0 comments on commit ff4bc1a

Please sign in to comment.