Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Transforms] allow executor to call start on started task #46347

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ private void startTask(DataFrameTransformTask buildTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
buildTask.initializeIndexer(indexerBuilder);
// DataFrameTransformTask#start will fail if the task state is FAILED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
// Will continue to attempt to start the indexer, even if the state is STARTED
buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
}

private void setNumFailureRetries(int numFailureRetries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ public void getCheckpointingInfo(DataFrameTransformsCheckpointService transforms
));
}

/**
* Start the background indexer and set the task's state to started
* @param startingCheckpoint Set the current checkpoint to this value. If null the
* current checkpoint is not set
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
// `failOnConflict` is usually true; the exception is the initial start that is issued when the task is assigned to the node
synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
listener.onFailure(new ElasticsearchStatusException(
Expand All @@ -249,7 +244,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
return;
}
// If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again, unless `failOnConflict` is false.
if (taskState.get() == DataFrameTransformTaskState.STARTED) {
if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) {
listener.onFailure(new ElasticsearchStatusException(
"Cannot start transform [{}] as it is already STARTED.",
RestStatus.CONFLICT,
Expand All @@ -260,7 +255,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
final IndexerState newState = getIndexer().start();
if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
transform.getId(), newState));
transform.getId(), newState));
return;
}
stateReason.set(null);
Expand Down Expand Up @@ -298,10 +293,20 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis
logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
getIndexer().stop();
listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
+ transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
}
));
}
/**
* Start the background indexer and set the task's state to started.
* <p>
* Convenience overload that delegates to the four-argument {@code start} with
* {@code failOnConflict} set to {@code true}.
* @param startingCheckpoint Set the current checkpoint to this value. If null the
* current checkpoint is not set
* @param force Whether to force start a failed task or not
* @param listener Started listener
*/
public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
// Always fail on conflict here: with `true`, a task that is already STARTED is reported to the listener as a CONFLICT.
start(startingCheckpoint, force, true, listener);
}

public synchronized void stop(boolean force) {
logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -52,7 +51,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/46341")
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {

private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
Expand Down