[ML] Stop timing stats failure propagation (#49495)
benwtrent authored Nov 22, 2019
1 parent e9941c2 commit 197d5e7
Showing 4 changed files with 28 additions and 19 deletions.
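
The commit applies a best-effort persistence pattern: writing datafeed timing stats is not critical, so a failure is caught and logged inside the reporter instead of propagating to the caller, where an escaping exception could leave the datafeed stuck in the "stopping" state. The minimal sketch below illustrates that pattern in isolation; BestEffortReporter, StatsPersister, and the jobId wiring are hypothetical stand-ins for illustration, not the actual DatafeedTimingStatsReporter API.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Sketch of the "best-effort persistence" pattern used in this commit.
// BestEffortReporter and StatsPersister are hypothetical names, not the real
// DatafeedTimingStatsReporter / DatafeedTimingStatsPersister classes.
public class BestEffortReporter {

    private static final Logger LOGGER = LogManager.getLogger(BestEffortReporter.class);

    /** Hypothetical persister; persisting may fail at any time. */
    @FunctionalInterface
    public interface StatsPersister {
        void persist(String stats);
    }

    private final String jobId;
    private final StatsPersister persister;

    public BestEffortReporter(String jobId, StatsPersister persister) {
        this.jobId = jobId;
        this.persister = persister;
    }

    /** Persist the stats, but never let a persistence failure reach the caller. */
    public void finishReporting(String stats) {
        try {
            persister.persist(stats);
        } catch (Exception ex) {
            // Persisting timing stats is not critical, so only log a warning.
            LOGGER.warn("[" + jobId + "] failed to report datafeed timing stats", ex);
        }
    }

    public static void main(String[] args) {
        BestEffortReporter reporter = new BestEffortReporter("job-1", stats -> {
            throw new RuntimeException("simulated persistence failure");
        });
        // The caller stays on the happy path even though persistence failed.
        reporter.finishReporting("{\"search_count\": 1}");
    }
}
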
DatafeedJob.java
@@ -117,13 +117,7 @@ public Integer getMaxEmptySearches() {
     }
 
     public void finishReportingTimingStats() {
-        try {
-            timingStatsReporter.finishReporting();
-        } catch (Exception e) {
-            // We don't want the exception to propagate out of this method as it can leave the datafeed in the "stopping" state forever.
-            // Since persisting datafeed timing stats is not critical, we just log a warning here.
-            LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, e);
-        }
+        timingStatsReporter.finishReporting();
     }
 
     Long runLookBack(long startTime, Long endTime) throws Exception {
DatafeedTimingStatsReporter.java
@@ -5,6 +5,9 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
@@ -20,6 +23,7 @@
  */
 public class DatafeedTimingStatsReporter {
 
+    private static final Logger LOGGER = LogManager.getLogger(DatafeedTimingStatsReporter.class);
     /** Interface used for persisting current timing stats to the results index. */
     @FunctionalInterface
     public interface DatafeedTimingStatsPersister {
@@ -96,7 +100,14 @@ private void flushIfDifferSignificantly() {
     private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
         persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
         if (allowedPersisting) {
-            persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
+            try {
+                persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
+            } catch (Exception ex) {
+                // Since persisting datafeed timing stats is not critical, we just log a warning here.
+                LOGGER.warn(
+                    () -> new ParameterizedMessage("[{}] failed to report datafeed timing stats", currentTimingStats.getJobId()),
+                    ex);
+            }
         }
     }
 
DatafeedJobTests.java
@@ -12,7 +12,6 @@
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -61,7 +60,6 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -456,15 +454,6 @@ public void testFlushAnalysisProblemIsConflict() {
         assertThat(analysisProblemException.shouldStop, is(true));
     }
 
-    public void testFinishReportingTimingStats() {
-        doThrow(new EsRejectedExecutionException()).when(timingStatsReporter).finishReporting();
-
-        long frequencyMs = 100;
-        long queryDelayMs = 1000;
-        DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean());
-        datafeedJob.finishReportingTimingStats();
-    }
-
     private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
                                           long latestRecordTimeMs, boolean haveSeenDataPreviously) {
         Supplier<Long> currentTimeSupplier = () -> currentTime;
DatafeedTimingStatsReporterTests.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.datafeed;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESTestCase;
@@ -18,8 +19,10 @@
 import java.sql.Date;
 import java.time.Instant;
 
+import static org.elasticsearch.mock.orig.Mockito.doThrow;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -176,6 +179,18 @@ public void testTimingStatsDifferSignificantly() {
             is(true));
     }
 
+    public void testFinishReportingTimingStatsException() {
+        doThrow(new ElasticsearchException("BOOM")).when(timingStatsPersister).persistDatafeedTimingStats(any(), any());
+        DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID));
+
+        try {
+            reporter.reportDataCounts(createDataCounts(0, TIMESTAMP));
+            reporter.finishReporting();
+        } catch (ElasticsearchException ex) {
+            fail("Should not have failed with: " + ex.getDetailedMessage());
+        }
+    }
+
     private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) {
         return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister);
     }
