Skip to content

Commit

Permalink
Fixing unblock condition for index create block (#9437)
Browse files Browse the repository at this point in the history
Signed-off-by: Rishav Sagar <[email protected]>
Co-authored-by: Rishav Sagar <[email protected]>
  • Loading branch information
RS146BIJAY and Rishav Sagar authored Aug 24, 2023
1 parent a6c0bb3 commit 4e68808
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))
- Fix range reads in respository-s3 ([9512](https://github.com/opensearch-project/OpenSearch/issues/9512))
- Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291))
- Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,21 @@ public void onNewInfo(ClusterInfo info) {
if ((state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id()) == false)
&& nodes.size() > 0
&& nodesOverHighThreshold.size() == nodes.size()) {
logger.warn(
"Putting index create block on cluster as all nodes are breaching high disk watermark. "
+ "Number of nodes above high watermark: {}.",
nodesOverHighThreshold.size()
);
setIndexCreateBlock(listener, true);
} else if (state.getBlocks().hasGlobalBlockWithId(Metadata.CLUSTER_CREATE_INDEX_BLOCK.id())
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()) {
&& diskThresholdSettings.isCreateIndexBlockAutoReleaseEnabled()
&& nodesOverHighThreshold.size() < nodes.size()) {
logger.warn(
"Removing index create block on cluster as all nodes are no longer breaching high disk watermark. "
+ "Number of nodes above high watermark: {}. Total numbers of nodes: {}.",
nodesOverHighThreshold.size(),
nodes.size()
);
setIndexCreateBlock(listener, false);
} else {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -67,6 +69,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -581,12 +584,16 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
);

advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
monitor,
aboveHighWatermark,
final List<String> messages = new ArrayList<>();
messages.add(
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
);
messages.add(
"Putting index create block on cluster as all nodes are breaching high disk watermark. "
+ "Number of nodes above high watermark: 1."
);
assertMultipleWarningMessages(monitor, aboveHighWatermark, messages);

advanceTime.set(true);
assertRepeatedWarningMessages(
Expand All @@ -605,22 +612,11 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC

relocatingShardSizeRef.set(-5L);
advanceTime.set(true);
assertSingleInfoMessage(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to be below the high disk watermark when these relocations are complete"
);

relocatingShardSizeRef.set(0L);
timeSupplier.getAsLong(); // advance time long enough to do another reroute
advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed
assertSingleWarningMessage(
monitor,
aboveHighWatermark,
"high disk watermark [90%] exceeded on * shards will be relocated away from this node* "
+ "the node is expected to continue to exceed the high disk watermark when these relocations are complete"
);
assertMultipleWarningMessages(monitor, aboveHighWatermark, messages);

advanceTime.set(true);
assertRepeatedWarningMessages(
Expand Down Expand Up @@ -722,6 +718,113 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
assertTrue(countBlocksCalled.get() == 0);
}

public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() {
AllocationService allocation = createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.blocks.create_index.enabled", false)
.build()
);
Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder("test")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node2"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.put(
IndexMetadata.builder("test_1")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.put(
IndexMetadata.builder("test_2")
.settings(settings(Version.CURRENT).put("index.routing.allocation.require._id", "node1"))
.numberOfShards(1)
.numberOfReplicas(0)
)
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metadata.index("test"))
.addAsNew(metadata.index("test_1"))
.addAsNew(metadata.index("test_2"))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.blocks(ClusterBlocks.builder().addGlobalBlock(Metadata.CLUSTER_CREATE_INDEX_BLOCK).build())
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build(),
allocation
);
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicInteger countBlocksCalled = new AtomicInteger();
AtomicInteger countUnblockBlocksCalled = new AtomicInteger();
AtomicLong currentTime = new AtomicLong();
Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
settings,
() -> clusterState,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null,
currentTime::get,
(reason, priority, listener) -> {
listener.onResponse(null);
}
) {

@Override
protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
assertTrue(readOnly);
listener.onResponse(null);
}

@Override
protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexCreateBlock) {
if (indexCreateBlock == true) {
countBlocksCalled.set(countBlocksCalled.get() + 1);
} else {
countUnblockBlocksCalled.set(countUnblockBlocksCalled.get() + 1);
}

listener.onResponse(null);
}
};

Map<String, DiskUsage> builder = new HashMap<>();

// Initially all the nodes are breaching high watermark and IndexCreateBlock is already present on the cluster.
// Since block is already present, DiskThresholdMonitor should not again try to apply block.
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 9));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 9));
monitor.onNewInfo(clusterInfo(builder));
// Since Block is already present and nodes are below high watermark so neither block nor unblock will be called.
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 0);

// Ensure DiskThresholdMonitor does not try to remove block in the next iteration if all nodes are breaching high watermark.
monitor.onNewInfo(clusterInfo(builder));
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 0);

builder = new HashMap<>();

// If any node is no longer breaching high watermark, DiskThresholdMonitor should remove IndexCreateBlock.
builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 19));
builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 1));
// Need to add delay in current time to allow nodes to be removed high watermark list.
currentTime.addAndGet(randomLongBetween(60001, 120000));

monitor.onNewInfo(clusterInfo(builder));
// Block will be removed if any nodes is no longer breaching high watermark.
assertEquals(countBlocksCalled.get(), 0);
assertEquals(countUnblockBlocksCalled.get(), 1);
}

private void assertNoLogging(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages) throws IllegalAccessException {
try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) {
mockAppender.addExpectation(
Expand Down Expand Up @@ -756,10 +859,11 @@ private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, final M
}
}

private void assertSingleWarningMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
private void assertMultipleWarningMessages(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, List<String> messages)
throws IllegalAccessException {
assertLogging(monitor, diskUsages, Level.WARN, message);
assertNoLogging(monitor, diskUsages);
for (int index = 0; index < messages.size(); index++) {
assertLogging(monitor, diskUsages, Level.WARN, messages.get(index));
}
}

private void assertSingleInfoMessage(DiskThresholdMonitor monitor, final Map<String, DiskUsage> diskUsages, String message)
Expand Down

0 comments on commit 4e68808

Please sign in to comment.