Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add base framework for snapshot retention #43605

Merged
merged 4 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;

/**
* Class encapsulating settings related to Index Lifecycle Management X-Pack Plugin
Expand All @@ -17,6 +19,8 @@ public class LifecycleSettings {
public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete";

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this name for the setting - it might be confusing that this is "when to perform the cleanup" vs. "how long to retain snapshots". Maybe something like deletion_window_schedule? Although that's not the clearest either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm definitely not in love with it either, I'm not sure yet about the deletion_window_schedule since it's not window-based (yet), let's revisit in the future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++, like I said this can be changed in a follow-up PR once we come up with a better name



public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope);
Expand All @@ -27,4 +31,15 @@ public class LifecycleSettings {

public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
Setting.Property.NodeScope);
public static final Setting<String> SLM_RETENTION_SCHEDULE_SETTING = Setting.simpleString(SLM_RETENTION_SCHEDULE, str -> {
try {
if (Strings.hasText(str)) {
// Test that the setting is a valid cron syntax
new CronSchedule(str);
}
} catch (Exception e) {
throw new IllegalArgumentException("invalid cron expression [" + str + "] for SLM retention schedule [" +
SLM_RETENTION_SCHEDULE + "]", e);
}
}, Setting.Property.Dynamic, Setting.Property.NodeScope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionTask;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
Expand All @@ -111,6 +113,7 @@
public class IndexLifecycle extends Plugin implements ActionPlugin {
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>();
private final SetOnce<SnapshotRetentionService> snapshotRetentionService = new SetOnce<>();
private final SetOnce<SnapshotHistoryStore> snapshotHistoryStore = new SetOnce<>();
private Settings settings;
private boolean enabled;
Expand All @@ -132,7 +135,8 @@ public List<Setting<?>> getSettings() {
LifecycleSettings.LIFECYCLE_NAME_SETTING,
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING);
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
}

@Override
Expand All @@ -150,7 +154,10 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone()));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get());
snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService),
clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
snapshotRetentionService.get());
}

@Override
Expand Down Expand Up @@ -240,7 +247,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
@Override
public void close() {
try {
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get());
} catch (IOException e) {
throw new ElasticsearchException("unable to close index lifecycle services", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;

import java.io.Closeable;
import java.time.Clock;
import java.util.function.Supplier;

/**
* The {@code SnapshotRetentionService} is responsible for scheduling the period kickoff of SLM's
* snapshot retention. This means that when the retention schedule setting is configured, the
* scheduler schedules a job that, when triggered, will delete snapshots according to the retention
* policy configured in the {@link SnapshotLifecyclePolicy}.
*/
public class SnapshotRetentionService implements LocalNodeMasterListener, Closeable {

static final String SLM_RETENTION_JOB_ID = "slm-retention-job";

private static final Logger logger = LogManager.getLogger(SnapshotRetentionService.class);

private final SchedulerEngine scheduler;

private volatile String slmRetentionSchedule;

public SnapshotRetentionService(Settings settings,
Supplier<SnapshotRetentionTask> taskSupplier,
ClusterService clusterService,
Clock clock) {
this.scheduler = new SchedulerEngine(settings, clock);
this.scheduler.register(taskSupplier.get());
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
this::setUpdateSchedule);
}

void setUpdateSchedule(String retentionSchedule) {
this.slmRetentionSchedule = retentionSchedule;
// The schedule has changed, so reschedule the retention job
rescheduleRetentionJob();
}

// Only used for testing
SchedulerEngine getScheduler() {
return this.scheduler;
}

@Override
public void onMaster() {
rescheduleRetentionJob();
}

@Override
public void offMaster() {
cancelRetentionJob();
}

private void rescheduleRetentionJob() {
final String schedule = this.slmRetentionSchedule;
if (Strings.hasText(schedule)) {
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
new CronSchedule(schedule));
logger.debug("scheduling SLM retention job for [{}]", schedule);
this.scheduler.add(retentionJob);
} else {
// The schedule has been unset, so cancel the scheduled retention job
cancelRetentionJob();
}
}

private void cancelRetentionJob() {
this.scheduler.scheduledJobIds().forEach(this.scheduler::remove);
}

@Override
public String executorName() {
return ThreadPool.Names.SNAPSHOT;
}

@Override
public void close() {
this.scheduler.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* The {@code SnapshotRetentionTask} is invoked by the scheduled job from the
* {@link SnapshotRetentionService}. It is responsible for retrieving the snapshots for repositories
* that have an SLM policy configured, and then deleting the snapshots that fall outside the
* retention policy.
*/
public class SnapshotRetentionTask implements SchedulerEngine.Listener {

private static final Logger logger = LogManager.getLogger(SnapshotRetentionTask.class);
private static final AtomicBoolean running = new AtomicBoolean(false);

private final Client client;
private final ClusterService clusterService;

public SnapshotRetentionTask(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}

@Override
public void triggered(SchedulerEngine.Event event) {
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) :
"expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " but it was " + event.getJobName();
if (running.compareAndSet(false, true)) {
try {
logger.info("starting SLM retention snapshot cleanup task");
final ClusterState state = clusterService.state();

// Find all SLM policies that have retention enabled
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);

// For those policies (there may be more than one for the same repo),
// return the repos that we need to get the snapshots for
final Set<String> repositioriesToFetch = policiesWithRetention.values().stream()
.map(SnapshotLifecyclePolicy::getRepository)
.collect(Collectors.toSet());

// Find all the snapshots that are past their retention date
// TODO: include min/max snapshot count as a criteria for deletion also
final List<SnapshotInfo> snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention))
.collect(Collectors.toList());

// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);

} finally {
running.set(false);
}
} else {
logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping");
}
}

static Map<String, SnapshotLifecyclePolicy> getAllPoliciesWithRetentionEnabled(final ClusterState state) {
// TODO: fill me in
return Collections.emptyMap();
}

static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, SnapshotLifecyclePolicy> policies) {
dakrone marked this conversation as resolved.
Show resolved Hide resolved
// TODO: fill me in
return false;
}

List<SnapshotInfo> getAllSnapshots(Collection<String> repositories) {
// TODO: fill me in
return Collections.emptyList();
}

void deleteSnapshots(List<SnapshotInfo> snapshotsToDelete) {
// TODO: fill me in
logger.info("deleting {}", snapshotsToDelete);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static SnapshotLifecyclePolicy createPolicy(String id, String schedule) {
return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config);
}

private static String randomSchedule() {
public static String randomSchedule() {
return randomIntBetween(0, 59) + " " +
randomIntBetween(0, 59) + " " +
randomIntBetween(0, 12) + " * * ?";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class SnapshotRetentionServiceTests extends ESTestCase {

private static final ClusterSettings clusterSettings;
static {
Set<Setting<?>> internalSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
internalSettings.add(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
}

public void testJobsAreScheduled() {
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
ClockMock clock = new ClockMock();

try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
FakeRetentionTask::new, clusterService, clock)) {
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule());
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

service.offMaster();
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.onMaster();
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

service.setUpdateSchedule("");
assertThat(service.getScheduler().jobCount(), equalTo(0));
threadPool.shutdownNow();
}
}

private static class FakeRetentionTask extends SnapshotRetentionTask {
FakeRetentionTask() {
super(null, null);
}

@Override
public void triggered(SchedulerEngine.Event event) {
super.triggered(event);
}
}
}