Skip to content

Commit

Permalink
fix fabric8io#3609 addressing concurrency with long running Watcher m…
Browse files Browse the repository at this point in the history
…ethods
  • Loading branch information
shawkins authored and manusa committed Nov 29, 2021
1 parent 22c3e9e commit 9ada97f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fix #3555: using the new builder for generic resources
* Fix #3535: ensure clientKeyAlgo is set properly when loading config YAML from `fromKubeconfig`
* Fix #3598: applying cancel to the correct future for waitUntilCondition and waitUntilReady
* Fix #3609: adding locking to prevent long running Watcher methods from causing reconnects with concurrent processing

#### Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
class WatcherWebSocketListener<T extends HasMetadata> extends WebSocketListener {
protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class);

// don't allow for concurrent failure and message processing
// if something is holding the message thread, this can lead to concurrent processing on the watcher
// or worse additional reconnection attempts while the previous threads are still held
private final Object reconnectLock = new Object();

private final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
protected final AbstractWatchManager<T> manager;

Expand Down Expand Up @@ -85,12 +90,16 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
return;
}

manager.scheduleReconnect();
synchronized (reconnectLock) {
manager.scheduleReconnect();
}
}

@Override
public void onMessage(WebSocket webSocket, String text) {
manager.onMessage(text);
synchronized (reconnectLock) {
manager.onMessage(text);
}
}

private void pushException(KubernetesClientException exception) {
Expand Down
77 changes: 77 additions & 0 deletions kubernetes-itests/src/test/java/io/fabric8/kubernetes/WatchIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package io.fabric8.kubernetes;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import org.arquillian.cube.kubernetes.api.Session;
Expand All @@ -35,7 +38,9 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(ArquillianConditionalRunner.class)
Expand Down Expand Up @@ -97,6 +102,78 @@ public void onClose() {
.build());

assertTrue(eventLatch.await(10, TimeUnit.SECONDS));
assertTrue(modifyLatch.await(10, TimeUnit.SECONDS));
watch.close();
assertTrue(closeLatch.await(30, TimeUnit.SECONDS));
}

@Test
public void testWatchFailureHandling() throws InterruptedException {
String currentNamespace = session.getNamespace();
String name = "sample-configmap-watch";

Resource<ConfigMap> configMapClient = client.configMaps().inNamespace(currentNamespace).withName(name);

configMapClient.create(new ConfigMapBuilder().withNewMetadata().withName(name).endMetadata().build());

final CountDownLatch eventLatch = new CountDownLatch(1);
final CountDownLatch modifyLatch = new CountDownLatch(1);
final AtomicBoolean inMethod = new AtomicBoolean();
final CountDownLatch closeLatch = new CountDownLatch(1);
final AtomicBoolean concurrent = new AtomicBoolean();
Watch watch = configMapClient.watch(new Watcher<ConfigMap>() {
@Override
public void eventReceived(Action action, ConfigMap pod) {
eventLatch.countDown();

if (action.equals(Action.MODIFIED)) {
if (inMethod.getAndSet(true)) {
concurrent.set(true);
modifyLatch.countDown();
}
try {
// introduce a delay to cause the ping to terminate the connection
// if this doesn't work reliably, then an alternative would be to
// restart the apiserver
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
modifyLatch.countDown();
inMethod.set(false);
}
}

@Override
public void onClose(WatcherException cause) {

}

@Override
public void onClose() {
closeLatch.countDown();
logger.info("watch closed...");
}
});

configMapClient
.patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new ConfigMapBuilder()
.withNewMetadata()
.addToLabels("foo", "bar")
.endMetadata()
.build());

configMapClient
.patch(PatchContext.of(PatchType.STRATEGIC_MERGE), new ConfigMapBuilder()
.withNewMetadata()
.addToLabels("foo", "bar1")
.endMetadata()
.build());

assertTrue(eventLatch.await(10, TimeUnit.SECONDS));
assertTrue(modifyLatch.await(15, TimeUnit.SECONDS));
assertFalse(concurrent.get());
watch.close();
assertTrue(closeLatch.await(30, TimeUnit.SECONDS));
}
Expand Down

0 comments on commit 9ada97f

Please sign in to comment.