diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fd06655f566b..cfd925923502 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1420,6 +1420,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException { // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. + final long localCheckpointBeforeRefresh = getLocalCheckpoint(); // this will also cause version map ram to be freed hence we always account for it. final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh(); @@ -1445,6 +1446,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException { } finally { store.decRef(); } + lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh); } } catch (AlreadyClosedException e) { failOnTragicEvent(e); @@ -1459,7 +1461,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException { } finally { writingBytes.addAndGet(-bytes); } - + assert lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh : "refresh checkpoint was not advanced; " + + "local_checkpoint=" + localCheckpointBeforeRefresh + " refresh_checkpoint=" + lastRefreshedCheckpoint(); // TODO: maybe we should just put a scheduled job in threadPool? // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes // for a long time: @@ -2526,21 +2529,31 @@ public long softUpdateDocuments(Term term, Iterable Math.max(curr, checkpoint)); + assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; + } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8da58ffc3a37..d3aead9e44e1 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4994,6 +4994,32 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } } + public void testLastRefreshCheckpoint() throws Exception { + AtomicBoolean done = new AtomicBoolean(); + Thread[] refreshThreads = new Thread[between(1, 8)]; + CountDownLatch latch = new CountDownLatch(refreshThreads.length); + for (int i = 0; i < refreshThreads.length; i++) { + latch.countDown(); + refreshThreads[i] = new Thread(() -> { + while (done.get() == false) { + long checkPointBeforeRefresh = engine.getLocalCheckpoint(); + engine.refresh("test", randomFrom(Engine.SearcherScope.values())); + assertThat(engine.lastRefreshedCheckpoint(), greaterThanOrEqualTo(checkPointBeforeRefresh)); + } + }); + refreshThreads[i].start(); + } + latch.await(); + List ops = generateSingleDocHistory(true, VersionType.EXTERNAL, 1, 10, 1000, "1"); + concurrentlyApplyOps(ops, engine); + done.set(true); + for (Thread thread : refreshThreads) { + thread.join(); + } + engine.refresh("test"); + assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getLocalCheckpoint())); + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig();