
Support deleting topics using the Kafka client (#348)
fixes #347
dockerzhang authored and BewareMyPower committed Jun 29, 2021
1 parent 73b4e13 commit e1f4efb
Showing 4 changed files with 74 additions and 4 deletions.
@@ -38,6 +38,7 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;

@Slf4j
class AdminManager {
@@ -175,4 +176,20 @@ CompletableFuture<Map<ConfigResource, DescribeConfigsResponse.Config>> describeC
        });
        return resultFuture;
    }

    public Map<String, Errors> deleteTopics(Set<String> topicsToDelete) {
        Map<String, Errors> result = new ConcurrentHashMap<>();
        topicsToDelete.forEach(topic -> {
            try {
                String topicFullName = new KopTopic(topic).getFullName();
                admin.topics().deletePartitionedTopic(topicFullName);
                result.put(topic, Errors.NONE);
                log.info("delete topic {} successfully.", topicFullName);
            } catch (PulsarAdminException e) {
                log.error("delete topic {} failed, exception: ", topic, e);
                result.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            }
        });
        return result;
    }
}
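For reference, the per-topic Errors map that deleteTopics builds above is what the Kafka AdminClient eventually surfaces to callers: Errors.NONE completes the client-side future normally, while UNKNOWN_TOPIC_OR_PARTITION is raised as an UnknownTopicOrPartitionException. A minimal client-side sketch (the bootstrap address and topic name are illustrative assumptions, mirroring the tests added in this commit):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class DeleteTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed KoP listener address; use the broker's actual Kafka port.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient kafkaAdmin = AdminClient.create(props)) {
            // The tests in this commit use both short Kafka names and full Pulsar names;
            // "my-tenant/my-ns/my-topic" here is only an example.
            DeleteTopicsResult result =
                    kafkaAdmin.deleteTopics(Collections.singleton("my-tenant/my-ns/my-topic"));
            try {
                result.all().get(); // completes normally when the broker answers Errors.NONE
            } catch (ExecutionException e) {
                // A topic the broker could not delete comes back as UNKNOWN_TOPIC_OR_PARTITION,
                // which the client maps to UnknownTopicOrPartitionException.
                if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                    System.out.println("topic does not exist or could not be deleted");
                }
            }
        }
    }
}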
@@ -235,6 +235,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
            case DESCRIBE_CONFIGS:
                handleDescribeConfigs(kafkaHeaderAndRequest, responseFuture);
                break;
            case DELETE_TOPICS:
                handleDeleteTopics(kafkaHeaderAndRequest, responseFuture);
                break;
            default:
                handleError(kafkaHeaderAndRequest, responseFuture);
        }
@@ -372,6 +375,9 @@ protected void writeAndFlushWhenInactiveChannel(Channel channel) {
    protected abstract void
    handleDescribeConfigs(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

    protected abstract void
    handleDeleteTopics(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);

    static class KafkaHeaderAndRequest implements Closeable {

        private static final String DEFAULT_CLIENT_HOST = "";
@@ -77,6 +77,8 @@
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
import org.apache.kafka.common.requests.DeleteTopicsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
@@ -1207,6 +1209,15 @@ protected void handleDescribeConfigs(KafkaHeaderAndRequest describeConfigs,
        });
    }

    @Override
    protected void handleDeleteTopics(KafkaHeaderAndRequest deleteTopics,
                                      CompletableFuture<AbstractResponse> resultFuture) {
        checkArgument(deleteTopics.getRequest() instanceof DeleteTopicsRequest);
        DeleteTopicsRequest request = (DeleteTopicsRequest) deleteTopics.getRequest();
        Set<String> topicsToDelete = request.topics();
        resultFuture.complete(new DeleteTopicsResponse(adminManager.deleteTopics(topicsToDelete)));
    }

    private SaslHandshakeResponse checkSaslMechanism(String mechanism) {
        if (getKafkaConfig().getSaslAllowedMechanisms().contains(mechanism)) {
            return new SaslHandshakeResponse(Errors.NONE, getKafkaConfig().getSaslAllowedMechanisms());
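On the Pulsar side the delete is observable through the regular admin API; the test changes below assert that the partitioned-topic metadata reports zero partitions once the Kafka client has deleted the topic. A rough standalone sketch of the same check (the web service URL and topic name are assumptions):

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class VerifyTopicDeleted {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // assumed broker web service URL
                .build()) {
            PartitionedTopicMetadata metadata = pulsarAdmin.topics()
                    .getPartitionedTopicMetadata("persistent://my-tenant/my-ns/my-topic");
            // verifyTopicsDeletedByPulsarAdmin in the test below relies on the same signal:
            // after a successful delete, the metadata reports 0 partitions.
            System.out.println("partitions = " + metadata.partitions);
        }
    }
}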
@@ -39,6 +39,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -316,7 +317,7 @@ public void testGetKafkaTopicNameFromPulsarTopicName() {
        assertEquals(localName, getKafkaTopicNameFromPulsarTopicname(topicNamePartition));
    }

    void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNumPartitions)
    private void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNumPartitions)
            throws ExecutionException, InterruptedException {
        final short replicationFactor = 1; // replication factor will be ignored
        admin.createTopics(topicToNumPartitions.entrySet().stream().map(entry -> {
@@ -326,7 +327,7 @@ void createTopicsByKafkaAdmin(AdminClient admin, Map<String, Integer> topicToNum
        }).collect(Collectors.toList())).all().get();
    }

    void verifyTopicsByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
    private void verifyTopicsCreatedByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
            throws PulsarAdminException {
        for (Map.Entry<String, Integer> entry : topicToNumPartitions.entrySet()) {
            final String topic = entry.getKey();
@@ -335,8 +336,22 @@ void verifyTopicsByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
        }
    }

    private void verifyTopicsDeletedByPulsarAdmin(Map<String, Integer> topicToNumPartitions)
            throws PulsarAdminException {
        for (Map.Entry<String, Integer> entry : topicToNumPartitions.entrySet()) {
            final String topic = entry.getKey();
            assertEquals(this.admin.topics().getPartitionedTopicMetadata(topic).partitions, 0);
        }
    }

    private void deleteTopicsByKafkaAdmin(AdminClient admin, Set<String> topicsToDelete)
            throws ExecutionException, InterruptedException {
        admin.deleteTopics(topicsToDelete).all().get();
    }


    @Test(timeOut = 10000)
    public void testCreateTopics() throws Exception {
    public void testCreateAndDeleteTopics() throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

@@ -348,8 +363,12 @@ public void testCreateTopics() throws Exception {
put("my-tenant/my-ns/testCreateTopics-2", 1);
put("persistent://my-tenant/my-ns/testCreateTopics-3", 5);
}};
// create
createTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions);
verifyTopicsByPulsarAdmin(topicToNumPartitions);
verifyTopicsCreatedByPulsarAdmin(topicToNumPartitions);
// delete
deleteTopicsByKafkaAdmin(kafkaAdmin, topicToNumPartitions.keySet());
verifyTopicsDeletedByPulsarAdmin(topicToNumPartitions);
}

    @Test(timeOut = 10000)
@@ -373,6 +392,23 @@ public void testCreateInvalidTopics() {
        }
    }

    @Test(timeOut = 10000)
    public void testDeleteNotExistedTopics() throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());

        @Cleanup
        AdminClient kafkaAdmin = AdminClient.create(props);
        Set<String> topics = new HashSet<>();
        topics.add("testDeleteNotExistedTopics");
        try {
            deleteTopicsByKafkaAdmin(kafkaAdmin, topics);
            fail();
        } catch (ExecutionException e) {
            assertTrue(e.getCause() instanceof UnknownTopicOrPartitionException);
        }
    }

    @Test(timeOut = 10000)
    public void testDescribeTopics() throws Exception {
        Properties props = new Properties();
