Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into dual-repl-segrep-lag
Browse files Browse the repository at this point in the history
  • Loading branch information
shourya035 authored Jun 10, 2024
2 parents 7b133c5 + 53ea952 commit 84001dd
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -156,7 +155,7 @@
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
MockTransportService.TestPlugin.class,
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
InternalSettingsPlugin.class,
MockEngineFactoryPlugin.class
Expand Down Expand Up @@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand All @@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
.setTransientSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
)
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
Expand All @@ -61,7 +60,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
Expand All @@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
Settings.builder()
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,28 +480,37 @@ public <T> T getTransient(String key) {
* @param value the header value
*/
public void addResponseHeader(final String key, final String value) {
addResponseHeader(key, value, v -> v);
updateResponseHeader(key, value, v -> v, false);
}

/**
* Remove the {@code value} for the specified {@code key}.
* Update the {@code value} for the specified {@code key}
*
* @param key the header name
* @param value the header value
*/
public void removeResponseHeader(final String key) {
threadLocal.get().responseHeaders.remove(key);
public void updateResponseHeader(final String key, final String value) {
updateResponseHeader(key, value, v -> v, true);
}

/**
* Add the {@code value} for the specified {@code key} with the specified {@code uniqueValue} used for de-duplication. Any duplicate
* Update the {@code value} for the specified {@code key} with the specified {@code uniqueValue} used for de-duplication. Any duplicate
* {@code value} after applying {@code uniqueValue} is ignored.
*
* @param key the header name
* @param value the header value
* @param uniqueValue the function that produces de-duplication values
*/
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
* @param replaceExistingKey whether to replace the existing header if it already exists
*/
public void updateResponseHeader(
final String key,
final String value,
final Function<String, String> uniqueValue,
final boolean replaceExistingKey
) {
threadLocal.set(
threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize, replaceExistingKey)
);
}

/**
Expand Down Expand Up @@ -726,7 +735,8 @@ private ThreadContextStruct putResponse(
final String value,
final Function<String, String> uniqueValue,
final int maxWarningHeaderCount,
final long maxWarningHeaderSize
final long maxWarningHeaderSize,
final boolean replaceExistingKey
) {
assert value != null;
long newWarningHeaderSize = warningHeadersSize;
Expand Down Expand Up @@ -768,8 +778,13 @@ private ThreadContextStruct putResponse(
if (existingValues.contains(uniqueValue.apply(value))) {
return this;
}
// preserve insertion order
final Set<String> newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
Set<String> newValues;
if (replaceExistingKey) {
newValues = Stream.of(value).collect(LINKED_HASH_SET_COLLECTOR);
} else {
// preserve insertion order
newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
}
newResponseHeaders = new HashMap<>(responseHeaders);
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ public class RecoverySettings {
);

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES),
new ByteSizeValue(1, ByteSizeUnit.BYTES),
new ByteSizeValue(100, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
Expand All @@ -193,7 +200,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
Expand All @@ -239,11 +247,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
clusterSettings.addSettingsUpdateConsumer(
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
this::setInternalActionRetryTimeout
);

}

public RateLimiter recoveryRateLimiter() {
Expand Down Expand Up @@ -286,10 +294,7 @@ public ByteSizeValue getChunkSize() {
return chunkSize;
}

public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
public void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

Expand Down
5 changes: 0 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,6 @@ protected Node(
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class)
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
Expand Down Expand Up @@ -1447,10 +1446,6 @@ protected TransportService newTransportService(
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}

/**
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,7 @@ public void writeTaskResourceUsage(SearchShardTask task, String nodeId) {
)
.build();
// Remove the existing TASK_RESOURCE_USAGE header since it would have come from an earlier phase in the same request.
synchronized (this) {
threadPool.getThreadContext().removeResponseHeader(TASK_RESOURCE_USAGE);
threadPool.getThreadContext().addResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString());
}
threadPool.getThreadContext().updateResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString());
} catch (Exception e) {
logger.debug("Error during writing task resource usage: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,16 @@ public void testResponseHeaders() {
}

final String value = HeaderWarning.formatWarning("qux");
threadContext.addResponseHeader("baz", value, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false));
threadContext.updateResponseHeader("baz", value, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false), false);
// pretend that another thread created the same response at a different time
if (randomBoolean()) {
final String duplicateValue = HeaderWarning.formatWarning("qux");
threadContext.addResponseHeader("baz", duplicateValue, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false));
threadContext.updateResponseHeader(
"baz",
duplicateValue,
s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false),
false
);
}

threadContext.addResponseHeader("Warning", "One is the loneliest number");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testChunkSize() {
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
);
assertEquals(chunkSize, recoverySettings.getChunkSize());
}

public void testInternalActionRetryTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
startingSeqNo
);
long fileChunkSizeInBytes = randomBoolean()
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
: randomIntBetween(1, 10 * 1024 * 1024);
final Settings settings = Settings.builder()
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.env.Environment;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptService;
import org.opensearch.script.ScriptContext;
Expand Down Expand Up @@ -236,13 +235,6 @@ protected TransportService newTransportService(
}
}

@Override
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
}
}

@Override
protected ClusterInfoService newClusterInfoService(
Settings settings,
Expand Down

This file was deleted.

0 comments on commit 84001dd

Please sign in to comment.