Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[UTO] Fix wrong error message on resource conflicts #9303

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private boolean rememberTopic(ReconcilableTopic reconcilableTopic) {
String tn = reconcilableTopic.topicName();
var existing = topics.computeIfAbsent(tn, k -> new HashSet<>());
KubeRef thisRef = new KubeRef(reconcilableTopic.kt());
thisRef.processingOrder(existing.size() + 1);
existing.add(thisRef);
return true;
}
Expand All @@ -210,26 +211,26 @@ private Either<TopicOperatorException, Boolean> validateSingleManagingResource(R
String tn = reconcilableTopic.topicName();
var existing = topics.get(tn);
KubeRef thisRef = new KubeRef(reconcilableTopic.kt());
if (existing.size() != 1) {
var byCreationTime = existing.stream().sorted(Comparator.comparing(KubeRef::creationTime)).toList();

var oldest = byCreationTime.get(0);
var nextOldest = byCreationTime.size() >= 2 ? byCreationTime.get(1) : null;
TopicOperatorException e = new TopicOperatorException.ResourceConflict("Managed by " + oldest);
if (nextOldest == null) {
// This is only resource for that topic => it is the unique oldest
return Either.ofRight(true);
} else if (thisRef.equals(oldest) && nextOldest.creationTime() != oldest.creationTime()) {
// This resource is the unique oldest, so it's OK.
if (existing.size() > 1) {
var byCreationTimeAndProcessingOrder = existing.stream()
.sorted(Comparator.comparing(KubeRef::creationTime)
.thenComparing(Comparator.comparing(KubeRef::processingOrder))).toList();
var oldest = byCreationTimeAndProcessingOrder.get(0);
var nextOldest = byCreationTimeAndProcessingOrder.get(1);
if (thisRef.equals(oldest) && nextOldest.creationTime() != oldest.creationTime()) {
// This resource is the unique oldest, so it's OK
// The others will eventually get reconciled and put into ResourceConflict
return Either.ofRight(true);
} else if (thisRef.equals(oldest)
&& reconcilableTopic.kt().getStatus() != null
&& reconcilableTopic.kt().getStatus().getConditions() != null
&& reconcilableTopic.kt().getStatus().getConditions().stream().anyMatch(c -> "Ready".equals(c.getType()) && "True".equals(c.getStatus()))) {
&& reconcilableTopic.kt().getStatus().getConditions().stream()
.anyMatch(c -> "Ready".equals(c.getType()) && "True".equals(c.getStatus()))) {
// This is the unique oldest because it is reconciled
return Either.ofRight(true);
} else {
// Return an error putting this resource into ResourceConflict
TopicOperatorException e = new TopicOperatorException.ResourceConflict("Managed by " + oldest);
return Either.ofLeft(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,64 @@
/**
* Reference to a resource in Kube.
* Equality is based on {@link #namespace()} and {@link #name()}.
* {@link #creationTime()} is present to allow disambiguation of multiple KafkaTopics managing the same topic in Kafka.
* {@link #creationTime()} is present to allow disambiguation of multiple KafkaTopics with the same topicName.
* {@link #processingOrder()} is present to allow disambiguation of multiple KafkaTopics with the same topicName in the same batch.
*/
record KubeRef(String namespace, String name, long creationTime) {
public class KubeRef {
private String namespace;
private String name;
private long creationTime;
private long processingOrder;

KubeRef(String namespace, String name, long creationTime) {
this.namespace = namespace;
this.name = name;
this.creationTime = creationTime;
}

KubeRef(KafkaTopic kt) {
this(kt.getMetadata().getNamespace(), kt.getMetadata().getName(), StatusUtils.isoUtcDatetime(kt.getMetadata().getCreationTimestamp()).toEpochMilli());
this(kt.getMetadata().getNamespace(), kt.getMetadata().getName(),
StatusUtils.isoUtcDatetime(kt.getMetadata().getCreationTimestamp()).toEpochMilli());
}

/**
* Returns the metadata.namespace.
* @return metadata.namespace
*/
public String namespace() {
return namespace;
}

/**
* Returns the metadata.name.
* @return metadata.name
*/
public String name() {
return name;
}

/**
* Returns the metadata.creationTime.
* @return metadata.creationTime
*/
public long creationTime() {
return creationTime;
}

/**
* Returns the processing order
* @return processing order
*/
public long processingOrder() {
return processingOrder;
}

/**
* Sets the processing order.
* @param processingOrder processing order
*/
public void processingOrder(long processingOrder) {
this.processingOrder = processingOrder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@
import java.util.Set;
import java.util.Stack;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -79,6 +82,7 @@

import static io.strimzi.api.ResourceAnnotations.ANNO_STRIMZI_IO_PAUSE_RECONCILIATION;
import static io.strimzi.operator.topic.v2.BatchingTopicController.isPaused;
import static io.strimzi.operator.topic.v2.TopicOperatorTestUtil.findKafkaTopicByName;
import static java.lang.String.format;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -475,6 +479,42 @@ private KafkaTopic createTopic(KafkaCluster kc, KafkaTopic kt, Predicate<KafkaTo
return waitUntil(created, condition);
}

private List<KafkaTopic> createTopicsConcurrently(KafkaCluster kc, KafkaTopic... kts) throws InterruptedException, ExecutionException {
if (kts == null || kts.length == 0) {
throw new IllegalArgumentException("You need pass at least one topic to be created");
}
String ns = namespace(kts[0].getMetadata().getNamespace());
maybeStartOperator(topicOperatorConfig(ns, kc));
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CountDownLatch latch = new CountDownLatch(kts.length);
List<KafkaTopic> result = new ArrayList<>();
for (KafkaTopic kt : kts) {
executor.submit(() -> {
try {
var created = Crds.topicOperation(client).resource(kt).create();
LOGGER.info("Test created KafkaTopic {} with creationTimestamp {}",
created.getMetadata().getName(),
created.getMetadata().getCreationTimestamp());
var reconciled = waitUntil(created, readyIsTrueOrFalse());
result.add(reconciled);
} catch (Exception e) {
throw new RuntimeException(e);
}
latch.countDown();
});
}
latch.await(1, TimeUnit.MINUTES);
try {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (!executor.isTerminated()) {
executor.shutdownNow();
}
}
return result;
}

private KafkaTopic pauseTopic(String namespace, String topicName) {
var current = Crds.topicOperation(client).inNamespace(namespace).withName(topicName).get();
var paused = Crds.topicOperation(client).resource(new KafkaTopicBuilder(current)
Expand Down Expand Up @@ -1833,20 +1873,15 @@ public void shouldFailDeleteIfNoTopicAuthz(KafkaTopic kt,

@Test
public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
// Scenario from https://github.com/strimzi/strimzi-kafka-operator/pull/8627#pullrequestreview-1477513413

// given
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
// scenario from https://github.com/strimzi/strimzi-kafka-operator/pull/8627#pullrequestreview-1477513413

// create foo
var topicName = randomTopicName();
LOGGER.info("Create foo");
var foo = kafkaTopic(NAMESPACE, "foo", null, null, 1, 1);
var createdFoo = createTopicAndAssertSuccess(kafkaCluster, foo);

// TODO remove after fixing https://github.com/strimzi/strimzi-kafka-operator/issues/9270
Thread.sleep(1000);

// create conflicting bar
LOGGER.info("Create conflicting bar");
var bar = kafkaTopic(NAMESPACE, "bar", SELECTOR, null, null, "foo", 1, 1,
Expand All @@ -1860,30 +1895,75 @@ public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name
// increase partitions of foo
LOGGER.info("Increase partitions of foo");
var editedFoo = modifyTopicAndAwait(createdFoo, theKt ->
new KafkaTopicBuilder(theKt).editSpec().withPartitions(3).endSpec().build(),
readyIsTrue());
new KafkaTopicBuilder(theKt).editSpec().withPartitions(3).endSpec().build(),
readyIsTrue());

// unmanage foo
LOGGER.info("Unmanage foo");
var unmanagedFoo = modifyTopicAndAwait(editedFoo, theKt ->
new KafkaTopicBuilder(theKt).editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
readyIsTrue());
new KafkaTopicBuilder(theKt).editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
readyIsTrue());

// when: delete foo
LOGGER.info("Delete foo");
Crds.topicOperation(client).resource(unmanagedFoo).delete();
LOGGER.info("Test deleted KafkaTopic {} with resourceVersion {}",
unmanagedFoo.getMetadata().getName(), BatchingTopicController.resourceVersion(unmanagedFoo));
unmanagedFoo.getMetadata().getName(), BatchingTopicController.resourceVersion(unmanagedFoo));
Resource<KafkaTopic> resource = Crds.topicOperation(client).resource(unmanagedFoo);
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);

// then: expect bar's unreadiness to be due to mismatching #partitions
waitUntil(createdBar, readyIsFalseAndReasonIs(
TopicOperatorException.Reason.NOT_SUPPORTED.reason,
"Decreasing partitions not supported"));
TopicOperatorException.Reason.NOT_SUPPORTED.reason,
"Decreasing partitions not supported"));
}

@Test
public void shouldDetectConflictingKafkaTopicCreations(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
var foo = kafkaTopic("ns", "foo", null, null, 1, 1);
var bar = kafkaTopic("ns", "bar", SELECTOR, null, null, "foo", 1, 1,
Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"));

LOGGER.info("Create conflicting topics: foo and bar");
var reconciledTopics = createTopicsConcurrently(kafkaCluster, foo, bar);
var reconciledFoo = findKafkaTopicByName(reconciledTopics, "foo");
var reconciledBar = findKafkaTopicByName(reconciledTopics, "bar");

// only one resource with the same topicName should be reconciled
var fooFailed = readyIsFalse().test(reconciledFoo);
var barFailed = readyIsFalse().test(reconciledBar);
assertTrue(fooFailed ^ barFailed);

if (fooFailed) {
assertKafkaTopicConflict(reconciledFoo, reconciledBar);
} else {
assertKafkaTopicConflict(reconciledBar, reconciledFoo);
}
}

private void assertKafkaTopicConflict(KafkaTopic failed, KafkaTopic ready) {
// the error message should refer to the ready resource name
var condition = assertExactlyOneCondition(failed);
assertEquals(TopicOperatorException.Reason.RESOURCE_CONFLICT.reason, condition.getReason());
assertEquals(format("Managed by Ref{namespace='ns', name='%s'}", ready.getMetadata().getName()), condition.getMessage());

// the failed resource should become ready after we unmanage and delete the other
LOGGER.info("Unmanage {}", ready.getMetadata().getName());
var unmanagedBar = modifyTopicAndAwait(ready, theKt -> new KafkaTopicBuilder(theKt)
.editMetadata().addToAnnotations(BatchingTopicController.MANAGED, "false").endMetadata().build(),
readyIsTrue());

LOGGER.info("Delete {}", ready.getMetadata().getName());
Crds.topicOperation(client).resource(unmanagedBar).delete();
Resource<KafkaTopic> resource = Crds.topicOperation(client).resource(unmanagedBar);
TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull);

waitUntil(failed, readyIsTrue());
}

private static <T> KafkaFuture<T> failedFuture(Throwable error) throws ExecutionException, InterruptedException {
private static <T> KafkaFuture<T> failedFuture(Throwable error) {
var future = new KafkaFutureImpl<T>();
future.completeExceptionally(error);
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.TestInfo;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand Down Expand Up @@ -124,4 +125,8 @@ static <T> T waitUntilCondition(Resource<T> resource, Predicate<T> condition) {
}
}
}

static KafkaTopic findKafkaTopicByName(List<KafkaTopic> topics, String name) {
return topics.stream().filter(kt -> kt.getMetadata().getName().equals(name)).findFirst().orElse(null);
}
}
Loading