Skip to content

Commit

Permalink
Core: Require all actions have a Task (#31627)
Browse files Browse the repository at this point in the history
The TaskManager and TaskAwareRequest could return null when registering
a task according to their javadocs, but no implementations ever actually
did that. This commit removes that wording from the javadocs and ensures
null is no longer allowed.
  • Loading branch information
rjernst authored Jun 28, 2018
1 parent 0522c66 commit f924835
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,19 @@ public final Task execute(Request request, ActionListener<Response> listener) {
* this method.
*/
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ default void setParentTask(String parentTaskNode, long parentTaskId) {

/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task
* manager.
*/
default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new Task(id, type, action, getDescription(), parentTaskId, headers);
Expand Down
7 changes: 2 additions & 5 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -91,8 +92,6 @@ public void setTaskResultsService(TaskResultsService taskResultsService) {

/**
* Registers a task without parent task
* <p>
* Returns the task manager tracked task or null if the task doesn't support the task manager
*/
public Task register(String type, String action, TaskAwareRequest request) {
Map<String, String> headers = new HashMap<>();
Expand All @@ -110,9 +109,7 @@ public Task register(String type, String action, TaskAwareRequest request) {
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
if (task == null) {
return null;
}
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ public Request newRequest(StreamInput in) throws IOException {

public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel, null);
} else {
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {

public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
private boolean enableTaskManager;

public NodeRequest() {
super();
Expand All @@ -88,82 +87,63 @@ public NodeRequest() {
public NodeRequest(NodesRequest request, String nodeId) {
super(nodeId);
requestName = request.requestName;
enableTaskManager = request.enableTaskManager;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}

@Override
public String getDescription() {
return "CancellableNodeRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodeRequest[" + requestName + "]";
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}

public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
private boolean enableTaskManager;

NodesRequest() {
super();
}

public NodesRequest(String requestName, String... nodesIds) {
this(requestName, true, nodesIds);
}

public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
this.enableTaskManager = enableTaskManager;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}

@Override
public String getDescription() {
return "CancellableNodesRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodesRequest[" + requestName + "]";
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}

Expand Down Expand Up @@ -400,7 +380,7 @@ public void onFailure(Exception e) {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
}

// Make sure that the main task on coordinating node is the task that was returned to us by execute()
Expand Down Expand Up @@ -455,27 +435,6 @@ public void testFindChildTasks() throws Exception {
assertEquals(0, responses.failureCount());
}

public void testTaskManagementOptOut() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
// Starting actions that disable task manager
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));

TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];

// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*");
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size());

// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}

public void testTasksDescriptions() throws Exception {
long minimalStartTime = System.currentTimeMillis();
setupTestNodes(Settings.EMPTY);
Expand All @@ -502,7 +461,7 @@ public void testTasksDescriptions() throws Exception {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
assertThat(entry.getValue().get(0).getStartTime(), greaterThanOrEqualTo(minimalStartTime));
assertThat(entry.getValue().get(0).getRunningTimeNanos(), greaterThanOrEqualTo(minimalDurationNanos));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ public MockTaskManager(Settings settings, ThreadPool threadPool, Set<String> tas
@Override
public Task register(String type, String action, TaskAwareRequest request) {
Task task = super.register(type, action, request);
if (task != null) {
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
}
return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.monitoring.action;

import java.util.concurrent.ExecutorService;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilter;
Expand All @@ -28,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -51,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.elasticsearch.Version.CURRENT;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void setUpMocks() {
filters = mock(ActionFilters.class);

when(transportService.getTaskManager()).thenReturn(taskManager);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(mock(Task.class));
when(filters.filters()).thenReturn(new ActionFilter[0]);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor);

Expand Down

0 comments on commit f924835

Please sign in to comment.