Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Cluster State to Track Repository Generation #49729

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d5c26f5
Use Cluster State to Track Repository Generation
original-brownbear Nov 29, 2019
ab530c5
simpler change
original-brownbear Nov 29, 2019
a67bb90
cleaner
original-brownbear Nov 29, 2019
7072b7e
add docs
original-brownbear Nov 29, 2019
3cd2e72
better wording
original-brownbear Nov 29, 2019
3effaad
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Nov 30, 2019
afa7b67
nicer docs
original-brownbear Nov 30, 2019
8518bfc
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 2, 2019
59352a1
small changes
original-brownbear Dec 2, 2019
4bb379b
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 2, 2019
2b38e7b
bck
original-brownbear Dec 2, 2019
abad89a
move meta handling to repo meta
original-brownbear Dec 3, 2019
8f2081d
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 3, 2019
4468734
fix test
original-brownbear Dec 3, 2019
7b07f62
nicer javadoc
original-brownbear Dec 3, 2019
600356a
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 3, 2019
aa1bc78
CR: comments
original-brownbear Dec 3, 2019
2e8cc1e
CR: add breakout
original-brownbear Dec 3, 2019
87bdb8d
fix test
original-brownbear Dec 3, 2019
cd30717
revert
original-brownbear Dec 3, 2019
81e236f
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 3, 2019
45a42c1
fix typo
original-brownbear Dec 3, 2019
e7969fd
nicer log message
original-brownbear Dec 3, 2019
f7997af
fix typo
original-brownbear Dec 3, 2019
6874658
smarter check
original-brownbear Dec 3, 2019
5a202ba
CR: Comments
original-brownbear Dec 3, 2019
8a61063
CR: hide generations
original-brownbear Dec 4, 2019
357fa56
Merge remote-tracking branch 'elastic/master' into repo-uses-cs-incre…
original-brownbear Dec 4, 2019
c3c758c
CR: adjust ex. message
original-brownbear Dec 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.repositories.RepositoriesState;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResultsService;
Expand Down Expand Up @@ -114,6 +115,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
// Cluster State
registerClusterCustom(entries, RepositoriesState.TYPE, RepositoriesState::new, RepositoriesState::readDiffFrom);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use the RepositoryMetaData for this? It is already persisted in the cluster state and even survives full-cluster restarts. This one is a bit odd, as it now has to keep a map for each repo again, and needs clean-up when repos are added / removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I was thinking we may not actually want to persist this ... but now that I think about S3 and the pending generation, we actually probably want to ... will see what I can do about the RepositoryMetaData here :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch hmm on second thought:

This one is a bit odd, as it now has to keep a map for each repo again, and needs clean-up when repos are added / removed.

This seems to be the only real downside though? And it's not really a downside since if I move this stuff into RepositoryMetaData I get the some new complication from having to adjust RepositoriesService to now use some custom comparisons on RepositoryMetaData that ignores the generation related fields to see if a repo has changed.

The other downside is, that we now have the generation logic leak into all the Repository implementations in a bunch of new spots. CCR and all the wrapping repositories don't really care about the generation and it's entirely BlobStoreRepository specific. Would we then make adjustments to the serialization of e.g. GetRepositoriesResponse so that it doesn't include the generation? We also lose some incrementality/efficiency in the serialization ClusterState as a result of mixing somewhat dynamic and pretty static things in it don't we?

I think persisting this state across full-cluster restarts is a good thing still, but I'm not so sure about using RepositoryMetaData here having tried implementating this practically now.
Still think it's worth it and make adjustments to things like GetRepositoriesResponse accordingly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RepositoriesService to now use some custom comparisons on RepositoryMetaData that ignores the generation related fields to see if a repo has changed

I think this is ok (it's similar to how we have it with most items in the cluster state). Further down the line, we should not reinit the repository implementation when there is an update from the CS, but leave it to the implementation to decide whether it needs complete reinitialization or not. This will allow us to do dynamic throttling (e.g. allow user to dynamically change max_restore_bytes_per_sec which directly takes place on an ongoing restore). It's a bit bigger investment right now, but will hopefully pay off further down the line.

Regarding serialization, we can make RepositoryMetaData implement Diffable to have it more smartly serialize changes (this would benefit any other field in it as well). I don't expect this to matter much though.

I also think we won't need to adapt GetRepositoriesResponse but can just conditionally expose the additional fields by parameterizing toXContent if need be (I'm even fine exposing these fields on the repositories API and not having two variants).

