diff --git a/CHANGELOG.md b/CHANGELOG.md index e81000ea501..33d4758c8b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Fix #3555: using the new builder for generic resources * Fix #3535: ensure clientKeyAlgo is set properly when loading config YAML from `fromKubeconfig` * Fix #3598: applying cancel to the correct future for waitUntilCondition and waitUntilReady +* Fix #3609: adding locking to prevent long-running Watcher methods from causing reconnects with concurrent processing #### Improvements diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 857a47d3166..4c4897f4f98 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -37,6 +37,11 @@ class WatcherWebSocketListener extends WebSocketListener { protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); + // don't allow for concurrent failure and message processing + // if something is holding the message thread, this can lead to concurrent processing on the watcher + // or worse, additional reconnection attempts while the previous threads are still held + private final Object reconnectLock = new Object(); + private final CompletableFuture startedFuture = new CompletableFuture<>(); protected final AbstractWatchManager manager; @@ -85,12 +90,16 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { return; } - manager.scheduleReconnect(); + synchronized (reconnectLock) { + manager.scheduleReconnect(); + } } @Override public void onMessage(WebSocket webSocket, String text) { - manager.onMessage(text); + synchronized (reconnectLock) { + manager.onMessage(text); + } } private void 
pushException(KubernetesClientException exception) { diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java index e251848d90b..f2f14b9c4d2 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java @@ -16,12 +16,15 @@ package io.fabric8.kubernetes; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; import org.arquillian.cube.kubernetes.api.Session; @@ -35,7 +38,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @RunWith(ArquillianConditionalRunner.class) @@ -97,6 +102,78 @@ public void onClose() { .build()); assertTrue(eventLatch.await(10, TimeUnit.SECONDS)); + assertTrue(modifyLatch.await(10, TimeUnit.SECONDS)); + watch.close(); + assertTrue(closeLatch.await(30, TimeUnit.SECONDS)); + } + + @Test + public void testWatchFailureHandling() throws InterruptedException { + String currentNamespace = session.getNamespace(); + String name = "sample-configmap-watch"; + + Resource configMapClient = client.configMaps().inNamespace(currentNamespace).withName(name); + + configMapClient.create(new ConfigMapBuilder().withNewMetadata().withName(name).endMetadata().build()); + + final CountDownLatch eventLatch = new 
CountDownLatch(1); + final CountDownLatch modifyLatch = new CountDownLatch(1); + final AtomicBoolean inMethod = new AtomicBoolean(); + final CountDownLatch closeLatch = new CountDownLatch(1); + final AtomicBoolean concurrent = new AtomicBoolean(); + Watch watch = configMapClient.watch(new Watcher() { + @Override + public void eventReceived(Action action, ConfigMap pod) { + eventLatch.countDown(); + + if (action.equals(Action.MODIFIED)) { + if (inMethod.getAndSet(true)) { + concurrent.set(true); + modifyLatch.countDown(); + } + try { + // introduce a delay to cause the ping to terminate the connection + // if this doesn't work reliably, then an alternative would be to + // restart the apiserver + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + modifyLatch.countDown(); + inMethod.set(false); + } + } + + @Override + public void onClose(WatcherException cause) { + + } + + @Override + public void onClose() { + closeLatch.countDown(); + logger.info("watch closed..."); + } + }); + + configMapClient + .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new ConfigMapBuilder() + .withNewMetadata() + .addToLabels("foo", "bar") + .endMetadata() + .build()); + + configMapClient + .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new ConfigMapBuilder() + .withNewMetadata() + .addToLabels("foo", "bar1") + .endMetadata() + .build()); + + assertTrue(eventLatch.await(10, TimeUnit.SECONDS)); + assertTrue(modifyLatch.await(15, TimeUnit.SECONDS)); + assertFalse(concurrent.get()); watch.close(); assertTrue(closeLatch.await(30, TimeUnit.SECONDS)); }