Deduplicate Index Metadata in BlobStore (elastic#50278)
This PR introduces two new fields into `RepositoryData` (the `index-N` blob) that track the blob names of `IndexMetaData` blobs and their content via generations and uuids. These are used to deduplicate the `IndexMetaData` blobs (`meta-{uuid}.dat` in the per-index folders under `/indices`) so that new metadata for an index is only written to the repository during a snapshot if that same metadata cannot be found in another snapshot. A sketch of this lookup follows below.
This saves one write per index in the common case of unchanged metadata, reducing cost and making snapshot finalization drastically faster when many indices are snapshotted at the same time.

The implementation is mostly analogous to that for shard generations in elastic#46250 and piggybacks on the BwC mechanism introduced in that PR (which means this PR needs adjustments if it doesn't go into `7.6`).

Relates to elastic#45736 as it improves the efficiency of snapshotting unchanged indices.
Relates to elastic#49800 as it has the potential to make loading the index metadata for multiple snapshots of the same index much more efficient, speeding up future concurrent snapshot deletes.
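
Below is a minimal, hypothetical sketch of the deduplication lookup described above. The class and method names (`IndexMetaGenerationsSketch`, `record`, `existingBlobUuid`) are illustrative assumptions, not the actual `RepositoryData`/`IndexMetaDataGenerations` API; it only shows the two-level mapping in which a (snapshot, index) pair resolves to an identifier for the metadata content, and that identifier resolves to the `meta-{uuid}.dat` blob holding it, so a snapshot of an unchanged index can reuse an existing blob.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only, not the Elasticsearch implementation.
final class IndexMetaGenerationsSketch {

    // (snapshot uuid + index id) -> identifier of the index metadata content
    private final Map<String, String> lookup = new HashMap<>();

    // metadata content identifier -> uuid of the meta-{uuid}.dat blob holding it
    private final Map<String, String> identifierToBlobUuid = new HashMap<>();

    // Returns the existing blob uuid if identical metadata was already written by an
    // earlier snapshot, or null if a new meta-{uuid}.dat blob has to be uploaded.
    String existingBlobUuid(String metadataIdentifier) {
        return identifierToBlobUuid.get(metadataIdentifier);
    }

    // Records that the given snapshot references this metadata identifier/blob.
    void record(String snapshotUuid, String indexId, String metadataIdentifier, String blobUuid) {
        lookup.put(snapshotUuid + "/" + indexId, metadataIdentifier);
        identifierToBlobUuid.put(metadataIdentifier, blobUuid);
    }

    // Resolves the blob to read for a given snapshot and index, analogous in spirit
    // to the indexMetaBlobId(...) lookup used in the diff below.
    String indexMetaBlobId(String snapshotUuid, String indexId) {
        return identifierToBlobUuid.get(lookup.get(snapshotUuid + "/" + indexId));
    }
}
```

During finalization, only metadata whose identifier has no existing blob uuid needs to be uploaded, which is where the one-write-per-unchanged-index saving comes from.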
original-brownbear committed Jul 14, 2020
1 parent 81e9695 commit 56eaf66
Showing 23 changed files with 644 additions and 139 deletions.
@@ -160,8 +160,8 @@ public void testEnforcedCooldownPeriod() throws IOException {
final RepositoryData repositoryData = getRepositoryData(repository);
final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
- final BytesReference serialized =
- BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
+ final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
+ SnapshotsService.OLD_SNAPSHOT_FORMAT));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
@@ -218,7 +218,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
} else {
- if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
+ if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
final List<Class<? extends Exception>> expectedExceptions =
Arrays.asList(ResponseException.class, ElasticsearchStatusException.class);
@@ -33,6 +33,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@@ -273,11 +274,12 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
SnapshotId::getUUID, Function.identity())),
repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
SnapshotId::getUUID, repositoryData::getSnapshotState)),
- Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+ Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);

Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
- BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
- true))), StandardOpenOption.TRUNCATE_EXISTING);
+ BytesReference.toBytes(BytesReference.bytes(
+ withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
+ StandardOpenOption.TRUNCATE_EXISTING);

logger.info("--> verify that repo is assumed in old metadata format");
final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
@@ -403,11 +405,12 @@ public void testRepairBrokenShardGenerations() throws IOException {
Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)),
repositoryData1.getIndices().values().stream().collect(
Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)
- ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build()
+ ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(),
+ repositoryData1.indexMetaDataGenerations()
);
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()),
BytesReference.toBytes(BytesReference.bytes(
- brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))),
+ brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
StandardOpenOption.TRUNCATE_EXISTING);

logger.info("--> recreating repository to clear caches");
@@ -73,9 +73,11 @@
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -996,6 +998,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {

SnapshotStats stats = snapshots.get(0).getStats();

final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));

@@ -1023,6 +1027,10 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
.get();

final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);

// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

final int snapshot1FileCount = snapshot1Files.size();
final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files);
@@ -1047,6 +1055,65 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
}

public void testDeduplicateIndexMetadata() throws Exception {
final String indexName = "test-blocks-1";
final String repositoryName = "repo-" + indexName;
final String snapshot0 = "snapshot-0";
final String snapshot1 = "snapshot-1";
final String snapshot2 = "snapshot-2";

createIndex(indexName);

int docs = between(10, 100);
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet();
}

final Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath);

logger.info("--> create a snapshot");
client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();

final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index

docs = between(1, 5);
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet();
}

logger.info("--> restart random data node and add new data node to change index allocation");
internalCluster().restartRandomDataNode();
internalCluster().startDataOnlyNode();
ensureGreen(indexName);

assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));

final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);

// The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

// index to some other field to trigger a change in index metadata
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
}
assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));

final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
}

public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
logger.info("--> starting a master node and two data nodes");
internalCluster().startMasterOnlyNode();
@@ -1256,6 +1323,22 @@ private long calculateTotalFilesSize(List<Path> files) {
}).sum();
}

private static List<Path> findRepoMetaBlobs(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
final String fileName = file.getFileName().toString();
if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
files.add(file);
}
return super.visitFile(file, attrs);
}
}
);
return files;
}

private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
List<Path> files = new ArrayList<>();
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){
@@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -34,6 +35,7 @@
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;

@@ -198,9 +200,10 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
}

@Override
- public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException {
+ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId,
+ IndexId indexId) throws IOException {
indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet();
- return super.getSnapshotIndexMetadata(snapshotId, indexId);
+ return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId);
}
}

@@ -2546,7 +2546,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
final IndexId corruptedIndex = randomFrom(indexIds.values());
final Path indexMetadataPath = repo.resolve("indices")
.resolve(corruptedIndex.getId())
- .resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
+ .resolve(
+ "meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat");

// Truncate the index metadata file
try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) {
@@ -334,7 +334,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(final String repos
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
for (String index : snapshotInfo.indices()) {
IndexId indexId = repositoryData.resolveIndexId(index);
- IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
+ IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId);
if (indexMetadata != null) {
int numberOfShards = indexMetadata.getNumberOfShards();
for (int i = 0; i < numberOfShards; i++) {
@@ -70,8 +70,8 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
}

@Override
- public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
- return in.getSnapshotIndexMetadata(snapshotId, index);
+ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
+ return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index);
}

@Override
[remaining changed files not shown]
