Addition of optional parameter allow_red_cluster to let ISM run on a red OpenSearch cluster #1189

Open · wants to merge 6 commits into main
@@ -250,11 +250,6 @@ object ManagedIndexRunner :
    @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition", "NestedBlockDepth")
    private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, jobContext: JobExecutionContext) {
        logger.debug("Run job for index ${managedIndexConfig.index}")
        // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
bowenlan-amzn (Member) commented:
We shouldn't move this red cluster check down, because keeping it early avoids making one or more calls below that could potentially worsen a red cluster.

Instead, we would need to rely on the ManagedIndexConfig, which we have directly here. It holds a snapshot of the policy. Only when the policy has allow_red_cluster would we bypass this check.

The problem is that ManagedIndexConfig doesn't know which state or action is running (only the metadata knows), so we won't be able to bypass the check only when a specific state or action is running. To do that, more complex logic would be needed to update ManagedIndexConfig with the required info.
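A minimal sketch of that idea, assuming the policy snapshot is reachable as managedIndexConfig.policy and that State carries the allowRedCluster flag from this PR (both accessors are assumptions, not confirmed APIs):

// Hypothetical sketch: keep the red-cluster check in its original, early position,
// but skip it when any state in the policy snapshot opts in via allow_red_cluster.
val policyAllowsRedCluster = managedIndexConfig.policy?.states?.any { it.allowRedCluster } == true
if (clusterIsRed() && !policyAllowsRedCluster) {
    logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
    return
}
// Per-state enforcement would still happen later, once the metadata reveals which state is running.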

skumarp7 (Author) commented on Jul 17, 2024:

Hi @bowenlan-amzn,

The calls we're making to the cluster are mostly GET operations (to fetch state, etc.), so I'm not sure how they would worsen a red cluster. I can still try to avoid them, but I'm not sure of the right approach.

This is my understanding of your suggestion; please correct it where required:

  • Move the check back to its original location.
  • Parse the policy to figure out whether "allow_red_cluster" is present anywhere in it. If it is not, return as the existing code does.
  • Otherwise, allow execution to continue.
    (At this stage, we don't know which state is going to be executed, as we cannot run any GET calls.)

Challenges:

  • The allow_red_cluster flag has been added at the state level, i.e. some states in a policy can have it enabled and some might not.
  • After we have allowed the job to bypass the initial policy check, we again need a check at the state/action level for whether the parameter is true or false, to run or skip that state/action.

That means that for a state without allow_red_cluster set to true, we would still execute those one or two GET calls before skipping it, which again could potentially worsen the cluster.

Questions:

  1. How would we be able to get the value of the "allow_red_cluster" parameter for each state before we even fetch the metadata? Can you shed some light on how to implement this logic in ManagedIndexConfig?
  2. If we still need to place the check at the top level only, we could move the parameter to the policy level itself instead of the state level (as that can be fetched from the ManagedIndexConfig). That would mean running all states/actions (if the flag is true) or running nothing (if it is false). But this raises the concern that any ISM action that doesn't support a red cluster, or would harm one, would also be allowed to run on it.

bowenlan-amzn (Member) commented:

> The calls we're making to the cluster are mostly GET operations (to fetch state, etc.), so I'm not sure how they would worsen a red cluster.

Right, these are GET calls, but considering that this small change affects all ISM jobs, which can number 10k+ in a cluster, we should still keep the original logic: don't do anything if the cluster is red.

> That would mean running all states/actions (if the flag is true) or running nothing (if it is false). But this raises the concern that any ISM action that doesn't support a red cluster, or would harm one, would also be allowed to run on it.

We don't need new logic in ManagedIndexConfig; ManagedIndexConfig already has the policy in it.

I'm thinking of a slightly hacky solution: maybe have something like allow_red_cluster_after: 3d, then compare the current time with jobEnabledTime, which seems to be the time when the job is created. That way we can calculate when to start allowing a red cluster; before then, we still block execution on a red cluster.
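A minimal sketch of that time-window idea, to slot into runManagedIndexConfig before any cluster calls; allow_red_cluster_after and the jobEnabledTime accessor are assumptions taken from this discussion, not existing APIs:

import java.time.Instant
import org.opensearch.common.unit.TimeValue

// Hypothetical sketch: keep blocking on a red cluster until a grace period,
// measured from job creation, has elapsed.
val allowRedClusterAfter = TimeValue.parseTimeValue("3d", "allow_red_cluster_after") // would come from the policy
val redClusterAllowedFrom = managedIndexConfig.jobEnabledTime.plusMillis(allowRedClusterAfter.millis) // assumed: job creation time
if (clusterIsRed() && Instant.now().isBefore(redClusterAllowedFrom)) {
    logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
    return
}

Note that the window is anchored to job creation rather than to when the cluster turned red; the follow-up below raises exactly that point.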

skumarp7 (Author) commented:

Hi @bowenlan-amzn,

I'm guessing the idea being suggested here is that we give the user a time window, such that if the cluster remains in a red state for that long, ISM is then allowed to run.

A few follow-up questions:

  • Let's say the policy triggered the job on day 1 and the cluster goes red after 10 days. The job would immediately run on the red cluster, because the difference between jobEnabledTime and the current time would already exceed 3 days. In this case the red cluster is bypassed regardless of the 3d duration set in the parameter, and the job would not wait 3 days in the red state. Is this understanding correct?

  • Is there any way to retrieve how long the cluster has been red? Just thinking out loud about whether we could instead check the duration for which the cluster state has been red and compare that with allow_red_cluster_after: 3d.

        if (clusterIsRed()) {
            logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
            return
        }

        val (managedIndexMetaData, getMetadataSuccess) = client.getManagedIndexMetadata(managedIndexConfig.indexUuid)
        if (!getMetadataSuccess) {

@@ -314,6 +309,15 @@ object ManagedIndexRunner :

        val state = policy.getStateToExecute(managedIndexMetaData)
        val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
        val allowRedCluster = state?.allowRedCluster
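        // allowRedCluster is null when there is no state to execute; only an explicit false blocks the run below.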
        // doing a check of local cluster health as we do not want to overload cluster manager node with potentially a lot of calls
        if (clusterIsRed()) {
            if (allowRedCluster == false) {
                logger.debug("Skipping current execution of ${managedIndexConfig.index} because of red cluster health")
                return
            }
            logger.warn("Warning: ISM is running on a red cluster")
        }
        val stepContext =
            StepContext(
                managedIndexMetaData, clusterService, client, threadPool.threadContext, policy.user, scriptService, settings, jobContext.lockService,

@@ -25,9 +25,17 @@ import java.io.IOException

data class State(
    val name: String,
    val allowRedCluster: Boolean,
    val actions: List<Action>,
    val transitions: List<Transition>,
) : ToXContentObject, Writeable {

    constructor(
        name: String,
        actions: List<Action>,
        transitions: List<Transition>,
    ) : this(name, false, actions, transitions)

    init {
        require(name.isNotBlank()) { "State must contain a valid name" }
        var hasDelete = false
@@ -45,6 +53,7 @@ data class State(
        builder
            .startObject()
            .field(NAME_FIELD, name)
            .field(ALLOW_RED_CLUSTER, allowRedCluster)
            .startArray(ACTIONS_FIELD)
            .also { actions.forEach { action -> action.toXContent(it, params) } }
            .endArray()
@@ -56,13 +65,15 @@ data class State(
    @Throws(IOException::class)
    constructor(sin: StreamInput) : this(
        sin.readString(),
        sin.readBoolean(),
        sin.readList { ISMActionsParser.instance.fromStreamInput(it) },
        sin.readList(::Transition),
    )

    @Throws(IOException::class)
    override fun writeTo(out: StreamOutput) {
        out.writeString(name)
        out.writeBoolean(allowRedCluster)
        out.writeList(actions)
        out.writeList(transitions)
    }
@@ -104,13 +115,15 @@ data class State(

    companion object {
        const val NAME_FIELD = "name"
        const val ALLOW_RED_CLUSTER = "allow_red_cluster"
        const val ACTIONS_FIELD = "actions"
        const val TRANSITIONS_FIELD = "transitions"

        @JvmStatic
        @Throws(IOException::class)
        fun parse(xcp: XContentParser): State {
            var name: String? = null
            var allowRedCluster: Boolean = false
            val actions: MutableList<Action> = mutableListOf()
            val transitions: MutableList<Transition> = mutableListOf()
@@ -121,6 +134,7 @@ data class State(

            when (fieldName) {
                NAME_FIELD -> name = xcp.text()
                ALLOW_RED_CLUSTER -> allowRedCluster = xcp.booleanValue()
                ACTIONS_FIELD -> {
                    ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
                    while (xcp.nextToken() != Token.END_ARRAY) {
@@ -139,6 +153,7 @@ data class State(

            return State(
                name = requireNotNull(name) { "State name is null" },
                allowRedCluster = allowRedCluster,
                actions = actions.toList(),
                transitions = transitions.toList(),
            )
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
@@ -140,6 +140,9 @@
"name": {
"type": "keyword"
},
"allow_red_cluster": {
"type": "boolean"
},
"actions": {
"type": "nested",
"properties": {
@@ -941,6 +941,12 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
assertEquals("Unable to delete snapshot", RestStatus.OK, response.restStatus())
}

protected fun isClusterGreen(timeout: String) {
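        // wait_for_status=yellow returns once cluster health is yellow or better, i.e. once the simulated red state has cleared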
val endpoint = "_cluster/health?wait_for_status=yellow&timeout=$timeout"
val response = client().makeRequest("GET", endpoint)
assertEquals("Cluster status check timed out", RestStatus.OK, response.restStatus())
}

@Suppress("UNCHECKED_CAST")
protected fun assertSnapshotExists(
repository: String,
@@ -5,6 +5,11 @@

package org.opensearch.indexmanagement.indexstatemanagement.runner

import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.core.rest.RestStatus
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction
@@ -224,4 +229,101 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() {
assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter)
}
}

    fun `test runner on a red cluster with allow_red_cluster as false`() {
        val indexName = "test-index-1"
        val policyID = "test_red_cluster_policy"
        val policy =
            """
            {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"false",
            "actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"false"
            ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}}
            """.trimIndent()
        createPolicyJson(policy, policyID)
        createIndex(indexName, policyID)
        waitFor { assertIndexExists(indexName) }

        val managedIndexConfig = getExistingManagedIndexConfig(indexName)

        val endpoint = "sim-red"
        val jsonEntity = """{"settings":{"index.routing.allocation.require.size": "test"}}"""
        val request = Request("PUT", endpoint)
        request.entity = StringEntity(jsonEntity, ContentType.APPLICATION_JSON)
        val response: Response = client().performRequest(request)
        assertEquals("Failed to simulate red cluster", RestStatus.OK, response.restStatus())

        // Change the start time so the job will trigger in 2 seconds.
        // After the job runs, the index will be in the "active" state.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

        // Change the start time so the job will trigger in 2 seconds.
        // The index transitions to the "inactivestate" state.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        // Wait for the index to settle in "inactivestate".
        Thread.sleep(8000L)

        // Change the start time so the job will trigger in 2 seconds.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        Thread.sleep(5000L)

        // With allow_red_cluster false, the delete action must not run: the index still exists.
        waitFor { assertIndexExists(indexName) }

        val deleteReq = Request("DELETE", endpoint)
        val deleteRes: Response = client().performRequest(deleteReq)
        assertEquals("Failed to delete index $endpoint", RestStatus.OK, deleteRes.restStatus())
        isClusterGreen("30s")
    }

    fun `test runner on a red cluster with allow_red_cluster as true`() {
        val indexName = "test-index-2"
        val policyID = "test_red_cluster_policy"
        val policy =
            """
            {"policy":{"description":"Close indices older than 5m","default_state":"active","states":[{"name":"active","allow_red_cluster":"true",
            "actions":[],"transitions":[{"state_name":"inactivestate","conditions":{"min_index_age":"5s"}}]},{"name":"inactivestate","allow_red_cluster":"true"
            ,"actions":[{"delete":{}}],"transitions":[]}],"ism_template":{"index_patterns":["test-index"]}}}
            """.trimIndent()
        createPolicyJson(policy, policyID)
        createIndex(indexName, policyID)
        waitFor { assertIndexExists(indexName) }

        val managedIndexConfig = getExistingManagedIndexConfig(indexName)

        val endpoint = "sim-red"
bowenlan-amzn (Member) commented:

Would it make sense to not use a different index, but just use val indexName = "test-index-2" to simulate the red cluster situation?

        val jsonEntity = """{"settings":{"index.routing.allocation.require.size": "test"}}"""
        val request = Request("PUT", endpoint)
        request.entity = StringEntity(jsonEntity, ContentType.APPLICATION_JSON)
        val response: Response = client().performRequest(request)
        assertEquals("Failed to simulate red cluster", RestStatus.OK, response.restStatus())

        // Change the start time so the job will trigger in 2 seconds.
        // After the job runs, the index will be in the "active" state.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

        // Change the start time so the job will trigger in 2 seconds.
        // The index transitions to the "inactivestate" state.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        // Wait for the index to settle in "inactivestate".
        Thread.sleep(8000L)

        // Change the start time so the job will trigger in 2 seconds.
        updateManagedIndexConfigStartTime(managedIndexConfig)

        // Wait for the index deletion by the ISM job.
        Thread.sleep(5000L)

        // With allow_red_cluster true, the delete action runs even on a red cluster.
        waitFor { assertIndexDoesNotExist(indexName) }

        val deleteReq = Request("DELETE", endpoint)
        val deleteRes: Response = client().performRequest(deleteReq)
        assertEquals("Failed to delete index $endpoint", RestStatus.OK, deleteRes.restStatus())
        isClusterGreen("30s")
    }
}