Skip to content

Commit

Permalink
Preserve response headers when creating an index
Browse files Browse the repository at this point in the history
This commit preserves the response headers when creating an index and updating settings for an
index.

Closes elastic#23947
  • Loading branch information
jaymode committed Apr 6, 2017
1 parent 2620200 commit b59d5d0
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ public void onFailure(Exception e) {
delegate.onFailure(e);
}
}

/**
* Wraps the provided action listener in a {@link ContextPreservingActionListener} that will
* also copy the response headers when the {@link ThreadContext.StoredContext} is closed
*/
public static <R> ContextPreservingActionListener<R> wrapPreservingContext(ActionListener<R> listener, ThreadContext threadContext) {
return new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
Expand Down Expand Up @@ -93,6 +92,7 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID;
Expand Down Expand Up @@ -222,7 +222,9 @@ private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
request.settings(updatedSettingsBuilder.build());

clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request,
wrapPreservingContext(listener, threadPool.getThreadContext())) {

@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
Expand Down Expand Up @@ -476,10 +478,6 @@ public void onFailure(String source, Exception e) {
});
}

private ContextPreservingActionListener<ClusterStateUpdateResponse> wrapPreservingContext(ActionListener<ClusterStateUpdateResponse> listener) {
return new ContextPreservingActionListener<>(threadPool.getThreadContext().newRestorableContext(false), listener);
}

private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws IOException {
List<IndexTemplateMetaData> templateMetadata = new ArrayList<>();
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -56,6 +55,8 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;

/**
* Service responsible for submitting update index settings requests
*/
Expand Down Expand Up @@ -180,7 +181,8 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
final boolean preserveExisting = request.isPreserveExisting();

clusterService.submitStateUpdateTask("update-settings",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request,
wrapPreservingContext(listener, threadPool.getThreadContext())) {

@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
Expand Down Expand Up @@ -284,10 +286,6 @@ public ClusterState execute(ClusterState currentState) {
});
}

private ContextPreservingActionListener<ClusterStateUpdateResponse> wrapPreservingContext(ActionListener<ClusterStateUpdateResponse> listener) {
return new ContextPreservingActionListener<>(threadPool.getThreadContext().newRestorableContext(false), listener);
}

/**
* Updates the cluster block only iff the setting exists in the given settings
*/
Expand All @@ -307,7 +305,8 @@ private static void maybeUpdateClusterBlock(String[] actualIndices, ClusterBlock

public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
clusterService.submitStateUpdateTask("update-index-compatibility-versions",
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, wrapPreservingContext(listener)) {
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request,
wrapPreservingContext(listener, threadPool.getThreadContext())) {

@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
final ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
final ActionListener<Void> delegate = new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assertEquals("bar", threadContext.getHeader("foo"));
Expand All @@ -48,7 +47,12 @@ public void onResponse(Void aVoid) {
public void onFailure(Exception e) {
throw new RuntimeException("onFailure shouldn't be called", e);
}
});
};
if (randomBoolean()) {
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
} else {
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
}
}

assertNull(threadContext.getHeader("foo"));
Expand All @@ -67,22 +71,28 @@ public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
final ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
throw new RuntimeException("onResponse shouldn't be called");
}

@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
}
});
final ActionListener<Void> delegate = new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
throw new RuntimeException("onResponse shouldn't be called");
}

@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
}
};

if (randomBoolean()) {
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
} else {
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
}

}

assertNull(threadContext.getHeader("foo"));
Expand All @@ -101,25 +111,30 @@ public void testOriginalContextIsWhenListenerThrows() throws Exception {
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
}
ContextPreservingActionListener<Void> actionListener;
final ContextPreservingActionListener<Void> actionListener;
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader("foo", "bar");
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true),
new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onResponse called");
}

@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onFailure called");
}
});
final ActionListener<Void> delegate = new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onResponse called");
}

@Override
public void onFailure(Exception e) {
assertEquals("bar", threadContext.getHeader("foo"));
assertNull(threadContext.getHeader("not empty"));
throw new RuntimeException("onFailure called");
}
};

if (randomBoolean()) {
actionListener = new ContextPreservingActionListener<>(threadContext.newRestorableContext(true), delegate);
} else {
actionListener = ContextPreservingActionListener.wrapPreservingContext(delegate, threadContext);
}
}

assertNull(threadContext.getHeader("foo"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"Create index with deprecated settings":

- skip:
version: "all"
reason: removed in 6.0
features: "warnings"
- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: 1
shadow_replicas: true
shared_filesystem: false
mappings:
type:
properties:
field:
type: "string"
field2:
type: "long"
store : "no"
warnings:
- "[index.shadow_replicas] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
- "[index.shared_filesystem] setting was deprecated in Elasticsearch and will be removed in a future release! See the breaking changes documentation for the next major version."
- "The [string] field is deprecated, please use [text] or [keyword] instead on [field]"
- "Expected a boolean [true/false] for property [field2.store] but got [no]"

0 comments on commit b59d5d0

Please sign in to comment.