Skip to content

Commit

Permalink
Merge branch 'main' into issue-14519
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jun 27, 2024
2 parents c6dbb49 + bb9819c commit ab69cb4
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 54 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ BWC_VERSION:
- "2.14.0"
- "2.14.1"
- "2.15.0"
- "2.15.1"
- "2.16.0"
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@

/.github/ @peternied

/MAINTAINERS.md @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
/MAINTAINERS.md @anasalkouz @andrross @ashking94 @Bukhtawar @CEHENKLE @dblock @dbwiddis @gbbafna @jed326 @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @sachinpkale @saratvemulapalli @shwetathareja @sohami @VachaShah
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down Expand Up @@ -46,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))
- Handle NPE in GetResult if "found" field is missing ([#14552](https://github.com/opensearch-project/OpenSearch/pull/14552))

### Security
Expand Down
1 change: 1 addition & 0 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Anas Alkouz | [anasalkouz](https://github.com/anasalkouz) | Amazon |
| Andrew Ross | [andrross](https://github.com/andrross) | Amazon |
| Andriy Redko | [reta](https://github.com/reta) | Aiven |
| Ashish Singh | [ashking94](https://github.com/ashking94) | Amazon |
| Bukhtawar Khan | [Bukhtawar](https://github.com/Bukhtawar) | Amazon |
| Charlotte Henkle | [CEHENKLE](https://github.com/CEHENKLE) | Amazon |
| Dan Widdis | [dbwiddis](https://github.com/dbwiddis) | Amazon |
Expand Down
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_14_0 = new Version(2140099, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_14_1 = new Version(2140199, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_15_0 = new Version(2150099, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_15_1 = new Version(2150199, org.apache.lucene.util.Version.LUCENE_9_10_0);
public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0);
public static final Version CURRENT = V_3_0_0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,24 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
// and it only has to be loaded one time, we should report one miss and the rest hits. But, if we do stats in
// getValueFromTieredCache(),
// we will see all misses. Instead, handle stats in computeIfAbsent().
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(false).apply(key);
Tuple<V, String> cacheValueTuple;
CompletableFuture<Tuple<ICacheKey<K>, V>> future = null;
try (ReleasableLock ignore = readLock.acquire()) {
cacheValueTuple = getValueFromTieredCache(false).apply(key);
if (cacheValueTuple == null) {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
}
}
List<String> heapDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_ON_HEAP);
List<String> diskDimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, TIER_DIMENSION_VALUE_DISK);

if (cacheValueTuple == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = compute(key, loader);
V value = compute(key, loader, future);
// Handle stats
if (loader.isLoaded()) {
// The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache
Expand Down Expand Up @@ -232,10 +241,8 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
return cacheValueTuple.v1();
}

private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
// Only one of the threads will succeed putting a future into map for the same key.
// Rest will fetch existing future and wait on that to complete.
CompletableFuture<Tuple<ICacheKey<K>, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>());
private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader, CompletableFuture<Tuple<ICacheKey<K>, V>> future)
throws Exception {
// Handler to handle results post processing. Takes a tuple<key, value> or exception as an input and returns
// the value. Also before returning value, puts the value in cache.
BiFunction<Tuple<ICacheKey<K>, V>, Throwable, Void> handler = (pair, ex) -> {
Expand All @@ -253,7 +260,7 @@ private V compute(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
completableFutureMap.remove(key); // Remove key from map as not needed anymore.
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
return null;
};
V value = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ public void testInvalidateAll() throws Exception {
}

public void testComputeIfAbsentConcurrently() throws Exception {
int onHeapCacheSize = randomIntBetween(100, 300);
int onHeapCacheSize = randomIntBetween(500, 700);
int diskCacheSize = randomIntBetween(200, 400);
int keyValueSize = 50;

Expand All @@ -782,7 +782,7 @@ public void testComputeIfAbsentConcurrently() throws Exception {
0
);

int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1);
int numberOfSameKeys = randomIntBetween(400, onHeapCacheSize - 1);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
String value = UUID.randomUUID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class IngestCommonModulePlugin extends Plugin implements ActionPlugin, IngestPlugin {

static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.common.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

static final Setting<TimeValue> WATCHDOG_INTERVAL = Setting.timeSetting(
"ingest.grok.watchdog.interval",
TimeValue.timeValueSeconds(1),
Expand All @@ -77,7 +87,7 @@ public IngestCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
final Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService));
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService));
Expand Down Expand Up @@ -110,7 +120,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
return Collections.unmodifiableMap(processors);
return filterForAllowlistSetting(parameters.env.settings(), processors);
}

@Override
Expand All @@ -133,7 +143,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME, PROCESSORS_ALLOWLIST_SETTING);
}

private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
Expand All @@ -147,4 +157,27 @@ private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters par
);
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Set;

public class IngestCommonModulePluginTests extends OpenSearchTestCase {

public void testAllowlist() throws IOException {
runAllowlistTest(List.of());
runAllowlistTest(List.of("date"));
runAllowlistTest(List.of("set"));
runAllowlistTest(List.of("copy", "date"));
runAllowlistTest(List.of("date", "set", "copy"));
}

private void runAllowlistTest(List<String> allowlist) throws IOException {
final Settings settings = Settings.builder()
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist)
.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
assertEquals(Set.copyOf(allowlist), plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistNotSpecified() throws IOException {
final Settings.Builder builder = Settings.builder();
builder.remove(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
final Settings settings = builder.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
final Set<String> expected = Set.of(
"append",
"urldecode",
"sort",
"fail",
"trim",
"set",
"fingerprint",
"pipeline",
"json",
"join",
"kv",
"bytes",
"date",
"drop",
"community_id",
"lowercase",
"convert",
"copy",
"gsub",
"dot_expander",
"rename",
"remove_by_pattern",
"html_strip",
"remove",
"csv",
"grok",
"date_index_name",
"foreach",
"script",
"dissect",
"uppercase",
"split"
);
assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistHasNonexistentProcessors() throws IOException {
final Settings settings = Settings.builder()
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve"))
.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settings))
);
assertTrue(e.getMessage(), e.getMessage().contains("threeve"));
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -65,10 +67,13 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -1009,6 +1014,26 @@ public void cleanup() throws Exception {
);
}

public void testStartSearchNode() throws Exception {
// test start dedicated search node
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));
// test start node without search role
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));
// test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled
internalCluster().startNode(
Settings.builder()
.put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
.put(TIERED_REMOTE_INDEX, true)
);
// test start non-dedicated search node
assertThrows(
SettingsException.class,
() -> internalCluster().startNode(
Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
)
);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

public static boolean isDedicatedSearchNode(Settings settings) {
return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
private final Weigher<V> weigher;

public SegmentedCache(Builder<K, V> builder) {
this.capacity = builder.capacity;
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
this.segmentMask = segments - 1;
this.table = newSegmentArray(segments);
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
this.weigher = builder.weigher;
for (int i = 0; i < table.length; i++) {
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
}
this.capacity = perSegmentCapacity * segments;
}

@SuppressWarnings("unchecked")
Expand Down
Loading

0 comments on commit ab69cb4

Please sign in to comment.