diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 7f6af890b38..241fbcc6a61 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -34,8 +34,8 @@ import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApi; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlApiImpl; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRebalanceResponse; -import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlResponse; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlRestException; +import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlUserTasksResponse; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RebalanceOptions; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.RemoveBrokerOptions; import io.strimzi.operator.cluster.operator.resource.kubernetes.AbstractWatchableStatusedNamespacedResourceOperator; @@ -767,12 +767,20 @@ private Future> onPendingProposal( return p.future(); } - private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlResponse cruiseControlResponse, + private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseControlUserTasksResponse cruiseControlResponse, Promise> p, String sessionId, Set conditions, KafkaRebalance kafkaRebalance, ConfigMapOperator configMapOperator, boolean dryRun, String host, CruiseControlApi apiClient, AbstractRebalanceOptions.AbstractRebalanceOptionsBuilder rebalanceOptionsBuilder) { + if (cruiseControlResponse.isMaxActiveUserTasksReached()) { + LOGGER.warnCr(reconciliation, "The maximum number of active user tasks that can run concurrently has reached therefore will retry getting user tasks in the next reconciliation. " + + "If this occurs often, consider increasing the value for max.active.user.tasks configuration."); + configMapOperator.getAsync(kafkaRebalance.getMetadata().getNamespace(), kafkaRebalance.getMetadata().getName()) + .onSuccess(loadmap -> p.complete(new MapAndStatus<>(loadmap, buildRebalanceStatusFromPreviousStatus(kafkaRebalance.getStatus(), conditions)))); + return; + } + if (cruiseControlResponse.getJson().isEmpty()) { // This may happen if: // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java index d0ccf205616..383580ac30f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApi.java @@ -87,7 +87,7 @@ public interface CruiseControlApi { * This is used to retrieve the task's current state. * @return A future for the state of the specified task. */ - Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID); + Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskID); /** * Issue a stop command to the Cruise Control server. This will halt any task (e.g. a rebalance) which is currently diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java index 9063f8cc88c..59378aa6853 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlApiImpl.java @@ -310,7 +310,7 @@ public Future removeBroker(Reconciliation reconc @Override @SuppressWarnings("deprecation") - public Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) { + public Future getUserTaskStatus(Reconciliation reconciliation, String host, int port, String userTaskId) { PathBuilder pathBuilder = new PathBuilder(CruiseControlEndpoints.USER_TASKS) .withParameter(CruiseControlParameters.JSON, "true") @@ -345,7 +345,7 @@ public Future getUserTaskStatus(Reconciliation reconcilia // This may happen if: // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted // 2. Task's retention time expired, or the cache has become full - result.complete(new CruiseControlResponse(userTaskID, statusJson)); + result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson)); } else { JsonObject jsonUserTask = userTasks.getJsonObject(0); String taskStatusStr = jsonUserTask.getString(STATUS_KEY); @@ -386,7 +386,7 @@ public Future getUserTaskStatus(Reconciliation reconcilia default: throw new IllegalStateException("Unexpected user task status: " + taskStatus); } - result.complete(new CruiseControlResponse(userTaskID, statusJson)); + result.complete(new CruiseControlUserTasksResponse(userTaskID, statusJson)); } }); } else if (response.result().statusCode() == 500) { @@ -400,8 +400,15 @@ public Future getUserTaskStatus(Reconciliation reconcilia } else { errorString = json.toString(); } - result.fail(new CruiseControlRestException( - "Error for request: " + host + ":" + port + path + ". Server returned: " + errorString)); + if (errorString.matches(".*" + "There are already \\d+ active user tasks, which has reached the servlet capacity." + ".*")) { + LOGGER.debugCr(reconciliation, errorString); + CruiseControlUserTasksResponse ccResponse = new CruiseControlUserTasksResponse(userTaskID, json); + ccResponse.setMaxActiveUserTasksReached(true); + result.complete(ccResponse); + } else { + result.fail(new CruiseControlRestException( + "Error for request: " + host + ":" + port + path + ". Server returned: " + errorString)); + } }); } else { result.fail(new CruiseControlRestException( diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java new file mode 100644 index 00000000000..a2c6d3e0fdc --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlUserTasksResponse.java @@ -0,0 +1,38 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.resource.cruisecontrol; + +import io.vertx.core.json.JsonObject; + +/** + * Response to user tasks request + */ +public class CruiseControlUserTasksResponse extends CruiseControlResponse { + private boolean isMaxActiveUserTasksReached; + + /** + * Constructor + * + * @param userTaskId User task ID + * @param json JSON data + */ + CruiseControlUserTasksResponse(String userTaskId, JsonObject json) { + super(userTaskId, json); + // The maximum number of active user tasks that can run concurrently has reached + // Sourced from the error message that contains "reached the servlet capacity" from the Cruise Control response + this.isMaxActiveUserTasksReached = false; + } + + /** + * @return True If the maximum number of active user tasks that can run concurrently has reached + */ + public boolean isMaxActiveUserTasksReached() { + return isMaxActiveUserTasksReached; + } + + protected void setMaxActiveUserTasksReached(boolean maxActiveUserTasksReached) { + this.isMaxActiveUserTasksReached = maxActiveUserTasksReached; + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceStateMachineTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceStateMachineTest.java index 7612c099a6c..2bc5861537a 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceStateMachineTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceStateMachineTest.java @@ -56,6 +56,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; @ExtendWith(VertxExtension.class) @@ -1125,6 +1126,50 @@ private void krRebalancingCompletedWithError(Vertx vertx, VertxTestContext conte .onComplete(result -> checkOptimizationResults(result, context, true)); } + @Test + public void testGetUserTasksReturnError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException { + KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false); + cruiseControlServer.setupCCUserTasksToReturnError(500, "Some error"); + + CruiseControlApi client = new CruiseControlApiImpl(vertx, HTTP_DEFAULT_IDLE_TIMEOUT_SECONDS, MockCruiseControl.CC_SECRET, MockCruiseControl.CC_API_SECRET, true, true); + KafkaRebalanceAssemblyOperator kcrao = new KafkaRebalanceAssemblyOperator(vertx, ResourceUtils.supplierWithMocks(true), ResourceUtils.dummyClusterOperatorConfig(), cruiseControlPort) { + @Override + public String cruiseControlHost(String clusterName, String clusterNamespace) { + return HOST; + } + }; + + Reconciliation recon = new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, CLUSTER_NAMESPACE, RESOURCE_NAME); + + kcrao.computeNextStatus(recon, HOST, client, kcRebalance, KafkaRebalanceState.Rebalancing, null) + .onComplete(mapAndStatusAsyncResult -> { + assertTrue(mapAndStatusAsyncResult.failed()); + context.completeNow(); + }); + } + + @Test + public void testGetUserTasksForRebalancingReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException { + KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.Rebalancing, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false); + cruiseControlServer.setupCCUserTasksToReturnError(500, "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'."); + + checkTransition(vertx, context, + KafkaRebalanceState.Rebalancing, KafkaRebalanceState.Rebalancing, + kcRebalance) + .onComplete(result -> checkOptimizationResults(result, context, true)); + } + + @Test + public void testGetUserTasksForProposalReturnServletFullError(Vertx vertx, VertxTestContext context) throws IOException, URISyntaxException { + KafkaRebalance kcRebalance = createKafkaRebalance(KafkaRebalanceState.PendingProposal, MockCruiseControl.REBALANCE_NO_GOALS_RESPONSE_UTID, null, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, null, false); + cruiseControlServer.setupCCUserTasksToReturnError(500, "Error processing POST request '/remove_broker' due to: 'There are already 5 active user tasks, which has reached the servlet capacity'."); + + checkTransition(vertx, context, + KafkaRebalanceState.PendingProposal, KafkaRebalanceState.PendingProposal, + kcRebalance) + .onComplete(result -> checkOptimizationResults(result, context, true)); + } + /** * Tests the transition from 'Stopped' to 'PendingProposal' when refresh * diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java index 9c58baa4e14..e5f2a87a591 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/CruiseControlClientTest.java @@ -446,7 +446,7 @@ public void testMockCCServerPendingCallsOverride(Vertx vertx, VertxTestContext c cruiseControlServer.setupCCUserTasksResponseNoGoals(0, pendingCalls1); - Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); + Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); for (int i = 1; i <= pendingCalls1; i++) { statusFuture = statusFuture.compose(response -> { @@ -504,7 +504,7 @@ private void runTest(Vertx vertx, VertxTestContext context, String userTaskID, i CruiseControlApi client = cruiseControlClientProvider(vertx); - Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); + Future statusFuture = client.getUserTaskStatus(Reconciliation.DUMMY_RECONCILIATION, HOST, cruiseControlPort, userTaskID); // One interaction is always expected at the end of the test, hence the +1 Checkpoint expectedInteractions = context.checkpoint(pendingCalls + 1); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java index f2ac6c9f0d5..780b951348e 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/cruisecontrol/MockCruiseControl.java @@ -516,6 +516,25 @@ public void setupCCUserTasksCompletedWithError() throws IOException, URISyntaxEx .withDelay(TimeUnit.SECONDS, 0)); } + public void setupCCUserTasksToReturnError(int statusCode, String errorMessage) throws IOException, URISyntaxException { + // This simulates asking for the status of a task that has Complete with error and fetch_completed_task=true + JsonBody errorJson = new JsonBody("{\"errorMessage\":\"" + errorMessage + "\"}"); + server + .when( + request() + .withMethod("GET") + .withQueryStringParameter(Parameter.param(CruiseControlParameters.JSON.toString(), "true")) + .withQueryStringParameter(Parameter.param(CruiseControlParameters.FETCH_COMPLETE.toString(), "true")) + .withPath(CruiseControlEndpoints.USER_TASKS.toString()) + .withHeader(AUTH_HEADER) + .withSecure(true)) + .respond( + response() + .withBody(errorJson) + .withStatusCode(statusCode) + .withDelay(TimeUnit.SECONDS, 0)); + } + /** * Setup response when user task is not found */