Skip to content

Commit

Permalink
Make UpdateSettingsClusterStateUpdateRequest a record (elastic#113484)
Browse files Browse the repository at this point in the history
No need to extend `IndicesClusterStateUpdateRequest`, this thing can be
completely immutable.

Backport of elastic#113450 to 8.x
  • Loading branch information
DaveCTurner authored Sep 25, 2024
1 parent 5e15f84 commit 5ef062c
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -42,45 +43,58 @@ public void testThatNonDynamicSettingChangesTakeEffect() throws Exception {
MetadataUpdateSettingsService metadataUpdateSettingsService = internalCluster().getCurrentMasterNodeInstance(
MetadataUpdateSettingsService.class
);
UpdateSettingsClusterStateUpdateRequest request = new UpdateSettingsClusterStateUpdateRequest().ackTimeout(TimeValue.ZERO);
List<Index> indices = new ArrayList<>();
List<Index> indicesList = new ArrayList<>();
for (IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
indices.add(indexService.index());
indicesList.add(indexService.index());
}
}
request.indices(indices.toArray(Index.EMPTY_ARRAY));
request.settings(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build());
final var indices = indicesList.toArray(Index.EMPTY_ARRAY);

final Function<UpdateSettingsClusterStateUpdateRequest.OnStaticSetting, UpdateSettingsClusterStateUpdateRequest> requestFactory =
onStaticSetting -> new UpdateSettingsClusterStateUpdateRequest(
TEST_REQUEST_TIMEOUT,
TimeValue.ZERO,
Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build(),
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
onStaticSetting,
indices
);

// First make sure it fails if reopenShards is not set on the request:
AtomicBoolean expectedFailureOccurred = new AtomicBoolean(false);
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail("Should have failed updating a non-dynamic setting without reopenShards set to true");
}
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail("Should have failed updating a non-dynamic setting without reopenShards set to true");
}

@Override
public void onFailure(Exception e) {
expectedFailureOccurred.set(true);
@Override
public void onFailure(Exception e) {
expectedFailureOccurred.set(true);
}
}
});
);
assertBusy(() -> assertThat(expectedFailureOccurred.get(), equalTo(true)));

// Now we set reopenShards and expect it to work:
request.reopenShards(true);
AtomicBoolean success = new AtomicBoolean(false);
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
@Override
public void onFailure(Exception e) {
fail(e);
}
}
});
);
assertBusy(() -> assertThat(success.get(), equalTo(true)));

// Now we look into the IndexShard objects to make sure that the code was actually updated (vs just the setting):
Expand Down Expand Up @@ -110,16 +124,23 @@ public void testThatNonDynamicSettingChangesDoNotUnncessesarilyCauseReopens() th
MetadataUpdateSettingsService metadataUpdateSettingsService = internalCluster().getCurrentMasterNodeInstance(
MetadataUpdateSettingsService.class
);
UpdateSettingsClusterStateUpdateRequest request = new UpdateSettingsClusterStateUpdateRequest().ackTimeout(TimeValue.ZERO);
List<Index> indices = new ArrayList<>();
List<Index> indicesList = new ArrayList<>();
for (IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
indices.add(indexService.index());
indicesList.add(indexService.index());
}
}
request.indices(indices.toArray(Index.EMPTY_ARRAY));
request.settings(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").build());
request.reopenShards(true);
final var indices = indicesList.toArray(Index.EMPTY_ARRAY);

final Function<Settings.Builder, UpdateSettingsClusterStateUpdateRequest> requestFactory =
settings -> new UpdateSettingsClusterStateUpdateRequest(
TEST_REQUEST_TIMEOUT,
TimeValue.ZERO,
settings.build(),
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES,
indices
);

ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
AtomicBoolean shardsUnassigned = new AtomicBoolean(false);
Expand All @@ -142,47 +163,49 @@ public void testThatNonDynamicSettingChangesDoNotUnncessesarilyCauseReopens() th

AtomicBoolean success = new AtomicBoolean(false);
// Make the first request, just to set things up:
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(
requestFactory.apply(Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData")),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
@Override
public void onFailure(Exception e) {
fail(e);
}
}
});
);
assertBusy(() -> assertThat(success.get(), equalTo(true)));
assertBusy(() -> assertThat(expectedSettingsChangeInClusterState.get(), equalTo(true)));
assertThat(shardsUnassigned.get(), equalTo(true));

assertBusy(() -> assertThat(hasUnassignedShards(clusterService.state(), indexName), equalTo(false)));

// Same request, except now we'll also set the dynamic "index.max_result_window" setting:
request.settings(
Settings.builder()
.put("index.codec", "FastDecompressionCompressingStoredFieldsData")
.put("index.max_result_window", "1500")
.build()
);
success.set(false);
expectedSettingsChangeInClusterState.set(false);
shardsUnassigned.set(false);
expectedSetting.set("index.max_result_window");
expectedSettingValue.set("1500");
// Making this request ought to add this new setting but not unassign the shards:
metadataUpdateSettingsService.updateSettings(request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}
metadataUpdateSettingsService.updateSettings(
// Same request, except now we'll also set the dynamic "index.max_result_window" setting:
requestFactory.apply(
Settings.builder().put("index.codec", "FastDecompressionCompressingStoredFieldsData").put("index.max_result_window", "1500")
),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
success.set(true);
}

