Skip to content

Commit

Permalink
Fixing UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed May 31, 2024
1 parent e09d1f8 commit 7d7dd50
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,30 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.VersionUtils;
import org.opensearch.test.gateway.TestGatewayAllocator;
import org.opensearch.threadpool.TestThreadPool;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -72,11 +83,15 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FailedShardsRoutingTests extends OpenSearchAllocationTestCase {
private final Logger logger = LogManager.getLogger(FailedShardsRoutingTests.class);
Expand Down Expand Up @@ -826,7 +841,6 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {

public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
AllocationService allocation = createAllocationService(Settings.builder().build());

// segment replication enabled
Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
Expand All @@ -849,6 +863,29 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
.routingTable(initialRoutingTable)
.build();

BlobPath basePath = BlobPath.cleanPath().add("test");
RepositoriesService repositoriesService = mock(RepositoriesService.class);
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
when(repository.blobStore()).thenReturn(blobStore);
when(repositoriesService.repository(anyString())).thenReturn(repository);
when(repository.basePath()).thenReturn(basePath);
when(repository.getCompressor()).thenReturn(new DeflateCompressor());
when(blobStore.isBlobMetadataEnabled()).thenReturn(true);

TestThreadPool testThreadPool = new TestThreadPool(getTestName());
final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state());
final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService(
Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(),
clusterService,
() -> repositoriesService,
() -> rerouteService
);

TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, gatewayAllocator, internalSnapshotsInfoService);

ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);

// add a remote node and start primary shard
Expand Down Expand Up @@ -954,5 +991,6 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
|| primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
);
assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
testThreadPool.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -51,18 +52,29 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
Expand All @@ -74,7 +86,11 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;
import static org.hamcrest.core.Is.is;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -109,6 +125,7 @@ public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAlloca
private RoutingTable routingTable = null;

private ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0);
private TestThreadPool testThreadPool;

private void beforeAllocation(String direction) {
FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings);
Expand Down Expand Up @@ -138,17 +155,46 @@ private void beforeAllocation(String direction) {
getClusterSettings(customSettings)
);

BlobPath basePath = BlobPath.cleanPath().add("test");
RepositoriesService repositoriesService = mock(RepositoriesService.class);
BlobStoreRepository repository = mock(BlobStoreRepository.class);
BlobStore blobStore = mock(BlobStore.class);
when(repository.blobStore()).thenReturn(blobStore);
when(repositoriesService.repository(anyString())).thenReturn(repository);
when(repository.basePath()).thenReturn(basePath);
when(repository.getCompressor()).thenReturn(new DeflateCompressor());
when(blobStore.isBlobMetadataEnabled()).thenReturn(true);

if (Objects.isNull(testThreadPool)) {
testThreadPool = new TestThreadPool(getTestName());
}
final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state());
final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService(
Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(),
clusterService,
() -> repositoriesService,
() -> rerouteService
);

routingAllocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(remoteStoreMigrationAllocationDecider)),
clusterState.getRoutingNodes(),
clusterState,
null,
internalSnapshotsInfoService,
null,
0L
);
routingAllocation.debugDecision(true);
}

@After
public void tearDown() throws Exception {
testThreadPool.shutdownNow();
super.tearDown();
}

private void prepareRoutingTable(boolean isReplicaAllocation, String primaryShardNodeId) {
routingTable = RoutingTable.builder()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -70,6 +71,7 @@

import static java.util.Collections.emptyMap;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING;

public abstract class OpenSearchAllocationTestCase extends OpenSearchTestCase {
private static final ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(
Expand Down Expand Up @@ -103,6 +105,7 @@ public static MockAllocationService createAllocationService(Settings settings, C
randomAllocationDeciders(settings, clusterSettings, random),
new TestGatewayAllocator(),
new BalancedShardsAllocator(settings),

EmptyClusterInfoService.INSTANCE,
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES
);
Expand Down

0 comments on commit 7d7dd50

Please sign in to comment.