Skip to content

Commit

Permalink
discard
Browse files Browse the repository at this point in the history
  • Loading branch information
probakowski committed Nov 4, 2020
1 parent 73ef56e commit 3ec4de5
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,18 @@ private boolean isOverTheLimit() {
return false;
}

/**
* Discards pending requests.
*/
public void discard() {
lock.lock();
try {
bulkRequest = bulkRequestSupplier.get();
} finally {
lock.unlock();
}
}

/**
* Flush pending delete or index requests.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,6 @@ protected boolean preserveSLMPoliciesUponCompletion() {
protected boolean waitForAllSnapshotsWiped() { return false; }

private void wipeCluster() throws Exception {

if (hasXPack) {
adminClient().performRequest(new Request("POST", "/_ilm/stop"));
}

// Cleanup rollup before deleting indices. A rollup job might have bulks in-flight,
// so we need to fully shut them down first otherwise a job might stall waiting
// for a bulk to finish against a non-existing index (and then fail tests)
Expand Down Expand Up @@ -645,10 +640,6 @@ private void wipeCluster() throws Exception {
}

assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());

if (hasXPack) {
adminClient().performRequest(new Request("POST", "/_ilm/start"));
}
}

protected static void wipeAllIndices() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class IndexLifecycleService
private final Clock clock;
private final PolicyStepsRegistry policyRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private final ILMHistoryStore ilmHistoryStore;
private final Settings settings;
private ClusterService clusterService;
private LongSupplier nowSupplier;
Expand All @@ -75,7 +74,6 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.ilmHistoryStore = ilmHistoryStore;
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client);
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -175,9 +173,6 @@ void onMaster(ClusterState clusterState) {
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
if (ilmHistoryStore != null) {
ilmHistoryStore.flush();
}
submitOperationModeUpdate(OperationMode.STOPPED);
}
}
Expand Down Expand Up @@ -348,9 +343,6 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
if (ilmHistoryStore != null) {
ilmHistoryStore.flush();
}
submitOperationModeUpdate(OperationMode.STOPPED);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -48,16 +50,19 @@
* appropriate index. It sets up a {@link BulkProcessor} for indexing in bulk, and handles creation
* of the index/alias as needed for ILM policies.
*/
public class ILMHistoryStore implements Closeable {
public class ILMHistoryStore implements Closeable, ClusterStateListener {
private static final Logger logger = LogManager.getLogger(ILMHistoryStore.class);

public static final String ILM_HISTORY_DATA_STREAM = "ilm-history-" + INDEX_TEMPLATE_VERSION;

private final boolean ilmHistoryEnabled;
private final BulkProcessor processor;
private final ThreadPool threadPool;
private final ClusterService clusterService;

public ILMHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService, ThreadPool threadPool) {
this.clusterService = clusterService;
clusterService.addListener(this);
this.ilmHistoryEnabled = LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings);
this.threadPool = threadPool;

Expand Down Expand Up @@ -160,14 +165,19 @@ public void putAsync(ILMHistoryItem item) {

@Override
public void close() {
clusterService.removeListener(this);
try {
processor.awaitClose(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("failed to shut down ILM history bulk processor after 10 seconds", e);
}
}

public void flush(){
processor.flush();
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.previousState().metadata().dataStreams().containsKey(ILM_HISTORY_DATA_STREAM) &&
event.state().metadata().dataStreams().containsKey(ILM_HISTORY_DATA_STREAM) == false) {
processor.discard();
}
}
}

0 comments on commit 3ec4de5

Please sign in to comment.