Skip to content

Commit

Permalink
Revert "[improve][broker] Make some methods in NamespacesBase async. (a…
Browse files Browse the repository at this point in the history
…pache#15518)"

This reverts commit 138ea35.
  • Loading branch information
lhotari committed May 18, 2022
1 parent 99e6f2d commit d23a442
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 451 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ public void createPolicies(NamespaceName ns, Policies policies) throws MetadataS
create(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public CompletableFuture<Void> createPoliciesAsync(NamespaceName ns, Policies policies) {
return createAsync(joinPath(BASE_POLICIES_PATH, ns.toString()), policies);
}

public boolean namespaceExists(NamespaceName ns) throws MetadataStoreException {
String path = joinPath(BASE_POLICIES_PATH, ns.toString());
return super.exists(path) && super.getChildren(path).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.resources;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -79,7 +78,7 @@ public CompletableFuture<Void> updateTenantAsync(String tenantName, Function<Ten
}

public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
return existsAsync(joinPath(BASE_POLICIES_PATH, tenantName));
return getCache().exists(joinPath(BASE_POLICIES_PATH, tenantName));
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
Expand Down Expand Up @@ -111,41 +110,6 @@ public List<String> getListOfNamespaces(String tenant) throws MetadataStoreExcep
return namespaces;
}

public CompletableFuture<List<String>> getListOfNamespacesAsync(String tenant) {
// this will return a cluster in v1 and a namespace in v2
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant))
.thenCompose(clusterOrNamespaces -> clusterOrNamespaces.stream().map(key ->
getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, key))
.thenCompose(children -> {
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(tenant, key).toString();
// if the length is 0 then this is probably a leftover cluster from namespace
// created with the v1 admin format (prop/cluster/ns) and then deleted, so no
// need to add it to the list
return getAsync(joinPath(BASE_POLICIES_PATH, namespace))
.thenApply(opt -> opt.isPresent() ? Collections.singletonList(namespace)
: new ArrayList<String>())
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof MetadataStoreException
.ContentDeserializationException) {
return new ArrayList<>();
}
throw FutureUtil.wrapToCompletionException(ex);
});
} else {
CompletableFuture<List<String>> ret = new CompletableFuture();
ret.complete(children.stream().map(ns -> NamespaceName.get(tenant, key, ns)
.toString()).collect(Collectors.toList()));
return ret;
}
})).reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(accumulator, n) -> accumulator.thenCompose(namespaces -> n.thenApply(m -> {
namespaces.addAll(m);
return namespaces;
}))));
}

public CompletableFuture<List<String>> getActiveNamespaces(String tenant, String cluster) {
return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant, cluster));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,6 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
return CompletableFuture.completedFuture(policies.get());
});
} else {
Expand Down Expand Up @@ -539,11 +534,6 @@ protected List<String> getPartitionedTopicList(TopicDomain topicDomain) {
}
}

protected CompletableFuture<List<String>> getPartitionedTopicListAsync(TopicDomain topicDomain) {
return namespaceResources().getPartitionedTopicResources()
.listPartitionedTopicsAsync(namespaceName, topicDomain);
}

protected List<String> getTopicPartitionList(TopicDomain topicDomain) {
try {
return getPulsarResources().getTopicResources().getExistingPartitions(topicName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,54 +103,60 @@
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class NamespacesBase extends AdminResource {

protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
if (tenant == null) {
return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, "Tenant should not be null"));
}
protected List<String> internalGetTenantNamespaces(String tenant) {
checkNotNull(tenant, "Tenant should not be null");
try {
NamedEntity.checkName(tenant);
} catch (IllegalArgumentException e) {
log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid"));
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
}
validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);

try {
if (!tenantResources().tenantExists(tenant)) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}

return tenantResources().getListOfNamespaces(tenant);
} catch (Exception e) {
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), e);
throw new RestException(e);
}
return validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
.thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
.thenCompose(existed -> {
if (!existed) {
throw new RestException(Status.NOT_FOUND, "Tenant not found");
}
return tenantResources().getListOfNamespacesAsync(tenant);
});
}

protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> validatePolicies(namespaceName, policies))
.thenCompose(__ -> {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
return tenantResources().getListOfNamespacesAsync(namespaceName.getTenant())
.thenAccept(namespaces -> {
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :"
+ namespaceName.getTenant());
}
});
}
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
protected void internalCreateNamespace(Policies policies) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.CREATE_NAMESPACE);
validatePoliciesReadOnlyAccess();
validatePolicies(namespaceName, policies);

try {
int maxNamespacesPerTenant = pulsar().getConfiguration().getMaxNamespacesPerTenant();
// no distributed locks are added here.In a concurrent scenario, the threshold will be exceeded.
if (maxNamespacesPerTenant > 0) {
List<String> namespaces = tenantResources().getListOfNamespaces(namespaceName.getTenant());
if (namespaces != null && namespaces.size() > maxNamespacesPerTenant) {
throw new RestException(Status.PRECONDITION_FAILED,
"Exceed the maximum number of namespace in tenant :" + namespaceName.getTenant());
}
}
namespaceResources().createPolicies(namespaceName, policies);
log.info("[{}] Created namespace {}", clientAppId(), namespaceName);
} catch (AlreadyExistsException e) {
log.warn("[{}] Failed to create namespace {} - already exists", clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Namespace already exists");
} catch (Exception e) {
log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
Expand Down
Loading

0 comments on commit d23a442

Please sign in to comment.