Ensure fully deleted segments are accounted for correctly
We can't rely on the leaf reader ordinal in a wrapped reader since
it might not correspond to the ordinal in the SegmentInfos for its
SegmentCommitInfo.

Relates to #32844
Closes #33689
s1monw committed Sep 18, 2018
1 parent 393b7b0 commit c51acad
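
To illustrate the ordinal problem the commit message describes, here is a minimal, hypothetical sketch (not code from this commit) of the safe mapping the change adopts: iterate the unwrapped StandardDirectoryReader, whose leaf ordinals line up with the SegmentInfos of the commit point, and only wrap afterwards, one leaf at a time. The class and method names are illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;

final class OrdinalMappingSketch {
    /**
     * Collects the SegmentCommitInfo of every non-empty leaf. Iterating the *unwrapped*
     * reader keeps ctx.ord aligned with the SegmentInfos; a wrapping reader may drop fully
     * deleted segments and renumber its leaves, in which case segmentInfos.info(ctx.ord)
     * would point at the wrong segment and could expose deleted documents.
     */
    static List<SegmentCommitInfo> liveSegments(StandardDirectoryReader reader) throws IOException {
        SegmentInfos segmentInfos = reader.getSegmentInfos();
        List<SegmentCommitInfo> result = new ArrayList<>();
        for (LeafReaderContext ctx : reader.leaves()) {
            SegmentCommitInfo info = segmentInfos.info(ctx.ord); // safe: same order as the commit point
            if (ctx.reader().numDocs() != 0) {                   // skip fully deleted segments
                result.add(info);
            }
        }
        return result;
    }
}
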
Showing 5 changed files with 141 additions and 25 deletions.
@@ -81,15 +81,25 @@ public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOExcep
try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) {
SegmentInfos segmentInfos = reader.getSegmentInfos();
DirectoryReader wrapper = wrapReader(reader);
List<SegmentCommitInfo> newInfos = new ArrayList<>();
for (LeafReaderContext ctx : wrapper.leaves()) {
SegmentCommitInfo info = segmentInfos.info(ctx.ord);
for (LeafReaderContext ctx : reader.leaves()) {
LeafReader leafReader = ctx.reader();
LiveDocs liveDocs = getLiveDocs(leafReader);
if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
newInfos.add(newInfo);
SegmentCommitInfo info = reader.getSegmentInfos().info(ctx.ord);
assert info.info.equals(Lucene.segmentReader(ctx.reader()).getSegmentInfo().info);
/* We could do this differently, without wrapping a dummy directory reader, if FilterCodecReader had a
* getDelegate method. That is fixed in LUCENE-8502, but we need to wait for it to arrive in 7.5.1 or 7.6.
* The reason is that ctx.ord is not guaranteed to be equivalent to the ordinal of the SegmentCommitInfo in the
* SegmentInfos object, since the wrapper might drop fully deleted segments. If that happens we would use the wrong
* reader for the SI and would almost certainly expose deleted documents.
*/
DirectoryReader wrappedReader = wrapReader(new DummyDirectoryReader(reader.directory(), leafReader));
if (wrappedReader.leaves().isEmpty() == false) {
leafReader = wrappedReader.leaves().get(0).reader();
LiveDocs liveDocs = getLiveDocs(leafReader);
if (leafReader.numDocs() != 0) { // fully deleted segments don't need to be processed
SegmentCommitInfo newInfo = syncSegment(info, liveDocs, leafReader.getFieldInfos(), existingSegments, createdFiles);
newInfos.add(newInfo);
}
}
}
segmentInfos.clear();
@@ -257,4 +267,51 @@ private static class LiveDocs {
this.bits = bits;
}
}

private static class DummyDirectoryReader extends DirectoryReader {

protected DummyDirectoryReader(Directory directory, LeafReader... segmentReaders) throws IOException {
super(directory, segmentReaders);
}

@Override
protected DirectoryReader doOpenIfChanged() throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
return null;
}

@Override
protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
return null;
}

@Override
public long getVersion() {
return 0;
}

@Override
public boolean isCurrent() throws IOException {
return false;
}

@Override
public IndexCommit getIndexCommit() throws IOException {
return null;
}

@Override
protected void doClose() throws IOException {

}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}
}
}
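
The comment in the hunk above notes that LUCENE-8502 adds a getDelegate method to FilterCodecReader. A hedged sketch of that alternative, assuming the LUCENE-8502 API is available (this is not part of the commit): unwrap the wrapped leaf back to its SegmentReader directly, instead of routing it through a dummy DirectoryReader.

import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentReader;

final class UnwrapSketch {
    // Walk down the FilterCodecReader chain until the concrete SegmentReader is reached.
    // getDelegate() is the accessor LUCENE-8502 introduces; its availability is assumed here.
    static SegmentReader unwrapToSegmentReader(LeafReader leaf) {
        while (leaf instanceof FilterCodecReader) {
            leaf = ((FilterCodecReader) leaf).getDelegate();
        }
        if (leaf instanceof SegmentReader) {
            return (SegmentReader) leaf;
        }
        throw new IllegalStateException("expected a SegmentReader but was: " + leaf.getClass());
    }
}
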
@@ -130,7 +130,7 @@ protected void closeInternal() {
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
snapshot.syncSnapshot(snapshotIndexCommit);
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
tempStore.bootstrapNewHistory(segmentInfos.totalMaxDoc());
store.incRef();
try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) {
@@ -49,7 +49,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -97,7 +97,10 @@ public void testSnapshotAndRestore() throws Exception {
boolean requireRouting = randomBoolean();
boolean useNested = randomBoolean();
IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, useNested);
assertHits(sourceIdx, builders.length);
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(sourceIdx).clear().setDocs(true).get();
long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
boolean sourceHadDeletions = deleted > 0; // we use indexRandom which might create holes, i.e. deleted docs
assertHits(sourceIdx, builders.length, sourceHadDeletions);
assertMappings(sourceIdx, requireRouting, useNested);
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> {
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery()
@@ -116,7 +119,7 @@ public void testSnapshotAndRestore() throws Exception {
client().admin().indices().prepareUpdateSettings(sourceIdx)
.setSettings(Settings.builder().put("index.number_of_replicas", 1)).get();
ensureGreen(sourceIdx);
assertHits(sourceIdx, builders.length);
assertHits(sourceIdx, builders.length, sourceHadDeletions);
}

public void testSnapshotAndRestoreWithNested() throws Exception {
@@ -125,7 +128,7 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
IndexRequestBuilder[] builders = snashotAndRestore(sourceIdx, 1, true, requireRouting, true);
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
assertThat(indicesStatsResponse.getTotal().docs.getDeleted(), Matchers.greaterThan(0L));
assertHits(sourceIdx, builders.length);
assertHits(sourceIdx, builders.length, true);
assertMappings(sourceIdx, requireRouting, true);
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () ->
client().prepareSearch(sourceIdx).setQuery(QueryBuilders.idsQuery().addIds("" + randomIntBetween(0, builders.length))).get());
@@ -141,7 +144,7 @@ public void testSnapshotAndRestoreWithNested() throws Exception {
client().admin().indices().prepareUpdateSettings(sourceIdx).setSettings(Settings.builder().put("index.number_of_replicas", 1))
.get();
ensureGreen(sourceIdx);
assertHits(sourceIdx, builders.length);
assertHits(sourceIdx, builders.length, true);
}

private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
@@ -165,15 +168,12 @@ private void assertMappings(String sourceIdx, boolean requireRouting, boolean us
}
}

private void assertHits(String index, int numDocsExpected) {
private void assertHits(String index, int numDocsExpected, boolean sourceHadDeletions) {
SearchResponse searchResponse = client().prepareSearch(index)
.addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
.setSize(numDocsExpected).get();
Consumer<SearchResponse> assertConsumer = res -> {
BiConsumer<SearchResponse, Boolean> assertConsumer = (res, allowHoles) -> {
SearchHits hits = res.getHits();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().clear().setDocs(true).get();
long deleted = indicesStatsResponse.getTotal().docs.getDeleted();
boolean allowHoles = deleted > 0; // we use indexRandom which might create holes ie. deleted docs
long i = 0;
for (SearchHit hit : hits) {
String id = hit.getId();
@@ -190,18 +190,24 @@ private void assertHits(String index, int numDocsExpected) {
assertEquals("r" + id, hit.field("_routing").getValue());
}
};
assertConsumer.accept(searchResponse);
assertConsumer.accept(searchResponse, sourceHadDeletions);
assertEquals(numDocsExpected, searchResponse.getHits().totalHits);
searchResponse = client().prepareSearch(index)
.addSort(SeqNoFieldMapper.NAME, SortOrder.ASC)
.setScroll("1m")
.slice(new SliceBuilder(SeqNoFieldMapper.NAME, randomIntBetween(0,1), 2))
.setSize(randomIntBetween(1, 10)).get();
do {
// now do a scroll with a slice
assertConsumer.accept(searchResponse);
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
try {
do {
// now do a scroll with a slice
assertConsumer.accept(searchResponse, true);
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get();
} while (searchResponse.getHits().getHits().length > 0);
} finally {
if (searchResponse.getScrollId() != null) {
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
}
}

}

@@ -162,7 +162,6 @@ private String randomDoc() {
return "{ \"value\" : \"" + randomAlphaOfLength(10) + "\"}";
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33689")
public void testRestoreMinmal() throws IOException {
IndexShard shard = newStartedShard(true);
int numInitialDocs = randomIntBetween(10, 100);
@@ -12,10 +12,12 @@
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
@@ -34,6 +36,7 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.test.ESTestCase;

@@ -242,4 +245,55 @@ public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo,
reader.close();
}
}

public void testFullyDeletedSegments() throws IOException {
try (Directory dir = newDirectory()) {
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setIndexDeletionPolicy(deletionPolicy).setMergePolicy(new FilterMergePolicy(NoMergePolicy.INSTANCE) {
@Override
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext) {
return randomBoolean();
}

@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) throws IOException {
return true;
}
}));
Document doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 1));
doc.add(new StoredField("rank", 1));
doc.add(new StoredField("src", "the quick brown fox"));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new StringField("id", "1", Field.Store.YES));
doc.add(new TextField("text", "the quick brown fox", Field.Store.NO));
doc.add(new NumericDocValuesField("rank", 3));
doc.add(new StoredField("rank", 3));
doc.add(new StoredField("src", "the quick brown fox"));
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
writer.commit();
try (Directory targetDir = newDirectory()) {
IndexCommit snapshot = deletionPolicy.snapshot();
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
snapshoter.syncSnapshot(snapshot);

try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
assertEquals(snapReader.maxDoc(), 1);
assertEquals(snapReader.numDocs(), 1);
assertEquals("3", snapReader.document(0).getField("rank").stringValue());
}
try (IndexReader writerReader = DirectoryReader.open(writer)) {
assertEquals(writerReader.maxDoc(), 2);
assertEquals(writerReader.numDocs(), 1);
}
}
writer.close();
}
}
}
