Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix EsAbortPolicy to conform to API #29075

Merged
merged 7 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -330,7 +331,8 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
assertThat(client.scrollsCleared, contains(scrollId));

// When the task is rejected we don't increment the throttled timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,8 +827,7 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.indices.IndexTemplateMissingException::new, 57, UNKNOWN_VERSION_ADDED),
SEND_REQUEST_TRANSPORT_EXCEPTION(org.elasticsearch.transport.SendRequestTransportException.class,
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
// 59 used to be EsRejectedExecutionException
// 60 used to be for EarlyTerminationException
// 61 used to be for RoutingValidationException
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -67,6 +68,8 @@ public static RestStatus status(Throwable t) {
return ((ElasticsearchException) t).status();
} else if (t instanceof IllegalArgumentException) {
return RestStatus.BAD_REQUEST;
} else if (t instanceof EsRejectedExecutionException) {
return RestStatus.TOO_MANY_REQUESTS;
}
}
return RestStatus.INTERNAL_SERVER_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

Expand Down Expand Up @@ -747,6 +748,13 @@ public <T extends Exception> T readException() throws IOException {
switch (key) {
case 0:
final int ord = readVInt();
// TODO: remove the if branch when master is bumped to 8.0.0
assert Version.CURRENT.major < 8;
if (ord == 59) {
final ElasticsearchException ex = new ElasticsearchException(this);
final boolean isExecutorShutdown = readBoolean();
return (T) new EsRejectedExecutionException(ex.getMessage(), isExecutorShutdown);
}
return (T) ElasticsearchException.readException(this, ord);
case 1:
String msg1 = readOptionalString();
Expand Down Expand Up @@ -831,6 +839,9 @@ public <T extends Exception> T readException() throws IOException {
return (T) readStackTrace(new InterruptedException(readOptionalString()), this);
case 17:
return (T) readStackTrace(new IOException(readOptionalString(), readException()), this);
case 18:
final boolean isExecutorShutdown = readBoolean();
return (T) readStackTrace(new EsRejectedExecutionException(readOptionalString(), isExecutorShutdown), this);
default:
throw new IOException("no such exception for id: " + key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;

Expand Down Expand Up @@ -852,8 +853,28 @@ public void writeException(Throwable throwable) throws IOException {
writeCause = false;
} else if (throwable instanceof IOException) {
writeVInt(17);
} else if (throwable instanceof EsRejectedExecutionException) {
// TODO: remove the if branch when master is bumped to 8.0.0
assert Version.CURRENT.major < 8;
if (version.before(Version.V_7_0_0_alpha1)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you're not planning to back port this any more. Can you push a comment in the 6.x branch to note that if you change the wire level there, you need to look at 7.x code too? I don't think we will so a comment seems enough (a test is tricky)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning to backport. I do it this way so BWC tests can pass on this branch. Then I backport and change the version. Then I remove this code from master.

A test is very tricky indeed. I thought about a qa test in 6.x only (with a bulk thread pool with queue 0 to force a rejection). I am not sure it’s worth it. I did test it manually like this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that a test is an overkill. At least any test I can think of.

/*
* This is a backwards compatibility layer when speaking to nodes that still treated EsRejectedExceutionException as an
* instance of ElasticsearchException. As such, we serialize this in a way that the receiving node would read this as an
* EsRejectedExecutionException.
*/
final ElasticsearchException ex = new ElasticsearchException(throwable.getMessage());
writeVInt(0);
writeVInt(59);
ex.writeTo(this);
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
return;
} else {
writeVInt(18);
writeBoolean(((EsRejectedExecutionException) throwable).isExecutorShutdown());
writeCause = false;
}
} else {
ElasticsearchException ex;
final ElasticsearchException ex;
if (throwable instanceof ElasticsearchException && ElasticsearchException.isRegistered(throwable.getClass(), version)) {
ex = (ElasticsearchException) throwable;
} else {
Expand All @@ -863,7 +884,6 @@ public void writeException(Throwable throwable) throws IOException {
writeVInt(ElasticsearchException.getId(ex.getClass()));
ex.writeTo(this);
return;

}
if (writeMessage) {
writeOptionalString(throwable.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import java.util.concurrent.RejectedExecutionException;

import java.io.IOException;

public class EsRejectedExecutionException extends ElasticsearchException {
public class EsRejectedExecutionException extends RejectedExecutionException {

private final boolean isExecutorShutdown;

Expand All @@ -43,22 +38,6 @@ public EsRejectedExecutionException() {
this(null, false);
}

@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}

public EsRejectedExecutionException(StreamInput in) throws IOException{
super(in);
isExecutorShutdown = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(isExecutorShutdown);
}

/**
* Checks if the thread pool that rejected the execution was terminated
* shortly after the rejection. Its possible that this returns false and the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -76,6 +77,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.startsWith;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ public void testIds() {
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
ids.put(58, org.elasticsearch.transport.SendRequestTransportException.class);
ids.put(59, org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class);
ids.put(59, null); // weas EsRejectedExecutionException, which is no longer an instance of ElasticsearchException
ids.put(60, null); // EarlyTerminationException was removed in 6.0
ids.put(61, null); // RoutingValidationException was removed in 5.0
ids.put(62, org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch;

import org.apache.commons.codec.DecoderException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;

import java.util.Optional;
Expand Down Expand Up @@ -84,4 +86,9 @@ private void assertError(final Throwable cause, final Error error) {
assertThat(maybeError.get(), equalTo(error));
}

public void testStatus() {
assertThat(ExceptionsHelper.status(new IllegalArgumentException("illegal")), equalTo(RestStatus.BAD_REQUEST));
assertThat(ExceptionsHelper.status(new EsRejectedExecutionException("rejected")), equalTo(RestStatus.TOO_MANY_REQUESTS));
}

}