Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frozen tier autoscaling decider based on shards #71042

Merged
Merged
1 change: 1 addition & 0 deletions x-pack/plugin/autoscaling/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(path: xpackModule('data-streams'))
testImplementation project(path: xpackModule('searchable-snapshots'))
}

addQaCheckDependencies()
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.shards;

import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
import org.elasticsearch.xpack.autoscaling.action.PutAutoscalingPolicyAction;
import org.elasticsearch.xpack.core.DataTier;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.license.LicenseService.SELF_GENERATED_LICENSE_TYPE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class FrozenShardsDeciderIT extends AbstractSnapshotIntegTestCase {

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(LocalStateAutoscalingAndSearchableSnapshots.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
if (DiscoveryNode.canContainData(otherSettings)) {
builder.put(FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(10, ByteSizeUnit.MB));
}
return builder.build();
}

@Override
protected int numberOfShards() {
return 1;
}

public void testScale() throws Exception {
final String indexName = "index";
final String restoredIndexName = "restored";
final String fsRepoName = randomAlphaOfLength(10);
final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);

createRepository(fsRepoName, "fs");
putAutoscalingPolicy("frozen");
assertAcked(prepareCreate(indexName, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)));

indexRandom(
randomBoolean(),
IntStream.range(0, 10).mapToObj(i -> client().prepareIndex(indexName).setSource()).collect(Collectors.toList())
);

final SnapshotInfo snapshotInfo = createFullSnapshot(fsRepoName, snapshotName);

assertThat(capacity().results().get("frozen").requiredCapacity().total().memory(), equalTo(ByteSizeValue.ZERO));

final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
fsRepoName,
snapshotInfo.snapshotId().getName(),
indexName,
Settings.EMPTY,
Strings.EMPTY_ARRAY,
true,
MountSearchableSnapshotRequest.Storage.SHARED_CACHE
);
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));

assertThat(
capacity().results().get("frozen").requiredCapacity().total().memory(),
equalTo(FrozenShardsDeciderService.DEFAULT_MEMORY_PER_SHARD)
);
}

private GetAutoscalingCapacityAction.Response capacity() {
GetAutoscalingCapacityAction.Request request = new GetAutoscalingCapacityAction.Request();
return client().execute(GetAutoscalingCapacityAction.INSTANCE, request).actionGet();
}

private void putAutoscalingPolicy(String policyName) {
// randomly set the setting to verify it can be set.
Settings settings = randomBoolean()
? Settings.EMPTY
: Settings.builder()
.put(FrozenShardsDeciderService.MEMORY_PER_SHARD.getKey(), FrozenShardsDeciderService.DEFAULT_MEMORY_PER_SHARD)
.build();
final PutAutoscalingPolicyAction.Request request = new PutAutoscalingPolicyAction.Request(
policyName,
new TreeSet<>(Set.of(DataTier.DATA_FROZEN)),
new TreeMap<>(Map.of(FrozenShardsDeciderService.NAME, settings))
);
assertAcked(client().execute(PutAutoscalingPolicyAction.INSTANCE, request).actionGet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.shards;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.autoscaling.LocalStateAutoscaling;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;

/**
* We need a local state plugin for searchable snapshots too (test sources are not exposed).
* The local state plugin is necessary to avoid touching the "static SetOnce" licenseState field in XPackPlugin.
*/
public class LocalStateAutoscalingAndSearchableSnapshots extends LocalStateAutoscaling {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own edification and/or someone else reading this code, can you add a comment about why this plugin wrapper is required?

Copy link
Contributor Author

@henningandersen henningandersen Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment in 1c4c4ae

No expert on this and any conventions. I think adding a LocalStateSearchablesnapshots here and using both LocalState plugins from the test could also work, but I found this nicer.


public LocalStateAutoscalingAndSearchableSnapshots(final Settings settings) {
super(settings);
plugins.add(new SearchableSnapshots(settings) {

@Override
protected XPackLicenseState getLicenseState() {
return LocalStateAutoscalingAndSearchableSnapshots.this.getLicenseState();
}

});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.xpack.autoscaling.rest.RestGetAutoscalingCapacityHandler;
import org.elasticsearch.xpack.autoscaling.rest.RestGetAutoscalingPolicyHandler;
import org.elasticsearch.xpack.autoscaling.rest.RestPutAutoscalingPolicyHandler;
import org.elasticsearch.xpack.autoscaling.shards.FrozenShardsDeciderService;
import org.elasticsearch.xpack.autoscaling.storage.ProactiveStorageDeciderService;
import org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageDeciderService;

Expand Down Expand Up @@ -164,6 +165,11 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
AutoscalingDeciderResult.Reason.class,
ProactiveStorageDeciderService.NAME,
ProactiveStorageDeciderService.ProactiveReason::new
),
new NamedWriteableRegistry.Entry(
AutoscalingDeciderResult.Reason.class,
FrozenShardsDeciderService.NAME,
FrozenShardsDeciderService.FrozenShardsReason::new
)
);
}
Expand Down Expand Up @@ -194,7 +200,8 @@ public Collection<AutoscalingDeciderService> deciders() {
clusterService.get().getSettings(),
clusterService.get().getClusterSettings(),
allocationDeciders.get()
)
),
new FrozenShardsDeciderService()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.shards;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
import org.elasticsearch.xpack.core.DataTier;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.StreamSupport;

