Skip to content

Commit

Permalink
[ML] Wait for job updates in AutoDetectResultProcessor.awaitCompletion (
Browse files Browse the repository at this point in the history
#36856)

There was a race where the job update in `AutoDetectResultProcessor.updateEstablishedModelMemoryOnJob`
could execute after `AutoDetectResultProcessor.awaitCompletion`
returned. This was because `jobUpdateSemaphore` was acquired
after the call to `jobResultsProvider.getEstablishedMemoryUsage`;
during that call, `awaitCompletion` was free to acquire and
release the semaphore and then return before the job update ran.

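This commit fixes the problem by acquiring `jobUpdateSemaphore` before the call to
`jobResultsProvider.getEstablishedMemoryUsage` and releasing it on every completion path:
when the job update responds or fails, when submitting the update throws, when the
established model memory is unchanged, and when the calculation itself fails.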

Closes #36849
  • Loading branch information
davidkyle authored and droberts195 committed Dec 20, 2018
1 parent 92a11c0 commit 5a9ea81
Showing 1 changed file with 41 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,41 +432,51 @@ private void updateEstablishedModelMemoryOnJob() {
// We need to make all results written up to and including these stats available for the established memory calculation
persister.commitResultWrites(jobId);

try {
jobUpdateSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId);
return;
}

jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
if (latestEstablishedModelMemory != establishedModelMemory) {

client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).submit(() -> {
try {
jobUpdateSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.info("[{}] Interrupted acquiring update established model memory semaphore", jobId);
return;
}

JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
jobUpdateSemaphore.release();
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
jobUpdateSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" +
establishedModelMemory + "]", e);
}
});
});
try {
client.threadPool().executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
updateRequest.setWaitForAck(false);

executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
jobUpdateSemaphore.release();
latestEstablishedModelMemory = establishedModelMemory;
LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory);
}

@Override
public void onFailure(Exception e) {
jobUpdateSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" +
establishedModelMemory + "]", e);
}
});
});
} catch (Exception e) {
jobUpdateSemaphore.release();
LOGGER.error("[" + jobId + "] error submitting established model memory update action", e);
}
} else {
jobUpdateSemaphore.release();
}
}, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e));
}, e -> {
jobUpdateSemaphore.release();
LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e);
});
}

public void awaitCompletion() throws TimeoutException {
Expand Down

0 comments on commit 5a9ea81

Please sign in to comment.