Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

support delete topics using the kafka client #348

Merged
merged 2 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

@Slf4j
class AdminManager {
Expand Down Expand Up @@ -171,4 +172,20 @@ CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeC
});
return resultFuture;
}

public Map<String, Errors> deleteTopics(Set<String> topicsToDelete) {
Map<String, Errors> result = new ConcurrentHashMap<>();
topicsToDelete.forEach(topic -> {
try {
String topicFullName = new KopTopic(topic).getFullName();
admin.topics().deletePartitionedTopic(topicFullName);
result.put(topic, Errors.NONE);
log.info("delete topic {} successfully.", topicFullName);
} catch (PulsarAdminException e) {
log.error("delete topic {} failed, exception: ", topic, e);
result.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
});
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case DESCRIBE_CONFIGS:
handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_TOPICS:
handleDeleteTopics(kafkaHeaderAndRequest, responseFuture);
break;
default:
handleError(kafkaHeaderAndRequest, responseFuture);
}
Expand Down Expand Up @@ -408,6 +411,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
protected abstract void
handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

static class KafkaHeaderAndRequest implements Closeable {

private static final String DEFAULT_CLIENT_HOST = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
Expand Down Expand Up @@ -1410,6 +1412,15 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest
});
}

@Override
protected void handleDeleteTopics(KafkaHeaderAndRequest deleteTopics,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(deleteTopics.getRequest() instanceof DeleteTopicsRequest);
DeleteTopicsRequest request = (DeleteTopicsRequest) deleteTopics.getRequest();
Set<String> topicsToDelete = request.topics();
resultFuture.complete(new DeleteTopicsResponse(adminManager.deleteTopics(topicsToDelete)));
}

/**
* Write the txn marker to the topic partition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -321,7 +322,7 @@ public void testGetKafkaTopicNameFromPulsarTopicName() {
assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition));
}

void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNumPartitions)
private void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNumPartitions)
throws ExecutionException, InterruptedException {
final short replicationFactor = 1; // replication factor will be ignored
admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> {
Expand All @@ -331,7 +332,7 @@ void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNum
}).collect(Collectors.toList())).all().get();
}

void verifyTopicsByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
private void verifyTopicsCreatedByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
throws PulsarAdminException {
for (Map.Entry<String, Integer> entry : topicToNumPartitions.entrySet()) {
final String topic = entry.getKey();
Expand All @@ -340,8 +341,22 @@ void verifyTopicsByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
}
}

private void verifyTopicsDeletedByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
throws PulsarAdminException {
for (Map.Entry<String, Integer> entry : topicToNumPartitions.entrySet()) {
final String topic = entry.getKey();
assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, 0);
}
}

private void deleteTopicsByKafkaAdmin(AdminClient admin, Set<String> topicsToDelete)
throws ExecutionException, InterruptedException {
admin.deleteTopics(topicsToDelete).all().get();
}


@Test(timeOut = 10000)
public void testCreateTopics() throws Exception {
public void testCreateAndDeleteTopics() throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

Expand All @@ -353,8 +368,12 @@ public void testCreateTopics() throws Exception {
put("my-tenant/my-ns/testCreateTopics-2", 1);
put("persistent://my-tenant/my-ns/testCreateTopics-3", 5);
}};
// create
createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
verifyTopicsByPulsarAdmin(topicToNumPartitions);
verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions);
// delete
deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet());
verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions);
}

@Test(timeOut = 10000)
Expand All @@ -377,6 +396,23 @@ public void testCreateInvalidTopics() {
}
}

@Test(timeOut = 10000)
public void testDeleteNotExistedTopics() throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

@Cleanup
AdminClient kafkaAdmin = AdminClient.create(props);
Set<String> topics = new HashSet<>();
topics.add("testDeleteNotExistedTopics");
try {
deleteTopicsByKafkaAdmin(kafkaAdmin, topics);
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException);
}
}

@Test(timeOut = 10000)
public void testDescribeTopics() throws Exception {
Properties props = new Properties();
Expand Down