From 2749652695babbdbbe451c99ccc43deeb5dbff88 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Tue, 26 Jan 2021 11:40:01 +0800 Subject: [PATCH] support delete topics using the kafka client --- .../pulsar/handlers/kop/AdminManager.java | 17 ++++++++++++ .../handlers/kop/KafkaCommandDecoder.java | 6 +++++ .../handlers/kop/KafkaRequestHandler.java | 11 ++++++++ .../handlers/kop/KafkaRequestHandlerTest.java | 27 ++++++++++++++++--- 4 files changed, 57 insertions(+), 4 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 253a458b04..a8a90ecd0e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; @Slf4j class AdminManager { @@ -171,4 +172,20 @@ CompletableFuture> describeC }); return resultFuture; } + + public Map deleteTopics(Set topicsToDelete) { + Map result = new ConcurrentHashMap<>(); + topicsToDelete.forEach(topic -> { + try { + String topicFullName = new KopTopic(topic).getFullName(); + admin.topics().deletePartitionedTopic(topicFullName); + result.put(topic, Errors.NONE); + log.info("delete topic {} successfully.", topicFullName); + } catch (PulsarAdminException e) { + log.error("delete topic {} failed, exception: ", topic, e); + result.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION); + } + }); + return result; + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index 1bdcda495d..0f5a5fa96f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -253,6 +253,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case DESCRIBE_CONFIGS: handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture); break; + case DELETE_TOPICS: + handleDeleteTopics(kafkaHeaderAndRequest, responseFuture); + break; default: handleError(kafkaHeaderAndRequest, responseFuture); } @@ -408,6 +411,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) { protected abstract void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + protected abstract void + handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture response); + static class KafkaHeaderAndRequest implements Closeable { private static final String DEFAULT_CLIENT_HOST = ""; diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index d53078dc4c..590cda4901 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -87,6 +87,8 @@ import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; +import org.apache.kafka.common.requests.DeleteTopicsRequest; +import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; @@ -1410,6 +1412,15 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest }); } + @Override + protected void handleDeleteTopics(KafkaHeaderAndRequest deleteTopics, + CompletableFuture resultFuture) { + checkArgument(deleteTopics.getRequest() instanceof DeleteTopicsRequest); + DeleteTopicsRequest request = (DeleteTopicsRequest) deleteTopics.getRequest(); + Set topicsToDelete = request.topics(); + resultFuture.complete(new DeleteTopicsResponse(adminManager.deleteTopics(topicsToDelete))); + } + /** * Write the txn marker to the topic partition. * diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 6dd06aad8e..21cf192b9d 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -321,7 +322,7 @@ public void testGetKafkaTopicNameFromPulsarTopicName() { assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition)); } - void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) + private void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNumPartitions) throws ExecutionException, InterruptedException { final short replicationFactor = 1; // replication factor will be ignored admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> { @@ -331,7 +332,7 @@ void createTopicsByKafkaAdmin(AdminClient admin, Map topicToNum }).collect(Collectors.toList())).all().get(); } - void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) + private void verifyTopicsCreatedByPulsarAdmin(Map topicToNumPartitions) throws PulsarAdminException { for (Map.Entry entry : topicToNumPartitions.entrySet()) { final String topic = entry.getKey(); @@ -340,8 +341,22 @@ void verifyTopicsByPulsarAdmin(Map topicToNumPartitions) } } + private void verifyTopicsDeletedByPulsarAdmin(Map topicToNumPartitions) + throws PulsarAdminException { + for (Map.Entry entry : topicToNumPartitions.entrySet()) { + final String topic = entry.getKey(); + assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, 0); + } + } + + private void deleteTopicsByKafkaAdmin(AdminClient admin, Set topicsToDelete) + throws ExecutionException, InterruptedException { + admin.deleteTopics(topicsToDelete).all().get(); + } + + @Test(timeOut = 10000) - public void testCreateTopics() throws Exception { + public void testCreateAndDeleteTopics() throws Exception { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); @@ -353,8 +368,12 @@ public void testCreateTopics() throws Exception { put("my-tenant/my-ns/testCreateTopics-2", 1); put("persistent://my-tenant/my-ns/testCreateTopics-3", 5); }}; + // create createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions); - verifyTopicsByPulsarAdmin(topicToNumPartitions); + verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions); + // delete + deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet()); + verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions); } @Test(timeOut = 10000)