Add _source-only snapshot repository #32844

Merged
merged 52 commits into from
Sep 12, 2018
Commits
08b0cfd
Add `_source`-only snapshot repository
s1monw Jun 25, 2018
3c5a0ed
add license
s1monw Aug 14, 2018
0645159
fix docs test setup
s1monw Aug 15, 2018
b28407f
fix imports
s1monw Aug 15, 2018
5cca4c2
make sure on local reindex we don't parse the source if it's not nece…
s1monw Aug 16, 2018
6fc9664
remove dead code
s1monw Aug 16, 2018
d32bc05
Merge branch 'master' into source_only_snap
s1monw Aug 16, 2018
ea806bc
fix imports
s1monw Aug 16, 2018
02aecd7
status quo
s1monw Aug 20, 2018
8e612f6
Merge branch 'master' into source_only_snap
s1monw Aug 27, 2018
7dcc25d
iteration
s1monw Aug 27, 2018
81c8127
Restore from a soruce only snap by copying only the source
s1monw Aug 28, 2018
6dcf9e5
Merge branch 'master' into source_only_snap
s1monw Aug 29, 2018
ee8a9d5
fix imports and docs
s1monw Aug 29, 2018
40cd45d
fix constant
s1monw Aug 29, 2018
deff31d
add license headers
s1monw Aug 29, 2018
eea9f6f
fix javadocs
s1monw Aug 30, 2018
61c2ff2
Merge branch 'master' into source_only_snap
s1monw Sep 3, 2018
dc679c2
Fix stuff after soft deletes are first class citizen
s1monw Sep 3, 2018
de30a8f
move searcher creation to engine adn acquire write lock
s1monw Sep 3, 2018
6d63b41
fix tests
s1monw Sep 3, 2018
f5e7f70
fix tests
s1monw Sep 3, 2018
6975c82
fix test again
s1monw Sep 4, 2018
4825da2
Merge branch 'master' into source_only_snap
s1monw Sep 5, 2018
70395be
fix settings
s1monw Sep 5, 2018
82042ce
Merge branch 'master' into source_only_snap
s1monw Sep 7, 2018
134bcf5
fix compilation
s1monw Sep 7, 2018
efb9303
fix test after lucene upgrade
s1monw Sep 10, 2018
d682548
remove routing invariant
s1monw Sep 10, 2018
1ae023f
add comment about soft deletes
s1monw Sep 10, 2018
506c68b
remove TODO
s1monw Sep 10, 2018
eec9f18
make flush a no-op
s1monw Sep 10, 2018
02405d4
Merge branch 'master' into source_only_snap
s1monw Sep 11, 2018
df925d7
integrate read-only engine
s1monw Sep 11, 2018
afba90e
fix compilation
s1monw Sep 11, 2018
b615020
bootstrap source only shards with new UUID and localCheckpoint == maxDoc
s1monw Sep 11, 2018
2625996
fix imports
s1monw Sep 11, 2018
140eb50
move all actions to the snapshot creation side
s1monw Sep 11, 2018
14f7caa
rework docs
s1monw Sep 11, 2018
d69f675
fix line len
s1monw Sep 11, 2018
7c9b5eb
add source only snap docs
s1monw Sep 11, 2018
fe817ca
add xpack role to docs
s1monw Sep 11, 2018
59e5dbf
fix docs tests
s1monw Sep 11, 2018
12473d5
reword docs
s1monw Sep 12, 2018
71d2e58
fix nit
s1monw Sep 12, 2018
5be8844
apply feedback
s1monw Sep 12, 2018
18c4680
Make sure all queries and get requests other than match_all fail
s1monw Sep 12, 2018
f384028
fix comment
s1monw Sep 12, 2018
9930e08
Merge branch 'master' into source_only_snap
s1monw Sep 12, 2018
8877497
fix tests to expect exception on query
s1monw Sep 12, 2018
78de6b5
fix imports
s1monw Sep 12, 2018
5f6529f
add test that slices work too
s1monw Sep 12, 2018
39 changes: 39 additions & 0 deletions docs/plugins/repository-source-only.asciidoc
@@ -0,0 +1,39 @@
[[repository-src-only]]
Contributor Author

@clintongormley @debadair I'd love to get input on where this should be linked from and where it should be located. At this point it's stand-alone.

=== Source Only Repository

The Source Only repository adds support for creating `_source`-only snapshots, using any other
available repository as its storage backend. This allows using {ref}/modules-snapshots.html[Snapshot/Restore]
to create incremental, storage-optimized, minimal snapshots of an index.


[[repository-src-only-usage]]
==== Configuration

The `_source` only repository always requires a delegate repository to be used as its storage backend.
For example, to use the `fs` repository as the delegate:

[source,js]
-----------------------------------
PUT _snapshot/my_src-only_repository
{
  "type": "source",
  "settings": {
    "delegate_type": "fs",
    "location": "my_backup_location"
  }
}
-----------------------------------
// CONSOLE
// TESTSETUP

Since the `_source` only repository doesn't snapshot any index or doc-values structures but only stored
fields and index metadata, the data must be reindexed during the restore process. This can happen either
as a full reindex based on the mapping of the original index, or in a minimal form where only the internal data structures
needed to update the index, such as the `_id` field, are recreated. The latter can be configured in the repository settings
by setting `"minimal": true`. A minimal restore allows update and get operations but does not allow aggregations or searches.

A minimal restore is useful if the data only needs to be reindexed into another index or if individual documents should be
modified or deleted.

During restore, the reindexing progress can be monitored via the <<indices-recovery,recovery>> API, which shows the per-document progress
under the `translog` recovery phase.
@@ -464,6 +464,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
repository.applyPostRestoreOps(indexShard);
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
} catch (Exception e) {
7 changes: 5 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -155,12 +155,15 @@ protected void closeInternal() {
public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock) throws IOException {
this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
}

public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock,
OnClose onClose) throws IOException {
this(shardId, indexSettings, directoryService.newDirectory(), shardLock, onClose);
}

public Store(ShardId shardId, IndexSettings indexSettings, Directory dir, ShardLock shardLock,
OnClose onClose) throws IOException {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
Directory dir = directoryService.newDirectory();
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
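
The hunk above moves directory creation out of the main constructor, so a caller can hand in a directory of its own instead of always going through the directory service. A minimal sketch of that constructor-chaining pattern follows. `SimpleStore`, `SimpleDirectory` and `SimpleDirectoryService` are hypothetical stand-ins invented for illustration, not the Elasticsearch or Lucene types.

```java
// Hypothetical stand-ins for illustration only; not the Elasticsearch or Lucene classes.
class StoreConstructorSketch {
    public static void main(String[] args) {
        SimpleDirectoryService service = () -> () -> "shard-dir";
        SimpleDirectory custom = () -> "temp-dir";

        // Old entry point: the store asks the service for its directory.
        System.out.println(new SimpleStore(service).directoryName());
        // New entry point: the caller injects a directory of its choosing.
        System.out.println(new SimpleStore(custom).directoryName());
    }
}

interface SimpleDirectory {
    String name();
}

interface SimpleDirectoryService {
    SimpleDirectory newDirectory();
}

final class SimpleStore {
    private final SimpleDirectory dir;

    // Keeps the existing signature working by delegating to the new constructor.
    SimpleStore(SimpleDirectoryService service) {
        this(service.newDirectory());
    }

    // Accepts the directory directly, which is what makes injection possible.
    SimpleStore(SimpleDirectory dir) {
        this.dir = dir;
    }

    String directoryName() {
        return dir.name();
    }
}
```

Existing callers keep compiling against the service-based constructor, while code that needs a store over a different directory can use the new overload.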
@@ -0,0 +1,154 @@
package org.elasticsearch.repositories;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

import java.io.IOException;
import java.util.List;

public class FilterRepository implements Repository {

    private final Repository in;

    public FilterRepository(Repository in) {
        this.in = in;
    }

    @Override
    public RepositoryMetaData getMetadata() {
        return in.getMetadata();
    }

    @Override
    public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
        return in.getSnapshotInfo(snapshotId);
    }

    @Override
    public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
        return in.getSnapshotGlobalMetaData(snapshotId);
    }

    @Override
    public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
        return in.getSnapshotIndexMetaData(snapshotId, index);
    }

    @Override
    public RepositoryData getRepositoryData() {
        return in.getRepositoryData();
    }

    @Override
    public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
        in.initializeSnapshot(snapshotId, indices, metaData);
    }

    @Override
    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
                                         List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState) {
        return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
            includeGlobalState);
    }

    @Override
    public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
        in.deleteSnapshot(snapshotId, repositoryStateId);
    }

    @Override
    public long getSnapshotThrottleTimeInNanos() {
        return in.getSnapshotThrottleTimeInNanos();
    }

    @Override
    public long getRestoreThrottleTimeInNanos() {
        return in.getRestoreThrottleTimeInNanos();
    }

    @Override
    public String startVerification() {
        return in.startVerification();
    }

    @Override
    public void endVerification(String verificationToken) {
        in.endVerification(verificationToken);
    }

    @Override
    public void verify(String verificationToken, DiscoveryNode localNode) {
        in.verify(verificationToken, localNode);
    }

    @Override
    public boolean isReadOnly() {
        return in.isReadOnly();
    }

    @Override
    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
                              IndexShardSnapshotStatus snapshotStatus) {
        in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
    }

    @Override
    public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
                             RecoveryState recoveryState) {
        in.restoreShard(shard, snapshotId, version, indexId, snapshotShardId, recoveryState);
    }

    @Override
    public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
        return in.getShardSnapshotStatus(snapshotId, version, indexId, shardId);
    }

    @Override
    public void applyPostRestoreOps(IndexShard shard) throws IOException {
        in.applyPostRestoreOps(shard);
    }

    @Override
    public Lifecycle.State lifecycleState() {
        return in.lifecycleState();
    }

    @Override
    public void addLifecycleListener(LifecycleListener listener) {
        in.addLifecycleListener(listener);
    }

    @Override
    public void removeLifecycleListener(LifecycleListener listener) {
        in.removeLifecycleListener(listener);
    }

    @Override
    public void start() {
        in.start();
    }

    @Override
    public void stop() {
        in.stop();
    }

    @Override
    public void close() {
        in.close();
    }
}
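
`FilterRepository` forwards every call to the wrapped repository, so a subclass only has to override the calls it wants to change. The sketch below illustrates that delegation pattern with a deliberately tiny, hypothetical interface (`SimpleRepository`, with strings standing in for shards and stores). It is an assumption-laden illustration of the idea, not the source-only repository from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-ins; the real Repository interface is far larger.
class FilterRepositorySketch {
    public static void main(String[] args) {
        RecordingRepository backend = new RecordingRepository();
        SimpleRepository wrapped = new RewritingRepository(backend);
        wrapped.snapshotShard("shard-0", "store");
        System.out.println(backend.snapshots);
    }
}

interface SimpleRepository {
    void snapshotShard(String shardId, String store);

    boolean isReadOnly();
}

// A backend that just records what it was asked to snapshot.
final class RecordingRepository implements SimpleRepository {
    final List<String> snapshots = new ArrayList<>();

    @Override
    public void snapshotShard(String shardId, String store) {
        snapshots.add(shardId + "@" + store);
    }

    @Override
    public boolean isReadOnly() {
        return false;
    }
}

// Forwards every call unchanged to the wrapped repository.
class SimpleFilterRepository implements SimpleRepository {
    private final SimpleRepository in;

    SimpleFilterRepository(SimpleRepository in) {
        this.in = in;
    }

    @Override
    public void snapshotShard(String shardId, String store) {
        in.snapshotShard(shardId, store);
    }

    @Override
    public boolean isReadOnly() {
        return in.isReadOnly();
    }
}

// Overrides a single call: it hands the delegate a different store than the one it received.
final class RewritingRepository extends SimpleFilterRepository {
    RewritingRepository(SimpleRepository in) {
        super(in);
    }

    @Override
    public void snapshotShard(String shardId, String store) {
        super.snapshotShard(shardId, store + "-source-only");
    }
}
```

Passing the store as its own argument, separate from the shard, is what lets a wrapper substitute a different store before delegating.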
@@ -398,7 +398,7 @@ private Repository createRepository(RepositoryMetaData repositoryMetaData) {
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
- Repository repository = factory.create(repositoryMetaData);
+ Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
repository.start();
return repository;
} catch (Exception e) {
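
Passing `typesRegistry::get` gives a factory the means to look up the factory of another repository type, which a wrapping repository needs in order to build its delegate. The sketch below shows one way this could work. `RepoFactory`, `DelegatingFactory` and the string "repositories" are hypothetical simplifications, and the checked exception on the real `create` methods is dropped for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins; repositories are modelled as plain strings for illustration.
class FactoryLookupSketch {
    public static void main(String[] args) {
        Map<String, RepoFactory> registry = new HashMap<>();
        registry.put("fs", s -> "fs:" + s.get("location"));
        registry.put("source", new DelegatingFactory());

        Map<String, String> settings = new HashMap<>();
        settings.put("delegate_type", "fs");
        settings.put("location", "my_backup_location");

        // The registry's own lookup is handed to the factory, as in the hunk above.
        System.out.println(registry.get("source").create(settings, registry::get));
    }
}

interface RepoFactory {
    String create(Map<String, String> settings);

    // Default keeps existing factories working: they simply ignore the lookup.
    default String create(Map<String, String> settings, Function<String, RepoFactory> typeLookup) {
        return create(settings);
    }
}

// A factory that cannot build anything without resolving its delegate type first.
final class DelegatingFactory implements RepoFactory {
    @Override
    public String create(Map<String, String> settings) {
        throw new UnsupportedOperationException("a type lookup is required");
    }

    @Override
    public String create(Map<String, String> settings, Function<String, RepoFactory> typeLookup) {
        String delegateType = settings.get("delegate_type");
        RepoFactory delegate = typeLookup.apply(delegateType);
        if (delegate == null) {
            throw new IllegalArgumentException("unknown delegate type [" + delegateType + "]");
        }
        return "source(" + delegate.create(settings, typeLookup) + ")";
    }
}
```

Because the two-argument method has a default implementation, only factories that need the lookup have to override it.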
@@ -28,13 +28,15 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;

/**
* An interface for interacting with a repository in snapshot and restore.
@@ -46,7 +48,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
- * <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
+ * <li>Data nodes call {@link Repository#snapshotShard(IndexShard, Store, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
@@ -63,6 +65,10 @@ interface Factory {
* @param metadata metadata for the repository including name and settings
*/
Repository create(RepositoryMetaData metadata) throws Exception;

default Repository create(RepositoryMetaData metaData, Function<String, Repository.Factory> typeLookup) throws Exception {
return create(metaData);
}
}

/**
@@ -188,14 +194,15 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
*
* @param shard shard to be snapshotted
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
- void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
+ void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+ IndexShardSnapshotStatus snapshotStatus);

/**
* Restores snapshot of the shard.
@@ -211,6 +218,11 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);

/**
* This allows a repository to apply operations after the snapshot has been restored as part of the translog recovery phase.
*/
default void applyPostRestoreOps(IndexShard shard) throws IOException {}

/**
* Retrieve shard snapshot status for the stored snapshot
*
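
`applyPostRestoreOps` is a default no-op, so existing repositories are unaffected, while a repository that needs extra work after the files are restored can override it. The restore path calls it after translog recovery and before recovery is finalized. A minimal sketch of that hook pattern follows. `RestoreHooks`, `PlainHooks`, `ReindexingHooks` and the logged step names are hypothetical and only mirror the call order shown in the `restore` hunk earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; steps are recorded as strings instead of touching a real shard.
class PostRestoreHookSketch {
    public static void main(String[] args) {
        System.out.println(RestoreSequence.run(new PlainHooks()));
        System.out.println(RestoreSequence.run(new ReindexingHooks()));
    }
}

interface RestoreHooks {
    void restoreShard(List<String> log);

    // No-op by default, so implementations opt in only when they need the hook.
    default void applyPostRestoreOps(List<String> log) {}
}

// Relies on the default: nothing happens after the restore.
final class PlainHooks implements RestoreHooks {
    @Override
    public void restoreShard(List<String> log) {
        log.add("restore files");
    }
}

// Overrides the hook to do additional work once the engine is open.
final class ReindexingHooks implements RestoreHooks {
    @Override
    public void restoreShard(List<String> log) {
        log.add("restore files");
    }

    @Override
    public void applyPostRestoreOps(List<String> log) {
        log.add("reindex from _source");
    }
}

final class RestoreSequence {
    // Runs the steps in a fixed order, with the hook between engine start and finalization.
    static List<String> run(RestoreHooks hooks) {
        List<String> log = new ArrayList<>();
        hooks.restoreShard(log);
        log.add("open engine and recover from translog");
        hooks.applyPostRestoreOps(log);
        log.add("finalize recovery");
        return log;
    }
}
```

The caller never needs to know whether a given repository uses the hook, which keeps the restore path free of type checks.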
@@ -846,16 +846,17 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
}

@Override
- public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
- SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
+ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
Contributor

Do we still need to pass shard into this method, though?

Contributor Author

Yeah, unfortunately. I need access to shardPath and the mapper. I can look into this as a follow-up.

+ IndexShardSnapshotStatus snapshotStatus) {
+ SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
try {
snapshotContext.snapshot(snapshotIndexCommit);
} catch (Exception e) {
snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e));
if (e instanceof IndexShardSnapshotFailedException) {
throw (IndexShardSnapshotFailedException) e;
} else {
- throw new IndexShardSnapshotFailedException(shard.shardId(), e);
+ throw new IndexShardSnapshotFailedException(store.shardId(), e);
}
}
}
@@ -1158,15 +1159,15 @@ private class SnapshotContext extends Context {
/**
* Constructs new context
*
- * @param shard shard to be snapshotted
+ * @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId the id of the index being snapshotted
* @param snapshotStatus snapshot status to report progress
*/
- SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
- super(snapshotId, Version.CURRENT, indexId, shard.shardId());
+ SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
+ super(snapshotId, Version.CURRENT, indexId, store.shardId());
this.snapshotStatus = snapshotStatus;
- this.store = shard.store();
+ this.store = store;
this.startTime = startTime;
}

@@ -389,7 +389,8 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
- repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
+ repository.snapshotShard(indexShard, indexShard.store(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(),
+ snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
@@ -2764,7 +2764,8 @@ public boolean isReadOnly() {
}

@Override
- public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
+ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+ IndexShardSnapshotStatus snapshotStatus) {
}

@Override
@@ -131,7 +131,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
};

protected ThreadPool threadPool;
- private long primaryTerm;
+ protected long primaryTerm;

@Override
public void setUp() throws Exception {
@@ -693,7 +693,8 @@ protected void snapshotShard(final IndexShard shard,
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());

- repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
+ repository.snapshotShard(shard, shard.store(), snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(),
+ snapshotStatus);
}

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();