Skip to content

Commit

Permalink
Added support for feature flags in opensearch.yml
Browse files Browse the repository at this point in the history
This change introduces a static store "settings"
in FeatureFlags.java file to enable isEnabled method to
fetch flag settings defined in opensearch.yml.

Signed-off-by: Nagaraj Tantri <[email protected]>
  • Loading branch information
ntantri committed Jan 4, 2023
1 parent 28e9b11 commit 57c2125
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down
23 changes: 23 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,26 @@ ${path.logs}
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.replication_type.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.remote_store.enabled: false
#
#
# Gates the functionality of a new parameter to the snapshot restore API
# that allows for creation of a new index type that searches a snapshot
# directly in a remote repository without restoring all index data to disk
# ahead of time.
#
#opensearch.experimental.feature.searchable_snapshot.enabled: false
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
Expand Down Expand Up @@ -70,11 +69,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
Expand All @@ -96,11 +90,16 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -127,7 +126,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -138,10 +137,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -167,10 +166,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand Down Expand Up @@ -201,10 +200,13 @@ public void testCancelPrimaryAllocation() throws Exception {

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());

logger.info("--> creating test index ...");
prepareCreate(
Expand All @@ -227,7 +229,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
Expand Down Expand Up @@ -269,8 +271,8 @@ public void testAddNewReplicaFailure() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -310,8 +312,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
final String replica = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -355,8 +357,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -396,8 +398,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -441,11 +443,11 @@ public void testReplicationAfterForceMerge() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -500,7 +502,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -523,7 +525,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -538,8 +540,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -601,10 +603,10 @@ public void testDeleteOperations() throws Exception {
}

public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
Expand Down Expand Up @@ -705,10 +707,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings());
final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings());
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
internalCluster().startDataOnlyNodes(6, featureFlagSettings());
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand All @@ -731,7 +733,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.snapshots;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand Down Expand Up @@ -46,9 +45,9 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase
private static final String REPOSITORY_NAME = "test-segrep-repo";
private static final String SNAPSHOT_NAME = "test-segrep-snapshot";

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public Settings segRepEnableIndexSettings() {
Expand Down Expand Up @@ -100,11 +99,11 @@ public void ingestData(int docCount, String indexName) throws Exception {
// Start cluster with provided settings and return the node names as list
public List<String> startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception {
// Start primary
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
List<String> nodeNames = new ArrayList<>();
nodeNames.add(primaryNode);
for (int i = 0; i < replicaCount; i++) {
nodeNames.add(internalCluster().startNode());
nodeNames.add(internalCluster().startNode(featureFlagSettings()));
}
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -266,7 +265,7 @@ public void testRestoreOnReplicaNode() throws Exception {

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
internalCluster().startNode();
internalCluster().startNode(featureFlagSettings());
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;

/**
* Encapsulates all valid feature flag level settings.
*
* @opensearch.internal
*/
public class FeatureFlagSettings extends AbstractScopedSettings {

protected FeatureFlagSettings(
Settings settings,
Set<Setting<?>> settingsSet,
Set<SettingUpgrader<?>> settingUpgraders,
Property scope
) {
super(settings, settingsSet, settingUpgraders, scope);
}

public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FeatureFlags.REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING,
FeatureFlags.EXTENSIONS_SETTING
)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public SettingsModule(
for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) {
registerSetting(setting);
}
for (Setting<?> setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
Expand Down
Loading

0 comments on commit 57c2125

Please sign in to comment.