Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into primary_only
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong committed Mar 18, 2024
2 parents b3048fe + b4da802 commit 7c06d62
Show file tree
Hide file tree
Showing 35 changed files with 2,623 additions and 90 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down Expand Up @@ -157,7 +160,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499))
- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531))
- Fix OpenSearch JVM crash caused by onShardResult and onShardFailure both being executed on one shard ([#12158](https://github.com/opensearch-project/OpenSearch/pull/12158))
- Avoid overflow when sorting missing last on `epoch_millis` datetime field ([#12676](https://github.com/opensearch-project/OpenSearch/pull/12676))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.telemetry.metrics;

import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/**
* Default implementation for {@link MetricsRegistry}
Expand Down Expand Up @@ -39,6 +43,11 @@ public Histogram createHistogram(String name, String description, String unit) {
return metricsTelemetry.createHistogram(name, description, unit);
}

/**
 * Creates a gauge by passing every argument, unchanged, to the wrapped {@code metricsTelemetry}.
 *
 * @return the {@link Closeable} produced by {@code metricsTelemetry.createGauge}.
 */
@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
    final Closeable gaugeHandle = metricsTelemetry.createGauge(name, description, unit, valueProvider, tags);
    return gaugeHandle;
}

@Override
public void close() throws IOException {
metricsTelemetry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package org.opensearch.telemetry.metrics;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.util.function.Supplier;

/**
* MetricsRegistry helps in creating the metric instruments.
Expand Down Expand Up @@ -47,4 +49,18 @@ public interface MetricsRegistry extends Closeable {
* @return histogram.
*/
Histogram createHistogram(String name, String description, String unit);

/**
* Creates the Observable Gauge type of Metric. The value provider will be called at a certain frequency
* to capture the value.
*
* @param name name of the observable gauge.
* @param description any description about the metric.
* @param unit unit of the metric.
* @param valueProvider supplier that is invoked to obtain the gauge's current value.
* @param tags attributes/dimensions of the metric.
* @return closeable to dispose/close the Gauge metric.
*/
Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/**
*No-op {@link MetricsRegistry}
Expand Down Expand Up @@ -44,6 +47,11 @@ public Histogram createHistogram(String name, String description, String unit) {
return NoopHistogram.INSTANCE;
}

/**
 * No-op gauge creation: every argument is ignored and the returned {@link Closeable} does nothing when closed.
 */
@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
    final Closeable nothingToClose = () -> {};
    return nothingToClose;
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

package org.opensearch.telemetry.metrics;

import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;

import java.io.Closeable;
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -59,4 +63,20 @@ public void testHistogram() {
assertSame(mockHistogram, histogram);
}

/**
 * Checks that the {@link Closeable} handed back by {@code createGauge} is the very instance the stubbed call returns.
 */
@SuppressWarnings("unchecked")
public void testGauge() {
    final Closeable expectedHandle = mock(Closeable.class);
    when(
        defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class), any(Tags.class))
    ).thenReturn(expectedHandle);

    final String gaugeName = "org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge";
    final Supplier<Double> constantValue = () -> 1.0;
    final Closeable actualHandle = defaultMeterRegistry.createGauge(
        gaugeName,
        "test observable gauge",
        "ms",
        constantValue,
        Tags.EMPTY
    );

    assertSame(expectedHandle, actualHandle);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.unit.TimeValue;

import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * A cache tier policy which accepts queries whose took time is at least some threshold.
 * The threshold should be set to approximately the time it takes to get a result from the cache tier.
 * The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed
 * to decide whether to admit the value.
 * @param <V> The type of data consumed by test().
 */
public class TookTimePolicy<V> implements Predicate<V> {
    /**
     * The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
     */
    private final TimeValue threshold;

    /**
     * Function which extracts the relevant PolicyValues from a value of type V
     */
    private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser;

    /**
     * Constructs a took time policy.
     * @param threshold the threshold; must be non-null and not negative
     * @param cachedResultParser the function providing policy values; must be non-null
     * @throws NullPointerException if either argument is null
     * @throws IllegalArgumentException if the threshold is below TimeValue.ZERO
     */
    public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
        Objects.requireNonNull(threshold, "Threshold for TookTimePolicy can't be null");
        if (threshold.compareTo(TimeValue.ZERO) < 0) {
            throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
        }
        this.threshold = threshold;
        // Fail fast here: a null parser would otherwise raise an NPE inside test(), which the catch block
        // there swallows, turning the policy into one that silently rejects every value.
        this.cachedResultParser = Objects.requireNonNull(cachedResultParser, "Cached result parser fn can't be null");
    }

    /**
     * Check whether to admit data.
     * @param data the input argument
     * @return true if the took time extracted from the data is greater than or equal to the threshold;
     * false if it is lower, or if the parser throws while extracting the policy values
     */
    @Override
    public boolean test(V data) {
        long tookTimeNanos;
        try {
            tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
        } catch (Exception ignored) {
            // If we can't read a CachedQueryResult.PolicyValues from the data, reject it
            return false;
        }

        TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
        return tookTime.compareTo(threshold) >= 0;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** A package for policies controlling what can enter caches. */
package org.opensearch.cache.common.policy;
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,31 @@

package org.opensearch.cache.common.tier;

import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* This cache spills over the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -52,6 +57,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Expand All @@ -63,21 +69,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
if (evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
}
}
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
// coming soon
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

// Package private for testing
Expand Down Expand Up @@ -192,6 +204,15 @@ private Function<K, V> getValueFromTieredCache() {
};
}

/**
 * Runs the value through the configured policies and reports whether all of them accept it.
 *
 * @param value the value to check
 * @return true when no policy rejects the value, which includes the case of an empty policy list
 */
boolean evaluatePolicies(V value) {
    return policies.stream().allMatch(candidatePolicy -> candidatePolicy.test(value));
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -231,11 +252,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
"Cached result parser fn can't be null"
);

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
.build();
}

Expand All @@ -257,6 +288,7 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();

/**
* Default constructor
Expand Down Expand Up @@ -323,6 +355,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
return this;
}

/**
 * Add a cache policy to the policies used to limit access to this cache's disk tier.
 * Policies accumulate: each call appends to the builder's list rather than replacing it.
 * @param policy the policy; must not be null
 * @return builder
 * @throws NullPointerException if policy is null
 */
public Builder<K, V> addPolicy(Predicate<V> policy) {
    // Reject null now, instead of letting it surface later when the policies are evaluated.
    this.policies.add(Objects.requireNonNull(policy, "policy can't be null"));
    return this;
}

/**
 * Add multiple policies to the policies used to limit access to this cache's disk tier.
 * Policies accumulate: each call appends to the builder's list rather than replacing it.
 * @param policies the policies; neither the list nor any of its elements may be null
 * @return builder
 * @throws NullPointerException if the list or any element of it is null
 */
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
    Objects.requireNonNull(policies, "policies can't be null");
    // Validate everything before mutating, so a bad element does not leave the builder half-updated.
    for (Predicate<V> policy : policies) {
        Objects.requireNonNull(policy, "policy can't be null");
    }
    this.policies.addAll(policies);
    return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.opensearch.cache.common.tier;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;

Expand Down Expand Up @@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
* It is a time setting registered with the NodeScope property; its default is 10 ms and its
* minimum allowed value is TimeValue.ZERO. The setting key ends with
* ".disk.store.policies.took_time.threshold", appended to the tiered spillover cache name.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
NodeScope
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Default constructor
*/
Expand Down
Loading

0 comments on commit 7c06d62

Please sign in to comment.