Skip to content

Commit

Permalink
fix not statement and add change log
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <[email protected]>
  • Loading branch information
bugmakerrrrrr authored and andrross committed Jan 17, 2024
1 parent 9c40338 commit 9925816
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add copy ingest processor ([#11870](https://github.com/opensearch-project/OpenSearch/pull/11870))
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void moveShards() {
}

// move shards that are currently assigned on excluded nodes
while (!eligibleNodes.isEmpty() && !excludedNodes.isEmpty()) {
while (eligibleNodes.isEmpty() == false && excludedNodes.isEmpty() == false) {
RoutingNode sourceNode = excludedNodes.poll();
for (final ShardRouting ineligibleShard : sourceNode) {
if (ineligibleForMove(ineligibleShard)) {
Expand All @@ -125,7 +125,7 @@ void moveShards() {
}

private boolean ineligibleForMove(ShardRouting shard) {
return !shard.started() || !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation));
return shard.started() == false || RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) == false;
}

/**
Expand Down Expand Up @@ -166,7 +166,7 @@ private void classifyNodesForShardMovement(Queue<RoutingNode> eligibleNodes, Que
private void tryShardMovementToEligibleNode(Queue<RoutingNode> eligibleNodes, ShardRouting shard) {
final Set<String> nodesCheckedForShard = new HashSet<>();
int numNodesToCheck = eligibleNodes.size();
while (!eligibleNodes.isEmpty()) {
while (eligibleNodes.isEmpty() == false) {
assert numNodesToCheck > 0;
final RoutingNode targetNode = eligibleNodes.poll();
--numNodesToCheck;
Expand Down Expand Up @@ -258,7 +258,7 @@ void balance() {
}
}

while (!sourceNodes.isEmpty() && !targetNodes.isEmpty()) {
while (sourceNodes.isEmpty() == false && targetNodes.isEmpty() == false) {
RoutingNode sourceNode = sourceNodes.poll();
tryRebalanceNode(sourceNode, targetNodes, avgPrimaryPerNode, nodePrimaryShardCount);
}
Expand Down Expand Up @@ -308,11 +308,11 @@ public Map<String, UnassignedIndexShards> groupUnassignedShardsByIndex() {
HashMap<String, UnassignedIndexShards> unassignedShardMap = new HashMap<>();
for (ShardRouting shard : routingNodes.unassigned().drain()) {
String index = shard.getIndexName();
if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) == false) {
routingNodes.unassigned().add(shard);
continue;
}
if (!unassignedShardMap.containsKey(index)) {
if (unassignedShardMap.containsKey(index) == false) {
unassignedShardMap.put(index, new UnassignedIndexShards());
}
unassignedShardMap.get(index).addShard(shard);
Expand All @@ -329,13 +329,15 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
RoutingNodes.UnassignedShards unassignedShards = routingAllocation.routingNodes().unassigned();
for (ShardRouting shard : unassignedShards.drainIgnored()) {
RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation);
if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) {
if (pool == RoutingPool.REMOTE_CAPABLE
&& shard.unassigned()
&& (shard.primary() || shard.unassignedInfo().isDelayed() == false)) {
ShardRouting unassignedShard = shard;
// Shard when moved to an unassigned state updates the recovery source to be ExistingStoreRecoverySource
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
// to re-fetch any shard blocks from the repository.
if (shard.primary()) {
if (!RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}
Expand Down Expand Up @@ -386,7 +388,7 @@ private void allocateUnassignedShards(
}
logger.debug("Allocating shards for index: [{}]", index);

while (!shardsToAllocate.isEmpty() && !nodeQueue.isEmpty()) {
while (shardsToAllocate.isEmpty() == false && nodeQueue.isEmpty() == false) {
ShardRouting shard = shardsToAllocate.poll();
if (shard.assignedToNode()) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -423,7 +425,7 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
boolean allocated = false;
boolean throttled = false;
Set<String> nodesCheckedForShard = new HashSet<>();
while (!nodeQueue.isEmpty()) {
while (nodeQueue.isEmpty() == false) {
RoutingNode node = nodeQueue.poll();
Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
nodesCheckedForShard.add(node.nodeId());
Expand Down Expand Up @@ -482,7 +484,7 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}
}

if (!allocated) {
if (allocated == false) {
UnassignedInfo.AllocationStatus status = throttled
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
Expand All @@ -503,14 +505,16 @@ private void tryRebalanceNode(

// Try to relocate the valid shards on the sourceNode, one at a time;
// until either sourceNode is balanced OR no more active primary shard available OR all the target nodes are exhausted
while (shardsToBalance > 0 && shardIterator.hasNext() && !targetNodes.isEmpty()) {
while (shardsToBalance > 0 && shardIterator.hasNext() && targetNodes.isEmpty() == false) {
// Find an active primary shard to relocate
ShardRouting shard = shardIterator.next();
if (!shard.started() || !shard.primary() || !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
if (shard.started() == false
|| shard.primary() == false
|| RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) == false) {
continue;
}

while (!targetNodes.isEmpty()) {
while (targetNodes.isEmpty() == false) {
// Find a valid target node that can accommodate the current shard relocation
RoutingNode targetNode = targetNodes.poll();
if (primaryCount.get(targetNode.nodeId()) >= avgPrimary) {
Expand Down

0 comments on commit 9925816

Please sign in to comment.