Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][admin] PIP-369 Introduce unload flag in ns-isolation-policy set call #23120

Merged
merged 10 commits into from
Sep 1, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import io.swagger.annotations.ExampleProperty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,6 +67,7 @@
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -721,10 +724,13 @@ public void setNamespaceIsolationPolicy(
.setIsolationDataWithCreateAsync(cluster, (p) -> Collections.emptyMap())
.thenApply(__ -> new NamespaceIsolationPolicies()))
).thenCompose(nsIsolationPolicies -> {
NamespaceIsolationDataImpl oldPolicy = nsIsolationPolicies
.getPolicies().getOrDefault(policyName, null);
nsIsolationPolicies.setPolicy(policyName, policyData);
return namespaceIsolationPolicies()
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies());
}).thenCompose(__ -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData))
.setIsolationDataAsync(cluster, old -> nsIsolationPolicies.getPolicies())
.thenApply(__ -> oldPolicy);
}).thenCompose(oldPolicy -> filterAndUnloadMatchedNamespaceAsync(cluster, policyData, oldPolicy))
.thenAccept(__ -> {
log.info("[{}] Successful to update clusters/{}/namespaceIsolationPolicies/{}.",
clientAppId(), cluster, policyName);
Expand Down Expand Up @@ -759,7 +765,13 @@ public void setNamespaceIsolationPolicy(
* Get matched namespaces; call unload for each namespaces.
*/
private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String cluster,
NamespaceIsolationDataImpl policyData) {
NamespaceIsolationDataImpl policyData,
NamespaceIsolationDataImpl oldPolicy) {
// exit early if none of the namespaces need to be unloaded
if (NamespaceIsolationPolicyUnloadScope.none.equals(policyData.getUnloadScope())) {
return CompletableFuture.completedFuture(null);
}

PulsarAdmin adminClient;
try {
adminClient = pulsar().getAdminClient();
Expand All @@ -768,6 +780,7 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
}
// compile regex patterns once
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
// TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
Expand All @@ -793,6 +806,44 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
return CompletableFuture.completedFuture(null);
}
// If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
// actually have been changed.

log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
// We also compare that the previous primary broker list is same as current, in case all namespaces need
// to be placed again anyway.
List<String> oldBrokers = new ArrayList<>(policyData.getPrimary());
grssam marked this conversation as resolved.
Show resolved Hide resolved
oldBrokers.removeAll(policyData.getPrimary());

if (oldBrokers.isEmpty()) {
// list is same, so we continue finding the changed namespaces.

// We create a union regex list contains old + new regexes
Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
combinedNamespaces.addAll(policyData.getNamespaces());
// We create a intersection of the old and new regexes. These won't need to be unloaded
Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
commonNamespaces.retainAll(policyData.getNamespaces());

log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces);

// Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
combinedNamespaces.removeAll(commonNamespaces);

log.debug("changed regexes: {}", commonNamespaces);

// Now we further filter the filtered namespaces based on this combinedNamespaces set
shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
.filter(name -> combinedNamespaces.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(name).matches())
).toList();

}
}
// unload type is either null or not in (changed, none), so we proceed to unload all namespaces
// TODO - default in 4.x should become `changed`
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
grssam marked this conversation as resolved.
Show resolved Hide resolved
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.NotAcceptableException;
Expand Down Expand Up @@ -109,27 +111,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.*;
grssam marked this conversation as resolved.
Show resolved Hide resolved
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -3496,4 +3478,168 @@ public void testGetStatsIfPartitionNotExists() throws Exception {
// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}

private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadScope scope, List<String> namespaces) {
// setup ns-isolation-policy in both the clusters.
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
List<String> nsRegexList = new ArrayList<>(namespaces);

return NamespaceIsolationData.builder()
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
.namespaces(nsRegexList)
.primary(Collections.singletonList(".*"))
.secondary(Collections.singletonList(""))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.unloadScope(scope)
.build();
}

private boolean allTopicsUnloaded(List<String> topics) {
for (String topic : topics) {
if (pulsar.getBrokerService().getTopicReference(topic).isPresent()) {
return false;
}
}
return true;
}

private void loadTopics(List<String> topics) throws PulsarClientException, ExecutionException, InterruptedException {
// create a topic by creating a producer so that the topic is present on the broker
for (String topic : topics) {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producer.close();
pulsar.getBrokerService().getTopicIfExists(topic).get();
}

// All namespaces are loaded onto broker. Assert that
for (String topic : topics) {
assertTrue(pulsar.getBrokerService().getTopicReference(topic).isPresent());
}
}

