Skip to content

Commit

Permalink
Change the onClose method of ReflectorWatcher instance to only close …
Browse files Browse the repository at this point in the history
…watcher and not re-open it
  • Loading branch information
akram committed Apr 21, 2021
1 parent 7a95e59 commit d62b193
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 5.4-SNAPSHOT

#### Bugs
* Fix #3024: stopAllRegisteredInformers will not call startWatcher but stopWatcher onClose instead
* Fix #2989: serialization will generate valid yaml when using subtypes
* Fix #2996: Generating CRDs from the API should now properly work
* Fix #3000: Set no_proxy in the okhttp builder in case the proxy_url is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Reflector(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, Store
this.resyncPeriodMillis = resyncPeriodMillis;
this.lastSyncResourceVersion = new AtomicReference<>();
this.resyncExecutor = resyncExecutor;
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);
this.watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::stopWatcher, this::reListAndSync);
this.isActive = new AtomicBoolean(true);
this.isWatcherStarted = new AtomicBoolean(false);
this.watch = new AtomicReference<>(null);
Expand Down Expand Up @@ -111,6 +111,15 @@ private void reListAndSync() {
}
}

private void stopWatcher() {
String ns = operationContext.getNamespace();
log.debug("Stopping watcher for resource {} v{} in namespace {}", apiTypeClass, lastSyncResourceVersion.get(), ns);
if (watch.get() != null) {
log.debug("Stopping watcher");
watch.get().close();
}
}

private void startWatcher() {
log.debug("Starting watcher for resource {} v{}", apiTypeClass, lastSyncResourceVersion.get());
if (watch.get() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static boolean waitUntilReady(BlockingQueue<Object> queue, long amount, T
* @return True if shutdown is complete.
*/
public static boolean shutdownExecutorService(ExecutorService executorService) {
LOGGER.debug("shuting down executor: {}", executorService);
if (executorService == null) {
return false;
}
Expand All @@ -173,6 +174,7 @@ public static boolean shutdownExecutorService(ExecutorService executorService) {

try {
//Wait for clean termination
LOGGER.debug("waiting for clean termination 5 sec");
if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
return true;
}
Expand All @@ -181,20 +183,22 @@ public static boolean shutdownExecutorService(ExecutorService executorService) {
if (!executorService.isTerminated()) {
executorService.shutdownNow();
}

LOGGER.debug("waiting for clean termination 5 sec");
if (executorService.awaitTermination(5, TimeUnit.SECONDS)) {
return true;
}

if (LOGGER.isDebugEnabled()) {
List<Runnable> tasks = executorService.shutdownNow();
if (!tasks.isEmpty()) {
LOGGER.debug("ExecutorService was not cleanly shutdown, after waiting for 10 seconds. Number of remaining tasks: {}", tasks.size());
LOGGER.info("ExecutorService was not cleanly shutdown, after waiting for 10 seconds. Number of remaining tasks: {}", tasks.size());
LOGGER.debug("Remaining tasks: {}", tasks);
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
//Preserve interrupted status
LOGGER.debug("ServiceExectuor thread forcibly interrupted");
Thread.currentThread().interrupt();
}
return false;
Expand Down

0 comments on commit d62b193

Please sign in to comment.