-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML] Change Datafeed actions to read config from the config index #33273
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,42 +11,48 @@ | |
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; | ||
import org.elasticsearch.persistent.PersistentTasksService; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.XPackPlugin; | ||
import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
import org.elasticsearch.xpack.core.ml.MlTasks; | ||
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; | ||
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; | ||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; | ||
import org.elasticsearch.xpack.core.ml.job.messages.Messages; | ||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; | ||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; | ||
|
||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; | ||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; | ||
|
||
public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<DeleteDatafeedAction.Request, AcknowledgedResponse> { | ||
|
||
private Client client; | ||
private PersistentTasksService persistentTasksService; | ||
private final Client client; | ||
private final DatafeedConfigProvider datafeedConfigProvider; | ||
private final ClusterService clusterService; | ||
private final PersistentTasksService persistentTasksService; | ||
|
||
@Inject | ||
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService, | ||
ThreadPool threadPool, ActionFilters actionFilters, | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
Client client, PersistentTasksService persistentTasksService) { | ||
Client client, PersistentTasksService persistentTasksService, | ||
NamedXContentRegistry xContentRegistry) { | ||
super(settings, DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, | ||
indexNameExpressionResolver, DeleteDatafeedAction.Request::new); | ||
this.client = client; | ||
this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); | ||
this.persistentTasksService = persistentTasksService; | ||
this.clusterService = clusterService; | ||
} | ||
|
||
@Override | ||
|
@@ -65,14 +71,14 @@ protected void masterOperation(DeleteDatafeedAction.Request request, ClusterStat | |
if (request.isForce()) { | ||
forceDeleteDatafeed(request, state, listener); | ||
} else { | ||
deleteDatafeedFromMetadata(request, listener); | ||
deleteDatafeedConfig(request, listener); | ||
} | ||
} | ||
|
||
private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state, | ||
ActionListener<AcknowledgedResponse> listener) { | ||
ActionListener<Boolean> finalListener = ActionListener.wrap( | ||
response -> deleteDatafeedFromMetadata(request, listener), | ||
response -> deleteDatafeedConfig(request, listener), | ||
listener::onFailure | ||
); | ||
|
||
|
@@ -111,28 +117,19 @@ public void onFailure(Exception e) { | |
} | ||
} | ||
|
||
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) { | ||
clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(), | ||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) { | ||
|
||
@Override | ||
protected AcknowledgedResponse newResponse(boolean acknowledged) { | ||
return new AcknowledgedResponse(acknowledged); | ||
} | ||
|
||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState); | ||
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); | ||
PersistentTasksCustomMetaData persistentTasks = | ||
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) | ||
.removeDatafeed(request.getDatafeedId(), persistentTasks).build(); | ||
return ClusterState.builder(currentState).metaData( | ||
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()) | ||
.build(); | ||
} | ||
}); | ||
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) { | ||
// Check datafeed is stopped | ||
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) { | ||
listener.onFailure(ExceptionsHelper.conflictStatusException( | ||
Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing return statement. |
||
return; | ||
} | ||
|
||
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( | ||
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), | ||
listener::onFailure | ||
)); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,34 +8,46 @@ | |
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; | ||
import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage; | ||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; | ||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<GetDatafeedsAction.Request, | ||
GetDatafeedsAction.Response> { | ||
|
||
private final DatafeedConfigProvider datafeedConfigProvider; | ||
|
||
@Inject | ||
public TransportGetDatafeedsAction(Settings settings, TransportService transportService, | ||
ClusterService clusterService, ThreadPool threadPool, | ||
ActionFilters actionFilters, | ||
IndexNameExpressionResolver indexNameExpressionResolver) { | ||
IndexNameExpressionResolver indexNameExpressionResolver, | ||
Client client, NamedXContentRegistry xContentRegistry) { | ||
super(settings, GetDatafeedsAction.NAME, transportService, clusterService, threadPool, actionFilters, | ||
indexNameExpressionResolver, GetDatafeedsAction.Request::new); | ||
|
||
datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry); | ||
} | ||
|
||
@Override | ||
|
@@ -50,18 +62,51 @@ protected GetDatafeedsAction.Response newResponse() { | |
|
||
@Override | ||
protected void masterOperation(GetDatafeedsAction.Request request, ClusterState state, | ||
ActionListener<GetDatafeedsAction.Response> listener) throws Exception { | ||
ActionListener<GetDatafeedsAction.Response> listener) { | ||
logger.debug("Get datafeed '{}'", request.getDatafeedId()); | ||
|
||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); | ||
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); | ||
List<DatafeedConfig> datafeedConfigs = new ArrayList<>(); | ||
Map<String, DatafeedConfig> clusterStateConfigs = | ||
expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state); | ||
|
||
datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( | ||
datafeedBuilders -> { | ||
// Check for duplicate datafeeds | ||
for (DatafeedConfig.Builder datafeed : datafeedBuilders) { | ||
if (clusterStateConfigs.containsKey(datafeed.getId())) { | ||
listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " + | ||
"exists in both clusterstate and index")); | ||
return; | ||
} | ||
} | ||
|
||
// Merge cluster state and index configs | ||
List<DatafeedConfig> datafeeds = new ArrayList<>(datafeedBuilders.size() + clusterStateConfigs.values().size()); | ||
for (DatafeedConfig.Builder builder: datafeedBuilders) { | ||
datafeeds.add(builder.build()); | ||
} | ||
|
||
datafeeds.addAll(clusterStateConfigs.values()); | ||
Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId)); | ||
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(), | ||
DatafeedConfig.RESULTS_FIELD))); | ||
}, | ||
listener::onFailure | ||
)); | ||
} | ||
|
||
Map<String, DatafeedConfig> expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no additional value for it to be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to check whether there are 2 datafeeds with the same ID I can't rely on object equality in that case as the datafeeds may be different but use the same ID. |
||
ClusterState clusterState) { | ||
|
||
Map<String, DatafeedConfig> configById = new HashMap<>(); | ||
|
||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); | ||
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds); | ||
|
||
for (String expandedDatafeedId : expandedDatafeedIds) { | ||
datafeedConfigs.add(mlMetadata.getDatafeed(expandedDatafeedId)); | ||
configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); | ||
} | ||
|
||
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), | ||
DatafeedConfig.RESULTS_FIELD))); | ||
return configById; | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to bring this back later for
6.x
, right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. The equivalent method is missing for Jobs also I thought I would build this on the migrate code