Skip to content

Commit

Permalink
[feat][broker] Update and remove topic properties (#4)
Browse files Browse the repository at this point in the history
* [broker][admin]Add api for update topic properties (apache#17238)

(cherry picked from commit b21f728)
Signed-off-by: Zixuan Liu <[email protected]>

* [broker][admin] Add cmd to remove topic properties (apache#17337)

* [broker][admin] Add cmd to remove topic properties

* address comment

* address comment

(cherry picked from commit 7075a5c)
Signed-off-by: Zixuan Liu <[email protected]>

* [fix][broker] Fix NPE when updating topic's properties (apache#17352)

Co-authored-by: bjhuxiaohua <[email protected]>
(cherry picked from commit f1d1158)

---------

Co-authored-by: Xiaoyu Hou <[email protected]>
Co-authored-by: Ruguo Yu <[email protected]>
Co-authored-by: Flowermin <[email protected]>
  • Loading branch information
4 people committed Mar 15, 2024
1 parent d2c9798 commit 888bd2f
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
case COMPACT:
case OFFLOAD:
case UNLOAD:
case DELETE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -654,6 +655,113 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
});
}

protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
return CompletableFuture.completedFuture(null);
}
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalUpdateNonPartitionedTopicProperties(properties);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalUpdateNonPartitionedTopicProperties(properties);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(p.partitions,
p.properties == null ? properties
: MapUtils.putAll(p.properties, properties.entrySet().toArray())));
});
}
}).thenAccept(__ ->
log.info("[{}] [{}] update properties success with properties {}",
clientAppId(), topicName, properties));
}

private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
managedLedger.getConfig().getProperties().putAll(properties);
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected CompletableFuture<Void> internalRemovePropertiesAsync(boolean authoritative, String key) {
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.DELETE_METADATA))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalRemoveNonPartitionedTopicProperties(key);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalRemoveNonPartitionedTopicProperties(key);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> {
if (p.properties != null) {
p.properties.remove(key);
}
return new PartitionedTopicMetadata(p.partitions, p.properties);
});
});
}
}).thenAccept(__ ->
log.info("[{}] remove [{}] properties success with key {}",
clientAppId(), topicName, key));
}

private CompletableFuture<Void> internalRemoveNonPartitionedTopicProperties(String key) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncDeleteProperty(key, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
validateTopicOwnershipAsync(topicName, authoritative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,77 @@ public void getProperties(
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Update the properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void updateProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Remove the key in properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Partitioned topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void removeProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("key") String key,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalRemovePropertiesAsync(authoritative, key)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to remove key {} in properties on topic {}",
clientAppId(), key, topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,99 @@ public void testCreateAndGetTopicProperties() throws Exception {
Assert.assertEquals(properties22.get("key2"), "value2");
}

@Test
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
final String topicNameTwo = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties2";
admin.namespaces().createNamespace(namespace, 20);

// create partitioned topic without properties
admin.topics().createPartitionedTopic(topicName, 2);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNull(properties);
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");

// create topic without properties
admin.topics().createPartitionedTopic(topicNameTwo, 2);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
// remove key of properties on this topic
admin.topics().removeProperties(topicNameTwo, "key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertNull(properties);
Map<String, String> topicProp = new HashMap<>();
topicProp.put("key1", "value1");
topicProp.put("key2", "value2");
admin.topics().updateProperties(topicNameTwo, topicProp);
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
admin.topics().removeProperties(topicNameTwo, "key1");
topicProp.remove("key1");
properties = admin.topics().getProperties(topicNameTwo);
Assert.assertEquals(properties, topicProp);
}

@Test
public void testUpdateNonPartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);

// create non-partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}

@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,42 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @throws PulsarAdminException
*/
void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @return
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);

/**
* Remove the key in properties on a topic.
*
* @param topic
* @param key
* @throws PulsarAdminException
*/
void removeProperties(String topic, String key) throws PulsarAdminException;

/**
* Remove the key in properties on a topic asynchronously.
*
* @param topic
* @param key
* @return
*/
CompletableFuture<Void> removePropertiesAsync(String topic, String key);

/**
* Delete a partitioned topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,39 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException {
sync(() -> updatePropertiesAsync(topic, properties));
}

@Override
public CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties");
if (properties == null) {
properties = new HashMap<>();
}
return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON));
}

@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
}

@Override
public void removeProperties(String topic, String key) throws PulsarAdminException {
sync(() -> removePropertiesAsync(topic, key));
}

@Override
public CompletableFuture<Void> removePropertiesAsync(String topic, String key) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties")
.queryParam("key", key);
return asyncDeleteRequest(path);
}

@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
return deletePartitionedTopicAsync(topic, false);
Expand Down
Loading

0 comments on commit 888bd2f

Please sign in to comment.