Skip to content

Commit

Permalink
TEST: Add debug log to FlushIT
Browse files Browse the repository at this point in the history
We still don't have a strong reason for the failures of
testDoNotRenewSyncedFlushWhenAllSealed and
testSyncedFlushSkipOutOfSyncReplicas.

This commit adds debug logging for these two tests.
  • Loading branch information
dnhatn committed May 1, 2018
1 parent f4901b8 commit 038fe11
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
20 changes: 12 additions & 8 deletions server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testSyncedFlush() throws ExecutionException, InterruptedException, I
ShardsSyncedFlushResult result;
if (randomBoolean()) {
logger.info("--> sync flushing shard 0");
result = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), new ShardId(index, 0));
result = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), new ShardId(index, 0));
} else {
logger.info("--> sync flushing index [test]");
SyncedFlushResponse indicesResult = client().admin().indices().prepareSyncedFlush("test").get();
Expand Down Expand Up @@ -246,11 +247,14 @@ private void indexDoc(Engine engine, String id) throws IOException {
}

private String syncedFlushDescription(ShardsSyncedFlushResult result) {
return result.shardResponses().entrySet().stream()
String detail = result.shardResponses().entrySet().stream()
.map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
.collect(Collectors.joining(","));
return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
result.totalShards(), result.failed(), result.failureReason(), detail);
}

@TestLogging("_root:DEBUG")
public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
Expand All @@ -275,7 +279,7 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
for (int i = 0; i < extraDocs; i++) {
indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
}
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
Expand All @@ -287,7 +291,7 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
indexDoc(IndexShardTestCase.getEngine(indexShard), "extra_" + i);
}
}
final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
assertThat(fullResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}
Expand All @@ -308,11 +312,11 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
for (int i = 0; i < numDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
Expand All @@ -321,7 +325,7 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
for (int i = 0; i < moreDocs; i++) {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
Expand All @@ -337,7 +341,7 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
shard.flush(new FlushRequest(shardId.getIndexName()).force(true).waitIfOngoing(true));
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.elasticsearch.indices.flush;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.InternalTestCluster;

Expand All @@ -40,8 +40,10 @@ private SyncedFlushUtil() {
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
public static ShardsSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, ShardId shardId) {
public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
service.attemptSyncedFlush(shardId, listener);
try {
Expand Down

0 comments on commit 038fe11

Please sign in to comment.