/**
* Validates that the namespace isolation policy set and update is unloading only the relevant namespaces based on
* the unload scope provided.
*
* @param topicType persistent or non persistent.
* @param policyName policy name.
* @param nsPrefix unique namespace prefix.
* @param totalNamespaces total namespaces to create. Only the end part. Each namespace also gets a topic t1.
* @param initialScope unload scope while creating the policy.
* @param initialNamespaceRegex namespace regex while creating the policy.
* @param initialLoadedNS expected namespaces to be still loaded after the policy create call. Remaining namespaces
* will be asserted to be unloaded within 20 seconds.
* @param updatedScope unload scope while updating the policy.
* @param updatedNamespaceRegex namespace regex while updating the policy.
* @param updatedLoadedNS expected namespaces to be loaded after policy update call. Remaining namespaces will be
* asserted to be unloaded within 20 seconds.
* @throws PulsarAdminException
* @throws PulsarClientException
* @throws ExecutionException
* @throws InterruptedException
*/
private void testIsolationPolicyUnloadsNSWithScope(String topicType, String policyName, String nsPrefix,
List<String> totalNamespaces,
NamespaceIsolationPolicyUnloadScope initialScope,
List<String> initialNamespaceRegex, List<String> initialLoadedNS,
NamespaceIsolationPolicyUnloadScope updatedScope,
List<String> updatedNamespaceRegex, List<String> updatedLoadedNS)
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException {

// Create all namespaces
List<String> allTopics = new ArrayList<>();
for (String namespacePart: totalNamespaces) {
admin.namespaces().createNamespace(nsPrefix + namespacePart, Set.of("test"));
allTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
}
// Load all topics so that they are present. Assume topic t1 under each namespace
loadTopics(allTopics);

// Create the policy
NamespaceIsolationData nsPolicyData1 = createPolicyData(initialScope, initialNamespaceRegex);
admin.clusters().createNamespaceIsolationPolicy("test", policyName, nsPolicyData1);

List<String> initialLoadedTopics = new ArrayList<>();
for (String namespacePart: initialLoadedNS) {
initialLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
}

List<String> initialUnloadedTopics = new ArrayList<>(allTopics);
initialUnloadedTopics.removeAll(initialLoadedTopics);

// Assert that all topics (and thus ns) not under initialLoadedNS namespaces are unloaded
if (initialUnloadedTopics.isEmpty()) {
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
TimeUnit.SECONDS.sleep(5);
} else {
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> allTopicsUnloaded(initialUnloadedTopics));
}
// Assert that all topics under initialLoadedNS are still present
initialLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));

// Load the topics again
loadTopics(allTopics);

// Update policy using updatedScope with updated namespace regex
nsPolicyData1 = createPolicyData(updatedScope, updatedNamespaceRegex);
admin.clusters().updateNamespaceIsolationPolicy("test", policyName, nsPolicyData1);

List<String> updatedLoadedTopics = new ArrayList<>();
for (String namespacePart : updatedLoadedNS) {
updatedLoadedTopics.add(topicType + "://" + nsPrefix + namespacePart + "/t1");
}

List<String> updatedUnloadedTopics = new ArrayList<>(allTopics);
updatedUnloadedTopics.removeAll(updatedLoadedTopics);

// Assert that all topics (and thus ns) not under updatedLoadedNS namespaces are unloaded
if (updatedUnloadedTopics.isEmpty()) {
// Just wait a bit to ensure we don't miss lazy unloading of topics we expect not to unload
TimeUnit.SECONDS.sleep(5);
} else {
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> allTopicsUnloaded(updatedUnloadedTopics));
}
// Assert that all topics under updatedLoadedNS are still present
updatedLoadedTopics.forEach(t -> assertTrue(pulsar.getBrokerService().getTopicReference(t).isPresent()));

}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) throws Exception {
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithNoneScope(final String topicType) throws Exception {
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-none", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
none, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2", "c1")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface NamespaceIsolationData {

AutoFailoverPolicyData getAutoFailoverPolicy();

NamespaceIsolationPolicyUnloadScope getUnloadScope();

void validate();

interface Builder {
Expand All @@ -42,6 +44,8 @@ interface Builder {

Builder autoFailoverPolicy(AutoFailoverPolicyData autoFailoverPolicyData);

Builder unloadScope(NamespaceIsolationPolicyUnloadScope unloadScope);

NamespaceIsolationData build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data;

/**
* The type of unload to perform while setting the isolation policy.
*/
public enum NamespaceIsolationPolicyUnloadScope {
all_matching, // unloads all matching namespaces as per new regex
none, // unloads no namespaces
changed; // unloads only the namespaces which are newly added or removed from the regex list

public static NamespaceIsolationPolicyUnloadScope fromString(String unloadScopeString) {
for (NamespaceIsolationPolicyUnloadScope unloadScope : NamespaceIsolationPolicyUnloadScope.values()) {
if (unloadScope.toString().equalsIgnoreCase(unloadScopeString)) {
return unloadScope;
}
}
return null;
}
}
Loading
Loading