Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding provision to invoke stop replication from other plugins #1391

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.replication

import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.changes.TransportGetChangesAction
import org.opensearch.replication.action.index.ReplicateIndexAction
Expand Down Expand Up @@ -41,8 +42,8 @@ import org.opensearch.replication.action.status.ReplicationStatusAction
import org.opensearch.replication.action.status.ShardsInfoAction
import org.opensearch.replication.action.status.TranportShardsInfoAction
import org.opensearch.replication.action.status.TransportReplicationStatusAction
import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.stop.TransportStopIndexReplicationAction
import org.opensearch.replication.action.stop.TransportUnfollowIndexReplicationAction
import org.opensearch.replication.action.update.TransportUpdateIndexReplicationAction
import org.opensearch.replication.action.update.UpdateIndexReplicationAction
import org.opensearch.replication.metadata.ReplicationMetadataManager
Expand Down Expand Up @@ -96,6 +97,7 @@ import org.opensearch.core.common.unit.ByteSizeUnit
import org.opensearch.core.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.util.concurrent.OpenSearchExecutors
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_TYPE
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.commons.utils.OpenForTesting
Expand Down Expand Up @@ -233,7 +235,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java),
ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java),
ActionHandler(AutoFollowClusterManagerNodeAction.INSTANCE, TransportAutoFollowClusterManagerNodeAction::class.java),
ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java),
ActionHandler(STOP_REPLICATION_ACTION_TYPE, TransportStopIndexReplicationAction::class.java),
ActionHandler(UNFOLLOW_REPLICATION_ACTION_TYPE, TransportUnfollowIndexReplicationAction::class.java),
ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java),
ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java),
ActionHandler(UpdateIndexReplicationAction.INSTANCE, TransportUpdateIndexReplicationAction::class.java),
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.opensearch.replication.action.stop

import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING
import org.opensearch.replication.action.index.block.IndexBlockUpdateType
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
Expand Down Expand Up @@ -68,7 +70,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
IndexNameExpressionResolver,
val client: Client,
val replicationMetadataManager: ReplicationMetadataManager) :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here explaining that the stop replication action class in moved to common utils and link this PR here?

