Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[broker][admin] Add cmd to remove topic properties (apache#17337)
Browse files Browse the repository at this point in the history
* [broker][admin] Add cmd to remove topic properties

* address comment

* address comment

(cherry picked from commit 7075a5c)
  • Loading branch information
yuruguo authored and liangyepianzhou committed May 10, 2023
1 parent 3179143 commit 6f8dbc7
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case DELETE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authorit
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(p.partitions,
MapUtils.putAll(p.properties, properties.entrySet().toArray())));
p.properties == null ? properties
: MapUtils.putAll(p.properties, properties.entrySet().toArray())));
});
}
}).thenAccept(__ ->
Expand Down Expand Up @@ -701,6 +702,58 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
return future;
}

protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalRemoveNonPartitionedTopicProperties(key);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalRemoveNonPartitionedTopicProperties(key);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> {
if (p.properties != null) {
p.properties.remove(key);
}
return new PartitionedTopicMetadata(p.partitions, p.properties);
});
});
}
}).thenAccept(__ ->
log.info("[{}] remove [{}] properties success with key {}",
clientAppId(), topicName, key));
}

private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(exist -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,41 @@ public void updateProperties(
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Remove the key in properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Partitioned topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void removeProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("key") String key,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ public void testCreateAndGetTopicProperties() throws Exception {
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);

// create partitioned topic with properties
Expand Down Expand Up @@ -935,6 +936,25 @@ public void testUpdatePartitionedTopicProperties() throws Exception {
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");

// create topic without properties
admin.topics().createPartitionedTopic(topicNameTwo, 2);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
// remove key of properties on this topic
admin.topics().removeProperties(topicNameTwo, "key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
Map<String, String> topicProp = new HashMap<>();
topicProp.put("key1", "value1");
topicProp.put("key2", "value2");
admin.topics().updateProperties(topicNameTwo, topicProp);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
admin.topics().removeProperties(topicNameTwo, "key1");
topicProp.remove("key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);

/**
* Remove the key in properties on a topic.
*
* @param topic
* @param key
* @throws PulsarAdminException
*/
void removeProperties(String topic, String key) throws PulsarAdminException;

/**
* Remove the key in properties on a topic asynchronously.
*
* @param topic
* @param key
* @return
*/
CompletableFuture<Void> removePropertiesAsync(String topic, String key);

/**
* Delete a partitioned topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,19 @@ public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
}

@Override
public void removeProperties(String topic, String key) throws PulsarAdminException {
sync(() -> removePropertiesAsync(topic, key));
}

@Override
public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties")
.queryParam("key", key);
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
return deletePartitionedTopicAsync(topic, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,10 @@ public void topics() throws Exception {
props.put("x", "y,z");
verify(mockTopics).updateProperties("persistent://myprop/clust/ns1/ds1", props);

cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("remove-properties persistent://myprop/clust/ns1/ds1 --key a"));
verify(mockTopics).removeProperties("persistent://myprop/clust/ns1/ds1", "a");

cmdTopics = new CmdTopics(() -> admin);
props = new HashMap<>();
props.put("a", "b");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public CmdTopics(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd());
jcommander.addCommand("get-properties", new GetPropertiesCmd());
jcommander.addCommand("update-properties", new UpdateProperties());
jcommander.addCommand("remove-properties", new RemoveProperties());

jcommander.addCommand("delete-partitioned-topic", new DeletePartitionedCmd());
jcommander.addCommand("peek-messages", new PeekMessages());
Expand Down Expand Up @@ -627,6 +628,21 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Remove the key in properties of a topic")
private class RemoveProperties extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = {"--key", "-k"}, description = "The key to remove in the properties of topic")
private String key;

@Override
void run() throws Exception {
String topic = validateTopicName(params);
getTopics().removeProperties(topic, key);
}
}

@Parameters(commandDescription = "Delete a partitioned topic. "
+ "It will also delete all the partitions of the topic if it exists.")
private class DeletePartitionedCmd extends CliCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public enum TopicOperation {

GET_STATS,
GET_METADATA,
DELETE_METADATA,
GET_BACKLOG_SIZE,

SET_REPLICATED_SUBSCRIPTION_STATUS,
Expand Down

0 comments on commit 6f8dbc7

Please sign in to comment.