Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure no Watches are running after Watcher is stopped. #43888

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;

Expand Down Expand Up @@ -106,7 +105,7 @@ public class ExecutionService {
private final WatchExecutor executor;
private final ExecutorService genericExecutor;

private AtomicReference<CurrentExecutions> currentExecutions = new AtomicReference<>();
private CurrentExecutions currentExecutions;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can CurrentExecutions remain inside an AtomicReference?
It is read else where without acquiring a lock and as far as I understand it is about making sure that clearExecutions() happens in a sync manner, which is possible with keeping the AtomicReference?

(also the currentExecutions field can then be made final)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this either needs to be volatile or go back to being an AtomicReference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks nice catch. I also had messed up the order of "sealing" the concurrent executions. The change here is now a single line that removes the fork.

private final AtomicBoolean paused = new AtomicBoolean(false);

public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor,
Expand All @@ -123,7 +122,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW
this.client = client;
this.genericExecutor = genericExecutor;
this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30));
this.currentExecutions.set(new CurrentExecutions());
this.currentExecutions = new CurrentExecutions();
}

public void unPause() {
Expand Down Expand Up @@ -169,12 +168,12 @@ public long executionThreadPoolMaxSize() {

// for testing only
CurrentExecutions getCurrentExecutions() {
return currentExecutions.get();
return currentExecutions;
}

public List<WatchExecutionSnapshot> currentExecutions() {
List<WatchExecutionSnapshot> currentExecutions = new ArrayList<>();
for (WatchExecution watchExecution : this.currentExecutions.get()) {
for (WatchExecution watchExecution : this.currentExecutions) {
currentExecutions.add(watchExecution.createSnapshot());
}
// Lets show the longest running watch first:
Expand Down Expand Up @@ -279,7 +278,7 @@ public WatchRecord execute(WatchExecutionContext ctx) {
WatchRecord record = null;
final String watchId = ctx.id().watchId();
try {
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
if (executionAlreadyExists) {
logger.trace("not executing watch [{}] because it is already queued", watchId);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
Expand Down Expand Up @@ -334,7 +333,7 @@ record = createWatchRecord(record, ctx, e);

triggeredWatchStore.delete(ctx.id());
}
currentExecutions.get().remove(watchId);
currentExecutions.remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
}
return record;
Expand Down Expand Up @@ -578,10 +577,9 @@ public Counters executionTimes() {
* This clears out the current executions and sets new empty current executions
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
*/
private void clearExecutions() {
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
// clear old executions in background, no need to wait
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
private synchronized void clearExecutions() {
currentExecutions.sealAndAwaitEmpty(maxStopTimeout);
currentExecutions = new CurrentExecutions();
}

// the watch execution task takes another runnable as parameter
Expand Down