TransportMasterNodeAction<StopIndexReplicationRequest, AcknowledgedResponse> (StopIndexReplicationAction.NAME,
TransportMasterNodeAction<StopIndexReplicationRequest, AcknowledgedResponse> (STOP_REPLICATION_ACTION_NAME,
transportService, clusterService, threadPool, actionFilters, ::StopIndexReplicationRequest,
indexNameExpressionResolver), CoroutineScope by GlobalScope {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.opensearch.replication.action.stop

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.ActionRequest
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.common.inject.Inject
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_NAME
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.replication.util.suspendExecute
import org.opensearch.tasks.Task
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService


/**
* This action transforms the request from ActionRequest type to StopIndexReplicationRequest
* and performs the TransportStopIndexReplicationAction on it.
* While TransportStopIndexReplicationAction is used directly by the _stop replication REST API,
* this action is used for inter-plugin communication by ism plugin to unfollow i.e. stop replication.
*/
class TransportUnfollowIndexReplicationAction @Inject constructor (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename is to something like TransportIsmStopIndexReplicationAction. Basically remove unfollow and add ISM in action name to distinct it from other supported actions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thanks. Suggestions were given in the ISM PR to name this class as InternalTransportStopIndexReplicationAction and the action name as "indices:internal/plugins/replication/index/stop", so the name is differentiated to be something internally invoked from plugins. We hadn't added the name ism, so it remains generic to be used by something else too in future.
I hope these names shall be okay?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TransportInternalStopIndexReplicationAction should be fine

val name: String,
val transportService: TransportService,
val clusterService: ClusterService,
val threadPool: ThreadPool,
val client: Client,
val actionFilters: ActionFilters,
val indexNameExpressionResolver: IndexNameExpressionResolver,
val replicationMetadataManager: ReplicationMetadataManager,
): HandledTransportAction<ActionRequest, AcknowledgedResponse> (UNFOLLOW_REPLICATION_ACTION_NAME, transportService, actionFilters, ::StopIndexReplicationRequest),
CoroutineScope by GlobalScope {
companion object {
private val log = LogManager.getLogger(TransportUnfollowIndexReplicationAction::class.java)
}

@Throws(Exception::class)
override fun doExecute(task: Task?, request: ActionRequest, listener: ActionListener<AcknowledgedResponse>?) {
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
val transformedRequest = request as? StopIndexReplicationRequest
?: request.let { recreateObject(it) { StopIndexReplicationRequest(it) } }
try {

var response = client.suspendExecute(STOP_REPLICATION_ACTION_TYPE, transformedRequest, true)
log.info("Stop replication successful for index[${transformedRequest.indexName}] with response: " + response.isAcknowledged)
listener?.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Stop replication failed for index[${transformedRequest.indexName}] with error ${e.stackTraceToString()}")
listener?.onFailure(e)
throw e
Copy link
Member

@bowenlan-amzn bowenlan-amzn Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably no need to rethrow the exception here, as the exception is sent back to the caller to handle?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. I had corrected that change in my local, but missed pushing it. My bad!

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.replication.rest

import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.stop.StopIndexReplicationRequest
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestChannel
Expand All @@ -38,7 +38,7 @@ class StopIndexReplicationHandler : BaseRestHandler() {
val stopReplicationRequest = StopIndexReplicationRequest.fromXContent(parser, followIndex)
return RestChannelConsumer { channel: RestChannel? ->
client.admin().cluster()
.execute(StopIndexReplicationAction.INSTANCE, stopReplicationRequest, RestToXContentListener(channel))
.execute(STOP_REPLICATION_ACTION_TYPE, stopReplicationRequest, RestToXContentListener(channel))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.opensearch.replication.util

import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.commons.replication.action.ReplicationActions.UNFOLLOW_REPLICATION_ACTION_NAME
import org.opensearch.replication.action.autofollow.UpdateAutoFollowPatternAction
import org.opensearch.replication.action.changes.GetChangesAction
import org.opensearch.replication.action.index.ReplicateIndexAction
Expand All @@ -20,7 +22,6 @@ import org.opensearch.replication.action.repository.GetFileChunkAction
import org.opensearch.replication.action.repository.GetStoreMetadataAction
import org.opensearch.replication.action.resume.ResumeIndexReplicationAction
import org.opensearch.replication.action.status.ReplicationStatusAction
import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.update.UpdateIndexReplicationAction
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.store.ReplicationMetadata
Expand Down Expand Up @@ -49,7 +50,7 @@ class SecurityContext {
val LEADER_USER_ACTIONS = listOf(GetChangesAction.NAME, GetFileChunkAction.NAME)
val FOLLOWER_USER_ACTIONS = listOf(ReplayChangesAction.NAME,
ReplicateIndexAction.NAME, PauseIndexReplicationAction.NAME,
ResumeIndexReplicationAction.NAME, StopIndexReplicationAction.NAME,
ResumeIndexReplicationAction.NAME, STOP_REPLICATION_ACTION_NAME, UNFOLLOW_REPLICATION_ACTION_NAME,
UpdateIndexReplicationAction.NAME, ReplicationStatusAction.NAME,
UpdateAutoFollowPatternAction.NAME)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.replication

import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration
import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations
import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass
Expand Down Expand Up @@ -329,7 +330,7 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.client.Request
import org.junit.BeforeClass
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME

const val INTEG_TEST_PASSWORD = "ccr-integ-test@123"

Expand Down Expand Up @@ -74,7 +75,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down Expand Up @@ -106,7 +107,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand All @@ -119,7 +120,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down Expand Up @@ -151,7 +152,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down Expand Up @@ -183,7 +184,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down Expand Up @@ -257,7 +258,7 @@ abstract class SecurityBase : MultiClusterRestTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit
import org.opensearch.replication.task.autofollow.AutoFollowExecutor
import org.opensearch.tasks.TaskInfo
import org.junit.Before
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME

@MultiClusterAnnotations.ClusterConfigurations(
MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
Expand Down Expand Up @@ -413,7 +414,7 @@ class SecurityCustomRolesIT: SecurityBase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
Expand Down
Loading