/**
* This decider enforces that on a 64GB memory node (31GB heap) we can max have 2000 shards. We arrive at 2000 because our current limit is
* 1000 but frozen tier uses the "frozen engine", which is much more efficient. We scale the total tier memory accordingly.
*
* The decider relies on frozen tier being used exclusively for frozen shards.
*/
public class FrozenShardsDeciderService implements AutoscalingDeciderService {
public static final String NAME = "frozen_shards";
private static final ByteSizeValue MAX_MEMORY = ByteSizeValue.ofGb(64);
static final ByteSizeValue DEFAULT_MEMORY_PER_SHARD = ByteSizeValue.ofBytes(MAX_MEMORY.getBytes() / 2000);
public static final Setting<ByteSizeValue> MEMORY_PER_SHARD = Setting.byteSizeSetting(
"memory_per_shard",
(ignored) -> DEFAULT_MEMORY_PER_SHARD.getStringRep(),
ByteSizeValue.ZERO,
ByteSizeValue.ofBytes(Long.MAX_VALUE)
);

@Override
public String name() {
return NAME;
}

@Override
public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDeciderContext context) {
// we assume that nodes do not grow beyond 64GB here.
int shards = countFrozenShards(context.state().metadata());
long memory = shards * MEMORY_PER_SHARD.get(configuration).getBytes();
return new AutoscalingDeciderResult(AutoscalingCapacity.builder().total(null, memory).build(), new FrozenShardsReason(shards));
}

static int countFrozenShards(Metadata metadata) {
return StreamSupport.stream(metadata.spliterator(), false)
.filter(imd -> isFrozenIndex(imd.getSettings()))
.mapToInt(IndexMetadata::getTotalNumberOfShards)
.sum();
}

static boolean isFrozenIndex(Settings indexSettings) {
String tierPreference = DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(indexSettings);
String[] preferredTiers = DataTierAllocationDecider.parseTierList(tierPreference);
if (preferredTiers.length >= 1 && preferredTiers[0].equals(DataTier.DATA_FROZEN)) {
assert preferredTiers.length == 1 : "frozen tier preference must be frozen only";
return true;
} else {
return false;
}
}

@Override
public List<Setting<?>> deciderSettings() {
return List.of(MEMORY_PER_SHARD);
}

@Override
public List<DiscoveryNodeRole> roles() {
return List.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE);
}

public static class FrozenShardsReason implements AutoscalingDeciderResult.Reason {
private final long shards;

public FrozenShardsReason(long shards) {
assert shards >= 0;
this.shards = shards;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate this is non-negative, and then use (read|write)Vlong() in serialization? It's not a huge deal, just curious whether we expect this to ever be negative.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++, done in 8f975a0

}

public FrozenShardsReason(StreamInput in) throws IOException {
this.shards = in.readVLong();
}

@Override
public String summary() {
return "shard count [" + shards + "]";
}

public long shards() {
return shards;
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(shards);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("shards", shards);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FrozenShardsReason that = (FrozenShardsReason) o;
return shards == that.shards;
}

@Override
public int hashCode() {
return Objects.hash(shards);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.autoscaling.shards;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class FrozenShardsDeciderReasonWireSerializationTests extends AbstractWireSerializingTestCase<
FrozenShardsDeciderService.FrozenShardsReason> {

@Override
protected Writeable.Reader<FrozenShardsDeciderService.FrozenShardsReason> instanceReader() {
return FrozenShardsDeciderService.FrozenShardsReason::new;
}

@Override
protected FrozenShardsDeciderService.FrozenShardsReason createTestInstance() {
return new FrozenShardsDeciderService.FrozenShardsReason(randomNonNegativeLong());
}

@Override
protected FrozenShardsDeciderService.FrozenShardsReason mutateInstance(FrozenShardsDeciderService.FrozenShardsReason instance)
throws IOException {
return new FrozenShardsDeciderService.FrozenShardsReason(
randomValueOtherThan(instance.shards(), ESTestCase::randomNonNegativeLong)
);
}
}
Loading