Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't halt policy execution on policy trigger exception #49128

Merged
merged 1 commit into from
Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -119,19 +120,35 @@ public void onMaster() {
final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);

if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey.getName());
try {
if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey.getName());
lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}]" +
" because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
}
} else {
lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
}
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage("async action execution failed during master election trigger" +
" for index [{}] with policy [{}] in step [{}], lifecycle state: [{}]",
idxMeta.getIndex().getName(), policyName, stepKey, lifecycleState.asMap()), e);
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
logger.warn(new ParameterizedMessage("async action execution failed during master election trigger" +
" for index [{}] with policy [{}] in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey), e);

}
} else {
lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey);
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}
}
Expand Down Expand Up @@ -264,27 +281,42 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
final LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState);

if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey.getName());
try {
if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.getName())) {
logger.info("waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey.getName());
if (fromClusterStateChange) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
} else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta);
}
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
}
} else {
if (fromClusterStateChange) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
} else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta);
}
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info("skipping policy execution of step [{}] for index [{}] with policy [{}] because ILM is stopping",
stepKey == null ? "n/a" : stepKey.getName(), idxMeta.getIndex().getName(), policyName);
}
} else {
if (fromClusterStateChange) {
lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta);
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage("async action execution failed during policy trigger" +
" for index [{}] with policy [{}] in step [{}], lifecycle state: [{}]",
idxMeta.getIndex().getName(), policyName, stepKey, lifecycleState.asMap()), e);
} else {
lifecycleRunner.runPeriodicStep(policyName, idxMeta);
logger.warn(new ParameterizedMessage("async action execution failed during policy trigger" +
" for index [{}] with policy [{}] in step [{}]",
idxMeta.getIndex().getName(), policyName, stepKey), e);

}
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ private void assertClusterStateStepInfo(ClusterState oldClusterState, Index inde
assertEquals(newLifecycleState.getStepTime(), newLifecycleState.getStepTime());
}

