-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML] Change Datafeed actions to read config from the config index #33273
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,42 +11,48 @@ | |
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; | ||
import org.elasticsearch.persistent.PersistentTasksService; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.XPackPlugin; | ||
import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
import org.elasticsearch.xpack.core.ml.MlTasks; | ||
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; | ||
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; | ||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; | ||
import org.elasticsearch.xpack.core.ml.job.messages.Messages; | ||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; | ||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; | ||
|
||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; | ||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; | ||
|
||
public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<DeleteDatafeedAction.Request, AcknowledgedResponse> { | ||
|
||
private Client client; | ||
private PersistentTasksService persistentTasksService; | ||
private final Client client; | ||
private final DatafeedConfigProvider datafeedConfigProvider; | ||
private final ClusterService clusterService; | ||
private final PersistentTasksService persistentTasksService; | ||
|
||
/**
 * Transport-layer constructor. Wires in the clients and services this action
 * needs and builds a {@link DatafeedConfigProvider} so the datafeed
 * configuration can be read from (and deleted in) the config index.
 */
@Inject
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                     ThreadPool threadPool, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver,
                                     Client client, PersistentTasksService persistentTasksService,
                                     NamedXContentRegistry xContentRegistry) {
    super(settings, DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
            indexNameExpressionResolver, DeleteDatafeedAction.Request::new);
    this.clusterService = clusterService;
    this.client = client;
    this.persistentTasksService = persistentTasksService;
    this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
}
|
||
@Override | ||
|
@@ -65,14 +71,14 @@ protected void masterOperation(DeleteDatafeedAction.Request request, ClusterStat | |
if (request.isForce()) { | ||
forceDeleteDatafeed(request, state, listener); | ||
} else { | ||
deleteDatafeedFromMetadata(request, listener); | ||
deleteDatafeedConfig(request, listener); | ||
} | ||
} | ||
|
||
private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state, | ||
ActionListener<AcknowledgedResponse> listener) { | ||
ActionListener<Boolean> finalListener = ActionListener.wrap( | ||
response -> deleteDatafeedFromMetadata(request, listener), | ||
response -> deleteDatafeedConfig(request, listener), | ||
listener::onFailure | ||
); | ||
|
||
|
@@ -111,28 +117,18 @@ public void onFailure(Exception e) { | |
} | ||
} | ||
|
||
private void deleteDatafeedFromMetadata(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) { | ||
Reviewer comment: We'll need to bring this back later for the migration. — Author reply: Yes. The equivalent method is also missing for Jobs; I thought I would build this on the migrate code.
||
clusterService.submitStateUpdateTask("delete-datafeed-" + request.getDatafeedId(), | ||
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) { | ||
|
||
@Override | ||
protected AcknowledgedResponse newResponse(boolean acknowledged) { | ||
return new AcknowledgedResponse(acknowledged); | ||
} | ||
|
||
@Override | ||
public ClusterState execute(ClusterState currentState) { | ||
XPackPlugin.checkReadyForXPackCustomMetadata(currentState); | ||
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState); | ||
PersistentTasksCustomMetaData persistentTasks = | ||
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata) | ||
.removeDatafeed(request.getDatafeedId(), persistentTasks).build(); | ||
return ClusterState.builder(currentState).metaData( | ||
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build()) | ||
.build(); | ||
} | ||
}); | ||
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) { | ||
// Check datafeed is stopped | ||
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) { | ||
listener.onFailure(ExceptionsHelper.conflictStatusException( | ||
Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing return statement. |
||
} | ||
|
||
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is where it gets a bit tricky. We've checked the datafeed is stopped. However, we could receive a call to start the datafeed before the deletion is complete. Then, the check we've done here for deleting will no longer be valid. We need to think how to prevent this. Perhaps, we need to keep a map with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We discussed this offline and there is a general case for consistent checks for all updates including jobs. That is beyond the scope of this PR though |
||
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)), | ||
listener::onFailure | ||
)); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,34 +8,46 @@ | |
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.block.ClusterBlockException; | ||
import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction; | ||
import org.elasticsearch.xpack.core.ml.MlMetadata; | ||
import org.elasticsearch.xpack.core.ml.action.util.QueryPage; | ||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; | ||
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
public class TransportGetDatafeedsAction extends TransportMasterNodeReadAction<GetDatafeedsAction.Request, | ||
GetDatafeedsAction.Response> { | ||
|
||
private final DatafeedConfigProvider datafeedConfigProvider; | ||
|
||
/**
 * Transport-layer constructor. Builds a {@link DatafeedConfigProvider} so
 * datafeed configurations can be read from the config index in addition to
 * cluster state.
 */
@Inject
public TransportGetDatafeedsAction(Settings settings, TransportService transportService,
                                   ClusterService clusterService, ThreadPool threadPool,
                                   ActionFilters actionFilters,
                                   IndexNameExpressionResolver indexNameExpressionResolver,
                                   Client client, NamedXContentRegistry xContentRegistry) {
    super(settings, GetDatafeedsAction.NAME, transportService, clusterService, threadPool, actionFilters,
            indexNameExpressionResolver, GetDatafeedsAction.Request::new);
    this.datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
}
|
||
@Override | ||
|
@@ -50,18 +62,51 @@ protected GetDatafeedsAction.Response newResponse() { | |
|
||
@Override | ||
protected void masterOperation(GetDatafeedsAction.Request request, ClusterState state, | ||
ActionListener<GetDatafeedsAction.Response> listener) throws Exception { | ||
ActionListener<GetDatafeedsAction.Response> listener) { | ||
logger.debug("Get datafeed '{}'", request.getDatafeedId()); | ||
|
||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); | ||
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); | ||
List<DatafeedConfig> datafeedConfigs = new ArrayList<>(); | ||
Map<String, DatafeedConfig> clusterStateConfigs = | ||
expandClusterStateDatafeeds(request.getDatafeedId(), request.allowNoDatafeeds(), state); | ||
|
||
datafeedConfigProvider.expandDatafeedConfigs(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( | ||
datafeedBuilders -> { | ||
// Check for duplicate datafeeds | ||
for (DatafeedConfig.Builder datafeed : datafeedBuilders) { | ||
if (clusterStateConfigs.containsKey(datafeed.getId())) { | ||
listener.onFailure(new IllegalStateException("Datafeed [" + datafeed.getId() + "] configuration " + | ||
"exists in both clusterstate and index")); | ||
return; | ||
} | ||
} | ||
|
||
// Merge cluster state and index configs | ||
List<DatafeedConfig> datafeeds = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #preOptimization, but you should probably initialize the array size to be the size of |
||
for (DatafeedConfig.Builder builder: datafeedBuilders) { | ||
datafeeds.add(builder.build()); | ||
} | ||
|
||
datafeeds.addAll(clusterStateConfigs.values()); | ||
Collections.sort(datafeeds, Comparator.comparing(DatafeedConfig::getId)); | ||
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeeds, datafeeds.size(), | ||
DatafeedConfig.RESULTS_FIELD))); | ||
}, | ||
listener::onFailure | ||
)); | ||
} | ||
|
||
Map<String, DatafeedConfig> expandClusterStateDatafeeds(String datafeedExpression, boolean allowNoDatafeeds, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no additional value for it to be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to check whether there are 2 datafeeds with the same ID I can't rely on object equality in that case as the datafeeds may be different but use the same ID. |
||
ClusterState clusterState) { | ||
|
||
Map<String, DatafeedConfig> configById = new HashMap<>(); | ||
|
||
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); | ||
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(datafeedExpression, allowNoDatafeeds); | ||
|
||
for (String expandedDatafeedId : expandedDatafeedIds) { | ||
datafeedConfigs.add(mlMetadata.getDatafeed(expandedDatafeedId)); | ||
configById.put(expandedDatafeedId, mlMetadata.getDatafeed(expandedDatafeedId)); | ||
} | ||
|
||
listener.onResponse(new GetDatafeedsAction.Response(new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), | ||
DatafeedConfig.RESULTS_FIELD))); | ||
return configById; | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We mostly use `id` (all lower case). There are a few `Id`s around, but we should probably favour `id`.