Skip to content

Commit

Permalink
Fix fabric8io#2812: SharedIndexInformer EventHandler sees double upda…
Browse files Browse the repository at this point in the history
…tes at resync interval

+ Remove scheduleAtFixedDelay execution of Reflector#resyncAndList(),
  resync should happen in memory without contacting the apiserver just
  like client go implementation
+ Added logic for watch.close() when Reflector#stop() is called
  • Loading branch information
rohanKanojia committed Apr 5, 2021
1 parent 700acec commit a541e36
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Fix #2620: Add support for `config.openshift.io/v1` Ingress

- Fix: CRD generator no longer treat enum values as properties (performance)
* Fix #2812: SharedIndexInformer EventHandler sees double updates at resync interval

#### Improvements
- RawCustomResourceOperationsImpl should also work with standard resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -82,7 +81,6 @@ public void listAndWatch() throws Exception {
try {
log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
reListAndSync();
scheduleResyncExecution();
startWatcher();
} catch (Exception exception) {
store.isPopulated(false);
Expand All @@ -92,6 +90,10 @@ public void listAndWatch() throws Exception {

public void stop() {
isActive.set(false);
if (watch.get() != null) {
watch.get().close();
watch.set(null);
}
}

public long getResyncPeriodMillis() {
Expand Down Expand Up @@ -138,10 +140,4 @@ private void startWatcher() {
public String getLastSyncResourceVersion() {
return lastSyncResourceVersion.get();
}

void scheduleResyncExecution() {
if (resyncPeriodMillis > 0) {
resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import org.arquillian.cube.kubernetes.api.Session;
import org.arquillian.cube.kubernetes.impl.requirement.RequiresKubernetes;
import org.arquillian.cube.requirement.ArquillianConditionalRunner;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

@RunWith(ArquillianConditionalRunner.class)
@RequiresKubernetes
public class DefaultSharedIndexInformerIT {
@ArquillianResource
KubernetesClient client;

private static final long RESYNC_PERIOD = 1000L;

@Test
public void testPodSharedIndexInformerGetsEvents() throws InterruptedException {
// Given
CountDownLatch addEvents = new CountDownLatch(1);
CountDownLatch updateEvents = new CountDownLatch(0);
SharedInformerFactory informerFactory = client.informers();
SharedIndexInformer<Pod> podInformer = informerFactory.sharedIndexInformerFor(Pod.class, RESYNC_PERIOD);
podInformer.addEventHandler(new TestResourceEventHandler<>(addEvents, updateEvents));

// When
informerFactory.startAllRegisteredInformers();
addEvents.await(2 * RESYNC_PERIOD, TimeUnit.MILLISECONDS);

// Then
assertThat(addEvents.getCount()).isZero();
informerFactory.stopAllRegisteredInformers();
}

@Test
public void testPodSharedIndexInformerGetsSingleUpdates() throws InterruptedException {
// Given
String namespace = "kube-system";
PodList podList = client.pods().inNamespace(namespace).list();
int expectedAddEvents = podList.getItems().size();
CountDownLatch addEvents = new CountDownLatch(expectedAddEvents);
// Let's assume informer resyncs at least two times so we would get update
// events for all pods two times
CountDownLatch updateEvents = new CountDownLatch(2 * expectedAddEvents);
SharedInformerFactory informerFactory = client.informers();
SharedIndexInformer<Pod> podInformer = informerFactory.inNamespace(namespace).sharedIndexInformerFor(Pod.class, RESYNC_PERIOD);
TestResourceEventHandler<Pod> eventHandler = new TestResourceEventHandler<>(addEvents, updateEvents);
podInformer.addEventHandler(eventHandler);

// When
informerFactory.startAllRegisteredInformers();
updateEvents.await(3 * RESYNC_PERIOD, TimeUnit.MILLISECONDS);

// Then
assertThat(addEvents.getCount()).isZero();
assertThat(updateEvents.getCount()).isZero();
assertThat(eventHandler.getUpdateEventsCount()).isLessThan(4 * expectedAddEvents);
informerFactory.stopAllRegisteredInformers();
}

private static class TestResourceEventHandler<T extends HasMetadata> implements ResourceEventHandler<T> {
private final CountDownLatch addEventRecievedLatch;
private final CountDownLatch updateEventRecievedLatch;
private final AtomicInteger updateCount;

public TestResourceEventHandler(CountDownLatch addLatch, CountDownLatch updateLatch) {
this.updateEventRecievedLatch = updateLatch;
this.addEventRecievedLatch = addLatch;
this.updateCount = new AtomicInteger(0);
}

@Override
public void onAdd(T resource) {
addEventRecievedLatch.countDown();
}

@Override
public void onUpdate(T oldResource, T newResource) {
updateEventRecievedLatch.countDown();
updateCount.incrementAndGet();
}

@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) { }

public int getUpdateEventsCount() {
return updateCount.get();
}
}
}

0 comments on commit a541e36

Please sign in to comment.