Skip to content

Commit

Permalink
[pulsar-admin] Check backlog quota policy for namespace (apache#12512)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuruguo authored and eolivelli committed Nov 29, 2021
1 parent 7b4a7fe commit 58765c4
Showing 1 changed file with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1108,13 +1108,15 @@ private class SetBacklogQuota extends CliCommand {
+ "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true)
private String policyStr;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set")
private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name();
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: " +
"destination_storage, message_age")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();

@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
long limit;
BacklogQuota.BacklogQuotaType backlogQuotaType;

try {
policy = BacklogQuota.RetentionPolicy.valueOf(policyStr);
Expand All @@ -1123,15 +1125,27 @@ void run() throws PulsarAdminException {
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}

limit = validateSizeString(limitStr);
try {
limit = validateSizeString(limitStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid formats are: %s",
limitStr, "(4096, 100K, 10M, 16G, 2T)"));
}

try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}

String namespace = validateNamespace(params);
getAdmin().namespaces().setBacklogQuota(namespace,
BacklogQuota.builder().limitSize(limit)
.limitTime(limitTime)
.retentionPolicy(policy)
.build(),
BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
backlogQuotaType);
}
}

Expand All @@ -1140,13 +1154,21 @@ private class RemoveBacklogQuota extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove")
private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name();
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove. Valid options are: " +
"destination_storage, message_age")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeBacklogQuota(namespace, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType));
BacklogQuota.BacklogQuotaType backlogQuotaType;
try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
getAdmin().namespaces().removeBacklogQuota(namespace, backlogQuotaType);
}
}

Expand Down

0 comments on commit 58765c4

Please sign in to comment.