Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop all informers is calling start watcher #3029

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Fix #3011: properly handle enum types for additional printer columns
* Fix #3020: annotations should now properly have their associated values when processing CRDs from the API
* Fix #3027: fix NPE when sorting events in KubernetesResourceUtil
* Fix #3024: stopAllRegisteredInformers will not call startWatcher but stopWatcher onClose instead

#### Improvements
* Fix #2788: Support FIPS mode in kubernetes-client with BouncyCastleFipsProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Store
this.resyncPeriodMillis = resyncPeriodMillis;
this.lastSyncResourceVersion = new AtomicReference<>();
this.resyncExecutor = resyncExecutor;
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::stop, this::reListAndSync);
this.isActive = new AtomicBoolean(true);
this.isWatcherStarted = new AtomicBoolean(false);
this.watch = new AtomicReference<>(null);
Expand All @@ -78,8 +78,11 @@ public void listAndWatch() {
}

public void stop() {
String ns = operationContext.getNamespace();
log.debug("Stopping watcher for resource {} v{} in namespace {}", apiTypeClass, lastSyncResourceVersion.get(), ns);
isActive.set(false);
if (watch.get() != null) {
log.debug("Stopping watcher");
watch.get().close();
watch.set(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static boolean waitUntilReady(BlockingQueue<Object> queue, long amount, T
* @return True if shutdown is complete.
*/
public static boolean shutdownExecutorService(ExecutorService executorService) {
LOGGER.debug("shuting down executor: {}", executorService);
if (executorService == null) {
return false;
}
Expand All @@ -173,6 +174,7 @@ public static boolean shutdownExecutorService(ExecutorService executorService) {

try {
//Wait for clean termination
LOGGER.debug("waiting for clean termination 5 sec");
if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
return true;
}
Expand All @@ -181,20 +183,22 @@ public static boolean shutdownExecutorService(ExecutorService executorService) {
if (!executorService.isTerminated()) {
executorService.shutdownNow();
}

LOGGER.debug("waiting for clean termination 5 sec");
if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
return true;
}

if (LOGGER.isDebugEnabled()) {
List<Runnable> tasks = executorService.shutdownNow();
if (!tasks.isEmpty()) {
LOGGER.debug("ExecutorService was not cleanly shutdown, after waiting for 10 seconds. Number of remaining tasks: {}", tasks.size());
LOGGER.info("ExecutorService was not cleanly shutdown, after waiting for 10 seconds. Number of remaining tasks: {}", tasks.size());
LOGGER.debug("Remaining tasks: {}", tasks);
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
//Preserve interrupted status
LOGGER.debug("ServiceExectuor thread forcibly interrupted");
Thread.currentThread().interrupt();
}
return false;
Expand Down