registerClusterCustom(entries, SnapshotsInProgress.TYPE, SnapshotsInProgress::new, SnapshotsInProgress::readDiffFrom);
registerClusterCustom(entries, RestoreInProgress.TYPE, RestoreInProgress::new, RestoreInProgress::readDiffFrom);
registerClusterCustom(entries, SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);
final RepositoriesState existingStates = RepositoriesState.getOrEmpty(currentState);
final RepositoriesState.Builder updatedStates = RepositoriesState.builder();
if (repositories != null && repositories.repositories().size() > 0) {
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
Expand All @@ -215,13 +217,19 @@ public ClusterState execute(ClusterState currentState) {
logger.info("delete repository [{}]", repositoryMetaData.name());
changed = true;
} else {
final RepositoriesState.State repoState = existingStates.state(repositoryMetaData.name());
if (repoState != null) {
updatedStates.putState(
repositoryMetaData.name(), repoState.generation(), repoState.pendingGeneration());
}
repositoriesMetaData.add(repositoryMetaData);
}
}
if (changed) {
repositories = new RepositoriesMetaData(repositoriesMetaData);
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
return ClusterState.builder(currentState).putCustom(RepositoriesState.TYPE, updatedStates.build())
.metaData(mdBuilder).build();
}
}
if (Regex.isMatchAllPattern(request.name())) { // we use a wildcard so we don't barf if it's not present.
Expand Down Expand Up @@ -293,6 +301,9 @@ public void applyClusterState(ClusterChangedEvent event) {

// Check if repositories got changed
if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) {
for (Repository repo : repositories.values()) {
repo.updateState(state);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Tracks the current generation at which the {@link RepositoryData} for each writable
* {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} in the cluster can be found.
* See documentation for the package {@link org.elasticsearch.repositories.blobstore} for details.
*/
public final class RepositoriesState extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {

public static final Version REPO_GEN_IN_CS_VERSION = Version.V_8_0_0;

public static final String TYPE = "repositories_state";

private static final RepositoriesState EMPTY = RepositoriesState.builder().build();

private final Map<String, State> states;

private RepositoriesState(Map<String, State> states) {
this.states = states;
}

public RepositoriesState(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, State::new));
}

/**
* Gets repositories state from the given cluster state or an empty repositories state if none is found in the cluster state.
*
* @param state cluster state
* @return RepositoriesState
*/
public static RepositoriesState getOrEmpty(ClusterState state) {
return Objects.requireNonNullElse(state.custom(RepositoriesState.TYPE), RepositoriesState.EMPTY);
}

/**
* Get {@link State} for a repository.
*
* @param repoName repository name
* @return State for the repository or {@code null} if none is found
*/
@Nullable
public State state(String repoName) {
return states.get(repoName);
}

public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(ClusterState.Custom.class, TYPE, in);
}

@Override
public Version getMinimalSupportedVersion() {
return REPO_GEN_IN_CS_VERSION;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(states, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, State> stringStateEntry : states.entrySet()) {
builder.field(stringStateEntry.getKey(), stringStateEntry.getValue());
}
return builder;
}

public static Builder builder() {
return new Builder();
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof RepositoriesState == false) {
return false;
}
final RepositoriesState that = (RepositoriesState) other;
return states.equals(that.states);
}

@Override
public int hashCode() {
return states.hashCode();
}

@Override
public String toString() {
return Strings.toString(this);
}

public static final class State implements Writeable, ToXContent {

private final long generation;

private final long pendingGeneration;

private State(long generation, long pendingGeneration) {
assert generation <= pendingGeneration :
"Pending generation [" + pendingGeneration + "] smaller than generation [" + generation + "]";
this.generation = generation;
this.pendingGeneration = pendingGeneration;
}

private State(StreamInput in) throws IOException {
this(in.readLong(), in.readLong());
}

/**
* Returns the current repository generation for the repository.
*
* @return current repository generation that should be used for reads of the repository's {@link RepositoryData}
*/
public long generation() {
return generation;
}

/**
* Latest repository generation that a write was attempted for.
*
* @return latest repository generation that a write was attempted for
*/
public long pendingGeneration() {
return pendingGeneration;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(generation);
out.writeLong(pendingGeneration);
}

@Override
public String toString() {
return Strings.toString(this);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("generation", generation).field("pending", pendingGeneration).endObject();
}

@Override
public boolean isFragment() {
return false;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof RepositoriesState.State == false) {
return false;
}
final RepositoriesState.State that = (RepositoriesState.State) other;
return this.generation == that.generation && this.pendingGeneration == that.pendingGeneration;
}

@Override
public int hashCode() {
return Objects.hash(generation, pendingGeneration);
}
}

public static final class Builder {

private final Map<String, State> stateMap = new HashMap<>();

private Builder() {
}

public Builder putAll(RepositoriesState state) {
stateMap.putAll(state.states);
return this;
}

public Builder putState(String name, long generation, long pending) {
stateMap.put(name, new State(generation, pending));
return this;
}

public RepositoriesState build() {
return new RepositoriesState(Map.copyOf(stateMap));
}
}
}
Loading