@Override
public void onFailure(Exception e) {
fail(e);
@Override
public void onFailure(Exception e) {
fail(e);
}
}
});
);

assertBusy(() -> assertThat(success.get(), equalTo(true)));
assertBusy(() -> assertThat(expectedSettingsChangeInClusterState.get(), equalTo(true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,24 @@ protected void masterOperation(
return;
}

UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest().indices(
concreteIndices
)
.settings(requestSettings)
.setPreserveExisting(request.isPreserveExisting())
.reopenShards(request.reopen())
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());

updateSettingsService.updateSettings(clusterStateUpdateRequest, listener.delegateResponse((l, e) -> {
logger.debug(() -> "failed to update settings on indices [" + Arrays.toString(concreteIndices) + "]", e);
l.onFailure(e);
}));
updateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
request.masterNodeTimeout(),
request.ackTimeout(),
requestSettings,
request.isPreserveExisting()
? UpdateSettingsClusterStateUpdateRequest.OnExisting.PRESERVE
: UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
request.reopen()
? UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES
: UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
concreteIndices
),
listener.delegateResponse((l, e) -> {
logger.debug(() -> "failed to update settings on indices [" + Arrays.toString(concreteIndices) + "]", e);
l.onFailure(e);
})
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,60 @@

package org.elasticsearch.action.admin.indices.settings.put;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;

import java.util.Arrays;
import java.util.Objects;

/**
* Cluster state update request that allows to update settings for some indices
*/
public class UpdateSettingsClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<UpdateSettingsClusterStateUpdateRequest> {

private Settings settings;

private boolean preserveExisting = false;

private boolean reopenShards = false;

/**
* Returns <code>true</code> iff the settings update should only add but not update settings. If the setting already exists
* it should not be overwritten by this update. The default is <code>false</code>
*/
public boolean isPreserveExisting() {
return preserveExisting;
}
public record UpdateSettingsClusterStateUpdateRequest(
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
Settings settings,
OnExisting onExisting,
OnStaticSetting onStaticSetting,
Index... indices
) {

/**
* Returns <code>true</code> if non-dynamic setting updates should go through, by automatically unassigning shards in the same cluster
* state change as the setting update. The shards will be automatically reassigned after the cluster state update is made. The
* default is <code>false</code>.
* Specifies the behaviour of an update-settings action on existing settings.
*/
public boolean reopenShards() {
return reopenShards;
}
public enum OnExisting {
/**
* Update all the specified settings, overwriting any settings which already exist. This is the API default.
*/
OVERWRITE,

public UpdateSettingsClusterStateUpdateRequest reopenShards(boolean reopenShards) {
this.reopenShards = reopenShards;
return this;
/**
* Only add new settings, preserving the values of any settings which are already set and ignoring the new values specified in the
* request.
*/
PRESERVE
}

/**
* Iff set to <code>true</code> this settings update will only add settings not already set on an index. Existing settings remain
* unchanged.
* Specifies the behaviour of an update-settings action which is trying to adjust a non-dynamic setting.
*/
public UpdateSettingsClusterStateUpdateRequest setPreserveExisting(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
return this;
}
public enum OnStaticSetting {
/**
* Reject attempts to update non-dynamic settings on open indices. This is the API default.
*/
REJECT,

/**
* Returns the {@link Settings} to update
*/
public Settings settings() {
return settings;
}

/**
* Sets the {@link Settings} to update
*/
public UpdateSettingsClusterStateUpdateRequest settings(Settings settings) {
this.settings = settings;
return this;
/**
* Automatically close and reopen the shards of any open indices when updating a non-dynamic setting, forcing the shard to
* reinitialize from scratch.
*/
REOPEN_INDICES
}

@Override
public String toString() {
return Arrays.toString(indices()) + settings;
public UpdateSettingsClusterStateUpdateRequest {
Objects.requireNonNull(masterNodeTimeout);
Objects.requireNonNull(ackTimeout);
Objects.requireNonNull(settings);
Objects.requireNonNull(indices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ ClusterState execute(ClusterState currentState) {
}
final Settings closedSettings = settingsForClosedIndices.build();
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.isPreserveExisting();
final boolean preserveExisting = request.onExisting() == UpdateSettingsClusterStateUpdateRequest.OnExisting.PRESERVE;

RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
Expand All @@ -199,7 +199,7 @@ ClusterState execute(ClusterState currentState) {
}

if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
if (request.reopenShards()) {
if (request.onStaticSetting() == UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REOPEN_INDICES) {
// We have non-dynamic settings and open indices. We will unassign all of the shards in these indices so that the new
// changed settings are applied when the shards are re-assigned.
routingTableBuilder = RoutingTable.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,18 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
*/
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();
UpdateSettingsClusterStateUpdateRequest updateSettingsRequest = new UpdateSettingsClusterStateUpdateRequest().indices(
new Index[] { index }
).settings(readOnlySettings).setPreserveExisting(false).ackTimeout(TimeValue.ZERO);

metadataUpdateSettingsService.updateSettings(updateSettingsRequest, listener);
metadataUpdateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
TimeValue.MINUS_ONE,
TimeValue.ZERO,
readOnlySettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
index
),
listener
);
}

private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {
Expand Down
Loading

0 comments on commit 5ef062c

Please sign in to comment.