private static class MockAsyncActionStep extends AsyncActionStep {
static class MockAsyncActionStep extends AsyncActionStep {

private Exception exception;
private boolean willComplete;
Expand Down Expand Up @@ -1576,7 +1576,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState

}

private static class MockAsyncWaitStep extends AsyncWaitStep {
static class MockAsyncWaitStep extends AsyncWaitStep {

private Exception exception;
private boolean willComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.node.Node.NODE_MASTER_SETTING;
import static org.elasticsearch.xpack.core.ilm.AbstractStepTestCase.randomStepKey;
Expand Down Expand Up @@ -300,6 +302,111 @@ public void testRequestedStopOnSafeAction() {
assertTrue(moveToMaintenance.get());
}

public void testExceptionStillProcessesOtherIndices() {
doTestExceptionStillProcessesOtherIndices(false);
}

public void testExceptionStillProcessesOtherIndicesOnMaster() {
doTestExceptionStillProcessesOtherIndices(true);
}

@SuppressWarnings("unchecked")
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
String policy1 = randomAlphaOfLengthBetween(1, 20);
Step.StepKey i1currentStepKey = randomStepKey();
final Step i1mockStep;
if (useOnMaster) {
i1mockStep = new IndexLifecycleRunnerTests.MockAsyncActionStep(i1currentStepKey, randomStepKey());
} else {
i1mockStep = new IndexLifecycleRunnerTests.MockClusterStateActionStep(i1currentStepKey, randomStepKey());
}
MockAction i1mockAction = new MockAction(Collections.singletonList(i1mockStep));
Phase i1phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", i1mockAction));
LifecyclePolicy i1policy = newTestLifecyclePolicy(policy1, Collections.singletonMap(i1phase.getName(), i1phase));
Index index1 = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
LifecycleExecutionState.Builder i1lifecycleState = LifecycleExecutionState.builder();
i1lifecycleState.setPhase(i1currentStepKey.getPhase());
i1lifecycleState.setAction(i1currentStepKey.getAction());
i1lifecycleState.setStep(i1currentStepKey.getName());

String policy2 = randomValueOtherThan(policy1, () -> randomAlphaOfLengthBetween(1, 20));
Step.StepKey i2currentStepKey = randomStepKey();
final Step i2mockStep;
if (useOnMaster) {
i2mockStep = new IndexLifecycleRunnerTests.MockAsyncActionStep(i2currentStepKey, randomStepKey());
} else {
i2mockStep = new IndexLifecycleRunnerTests.MockClusterStateActionStep(i2currentStepKey, randomStepKey());
}
MockAction mockAction = new MockAction(Collections.singletonList(i2mockStep));
Phase i2phase = new Phase("phase", TimeValue.ZERO, Collections.singletonMap("action", mockAction));
LifecyclePolicy i2policy = newTestLifecyclePolicy(policy1, Collections.singletonMap(i2phase.getName(), i1phase));
Index index2 = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
LifecycleExecutionState.Builder i2lifecycleState = LifecycleExecutionState.builder();
i2lifecycleState.setPhase(i2currentStepKey.getPhase());
i2lifecycleState.setAction(i2currentStepKey.getAction());
i2lifecycleState.setStep(i2currentStepKey.getName());

CountDownLatch stepLatch = new CountDownLatch(2);
boolean failStep1 = randomBoolean();
if (useOnMaster) {
((IndexLifecycleRunnerTests.MockAsyncActionStep) i1mockStep).setLatch(stepLatch);
((IndexLifecycleRunnerTests.MockAsyncActionStep) i1mockStep)
.setException(failStep1 ? new IllegalArgumentException("forcing a failure for index 1") : null);
((IndexLifecycleRunnerTests.MockAsyncActionStep) i2mockStep).setLatch(stepLatch);
((IndexLifecycleRunnerTests.MockAsyncActionStep) i2mockStep)
.setException(failStep1 ? null : new IllegalArgumentException("forcing a failure for index 2"));
} else {
((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep).setLatch(stepLatch);
((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep)
.setException(failStep1 ? new IllegalArgumentException("forcing a failure for index 1") : null);
((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep).setLatch(stepLatch);
((IndexLifecycleRunnerTests.MockClusterStateActionStep) i1mockStep)
.setException(failStep1 ? null : new IllegalArgumentException("forcing a failure for index 2"));
}

SortedMap<String, LifecyclePolicyMetadata> policyMap = new TreeMap<>();
policyMap.put(policy1, new LifecyclePolicyMetadata(i1policy, Collections.emptyMap(),
randomNonNegativeLong(), randomNonNegativeLong()));
policyMap.put(policy2, new LifecyclePolicyMetadata(i2policy, Collections.emptyMap(),
randomNonNegativeLong(), randomNonNegativeLong()));

IndexMetaData i1indexMetadata = IndexMetaData.builder(index1.getName())
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), policy1))
.putCustom(ILM_CUSTOM_METADATA_KEY, i1lifecycleState.build().asMap())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
IndexMetaData i2indexMetadata = IndexMetaData.builder(index2.getName())
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), policy1))
.putCustom(ILM_CUSTOM_METADATA_KEY, i2lifecycleState.build().asMap())
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.<String, IndexMetaData> builder()
.fPut(index1.getName(), i1indexMetadata)
.fPut(index2.getName(), i2indexMetadata);

MetaData metaData = MetaData.builder()
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
.indices(indices.build())
.persistentSettings(settings(Version.CURRENT).build())
.build();

ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();

if (useOnMaster) {
when(clusterService.state()).thenReturn(currentState);
indexLifecycleService.onMaster();
} else {
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
}
try {
stepLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("failure while waiting for step execution", e);
fail("both steps should have been executed, even with an exception");
}
}

public void testTriggeredDifferentJob() {
Mockito.reset(clusterService);
SchedulerEngine.Event schedulerEvent = new SchedulerEngine.Event("foo", randomLong(), randomLong());
Expand Down