fix(homepage): make entity counts execute in parallel and make cache configurable (datahub-project#7249)
RyanHolstien authored and looppi committed Feb 15, 2023
1 parent 74c18a9 commit 2b3c8fb
Showing 16 changed files with 125 additions and 27 deletions.
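At a glance: the homepage entity-count lookup previously issued one docCount call per entity type in sequence and cached the result for a hard-coded minute. Per the commit title, the counts are now meant to execute in parallel — the fan-out goes through a new ConcurrencyUtils overload — and the cache TTL comes from configuration. A minimal sketch of the intended pattern, with illustrative class and variable names (only Suppliers.memoizeWithExpiration and CompletableFuture are taken from the diff itself):

```java
import com.google.common.base.Suppliers;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Sketch of a cached, parallel doc-count lookup; the entity names and the
// count function stand in for EntityRegistry and EntitySearchService#docCount.
public class DocCountSketch {
  private final Supplier<Map<String, Long>> cachedCounts;

  public DocCountSketch(List<String> entityNames, Function<String, Long> docCount, long ttlSeconds) {
    // The TTL is a constructor argument instead of a hard-coded 1 minute.
    this.cachedCounts = Suppliers.memoizeWithExpiration(
        () -> fetchInParallel(entityNames, docCount), ttlSeconds, TimeUnit.SECONDS);
  }

  private static Map<String, Long> fetchInParallel(List<String> names, Function<String, Long> docCount) {
    // One CompletableFuture per entity type, joined at the end, so the
    // per-entity round-trips overlap instead of running back to back.
    return names.stream()
        .map(n -> CompletableFuture.supplyAsync(() -> Map.entry(n, docCount.apply(n))))
        .collect(Collectors.collectingAndThen(Collectors.toList(),
            futures -> futures.stream().map(CompletableFuture::join)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
  }

  public Map<String, Long> get() {
    return cachedCounts.get();
  }
}
```

The real wiring threads an EntityDocCountCacheConfiguration from application.yml through the Spring factories and into EntityDocCountCache, as the files below show.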
com/linkedin/metadata/config/CacheConfiguration.java (new file)
@@ -0,0 +1,10 @@
+package com.linkedin.metadata.config;
+
+import lombok.Data;
+
+
+@Data
+public class CacheConfiguration {
+  PrimaryCacheConfiguration primary;
+  HomepageCacheConfiguration homepage;
+}
com/linkedin/metadata/config/EntityDocCountCacheConfiguration.java (new file)
@@ -0,0 +1,9 @@
+package com.linkedin.metadata.config;
+
+import lombok.Data;
+
+
+@Data
+public class EntityDocCountCacheConfiguration {
+  long ttlSeconds;
+}
com/linkedin/metadata/config/HomepageCacheConfiguration.java (new file)
@@ -0,0 +1,9 @@
+package com.linkedin.metadata.config;
+
+import lombok.Data;
+
+
+@Data
+public class HomepageCacheConfiguration {
+  EntityDocCountCacheConfiguration entityCounts;
+}
com/linkedin/metadata/config/PrimaryCacheConfiguration.java (new file)
@@ -0,0 +1,10 @@
+package com.linkedin.metadata.config;
+
+import lombok.Data;
+
+
+@Data
+public class PrimaryCacheConfiguration {
+  long ttlSeconds;
+  long maxSize;
+}
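Taken together, the four new classes form a small configuration tree. Built by hand (as the test fixtures below do for the entityCounts leaf) it looks like the following sketch; in GMS itself, Spring presumably binds it from application.yml via the ConfigurationProvider change further down:

```java
import com.linkedin.metadata.config.CacheConfiguration;
import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
import com.linkedin.metadata.config.HomepageCacheConfiguration;
import com.linkedin.metadata.config.PrimaryCacheConfiguration;

final class CacheConfigTreeExample {
  // Mirrors the defaults added to application.yml in this commit.
  static CacheConfiguration defaults() {
    PrimaryCacheConfiguration primary = new PrimaryCacheConfiguration();
    primary.setTtlSeconds(600L);      // cache.primary.ttlSeconds
    primary.setMaxSize(10000L);       // cache.primary.maxSize

    EntityDocCountCacheConfiguration entityCounts = new EntityDocCountCacheConfiguration();
    entityCounts.setTtlSeconds(600L); // cache.homepage.entityCounts.ttlSeconds

    HomepageCacheConfiguration homepage = new HomepageCacheConfiguration();
    homepage.setEntityCounts(entityCounts);

    CacheConfiguration cache = new CacheConfiguration();
    cache.setPrimary(primary);
    cache.setHomepage(homepage);
    return cache;
  }
}
```

The setters are generated by Lombok's @Data, which is why the class bodies above declare bare fields only.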
AllEntitiesSearchAggregator.java
@@ -3,6 +3,7 @@
 import com.codahale.metrics.Timer;
 import com.linkedin.data.template.GetMode;
 import com.linkedin.data.template.LongMap;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.query.SearchFlags;
 import com.linkedin.metadata.query.filter.Filter;
@@ -54,11 +55,12 @@ public AllEntitiesSearchAggregator(
       EntityRegistry entityRegistry,
       EntitySearchService entitySearchService,
       CachingEntitySearchService cachingEntitySearchService,
-      SearchRanker searchRanker) {
+      SearchRanker searchRanker,
+      EntityDocCountCacheConfiguration entityDocCountCacheConfiguration) {
     _entitySearchService = Objects.requireNonNull(entitySearchService);
     _searchRanker = Objects.requireNonNull(searchRanker);
     _cachingEntitySearchService = Objects.requireNonNull(cachingEntitySearchService);
-    _entityDocCountCache = new EntityDocCountCache(entityRegistry, entitySearchService);
+    _entityDocCountCache = new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration);
     _maxAggregationValueCount = DEFAULT_MAX_AGGREGATION_VALUES; // TODO: Make this externally configurable
   }
 
com/linkedin/metadata/search/cache/EntityDocCountCache.java
@@ -1,8 +1,10 @@
 package com.linkedin.metadata.search.cache;
 
 import com.google.common.base.Suppliers;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.EntitySearchService;
+import com.linkedin.metadata.utils.ConcurrencyUtils;
 import io.opentelemetry.extension.annotations.WithSpan;
 import java.util.List;
 import java.util.Map;
@@ -17,17 +19,18 @@ public class EntityDocCountCache {
   private final EntitySearchService _entitySearchService;
   private final Supplier<Map<String, Long>> entityDocCount;
 
-  public EntityDocCountCache(EntityRegistry entityRegistry, EntitySearchService entitySearchService) {
+  public EntityDocCountCache(EntityRegistry entityRegistry, EntitySearchService entitySearchService,
+      EntityDocCountCacheConfiguration config) {
     _entityRegistry = entityRegistry;
     _entitySearchService = entitySearchService;
-    entityDocCount = Suppliers.memoizeWithExpiration(this::fetchEntityDocCount, 1, TimeUnit.MINUTES);
+    entityDocCount = Suppliers.memoizeWithExpiration(this::fetchEntityDocCount, config.getTtlSeconds(), TimeUnit.SECONDS);
   }
 
   private Map<String, Long> fetchEntityDocCount() {
-    return _entityRegistry.getEntitySpecs()
-        .keySet()
-        .stream()
-        .collect(Collectors.toMap(Function.identity(), _entitySearchService::docCount));
+    return ConcurrencyUtils
+        .transformAndCollectAsync(_entityRegistry.getEntitySpecs().keySet(),
+            Function.identity(),
+            Collectors.toMap(Function.identity(), _entitySearchService::docCount));
   }
 
   @WithSpan
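One behavioral note for reviewers: Suppliers.memoizeWithExpiration recomputes lazily, on the first get() after the TTL elapses, so a quiet instance pays the refresh cost on the next homepage request rather than in the background. A small self-contained illustration of that semantics (the supplier body stands in for the Elasticsearch fan-out):

```java
import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class MemoizeDemo {
  public static void main(String[] args) throws InterruptedException {
    Supplier<Long> cached = Suppliers.memoizeWithExpiration(() -> {
      System.out.println("refreshing..."); // stands in for the docCount fan-out
      return System.nanoTime();
    }, 2, TimeUnit.SECONDS);

    System.out.println(cached.get()); // first call computes
    System.out.println(cached.get()); // served from cache, no "refreshing..." line
    Thread.sleep(2500);
    System.out.println(cached.get()); // TTL elapsed: recomputed on this call
  }
}
```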
Test configuration wiring a SearchService (file path not shown)
@@ -3,6 +3,7 @@
 import com.linkedin.entity.client.EntityClient;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.ElasticSearchConfiguration;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.SearchService;
@@ -102,9 +103,11 @@ protected SearchService searchService(
     int batchSize = 100;
     SearchRanker<Double> ranker = new SimpleRanker();
     CacheManager cacheManager = new ConcurrentMapCacheManager();
+    EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration();
+    entityDocCountCacheConfiguration.setTtlSeconds(600L);
 
     SearchService service = new SearchService(
-        new EntityDocCountCache(entityRegistry, entitySearchService),
+        new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration),
         new CachingEntitySearchService(
             cacheManager,
             entitySearchService,
@@ -122,7 +125,8 @@
             batchSize,
             false
         ),
-        ranker
+        ranker,
+        entityDocCountCacheConfiguration
     ),
     batchSize,
     false
Test configuration wiring a SearchService with graph services (file path not shown)
@@ -3,6 +3,7 @@
 import com.linkedin.entity.client.EntityClient;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.ElasticSearchConfiguration;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO;
 import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;
@@ -149,9 +150,11 @@ protected SearchService searchService(
     int batchSize = 100;
     SearchRanker<Double> ranker = new SimpleRanker();
     CacheManager cacheManager = new ConcurrentMapCacheManager();
+    EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration();
+    entityDocCountCacheConfiguration.setTtlSeconds(600L);
 
     SearchService service = new SearchService(
-        new EntityDocCountCache(entityRegistry, entitySearchService),
+        new EntityDocCountCache(entityRegistry, entitySearchService, entityDocCountCacheConfiguration),
         new CachingEntitySearchService(
             cacheManager,
             entitySearchService,
@@ -169,7 +172,8 @@
             batchSize,
             false
         ),
-        ranker
+        ranker,
+        entityDocCountCacheConfiguration
     ),
     batchSize,
     false
LineageSearchService test (file path not shown)
@@ -9,6 +9,7 @@
 import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
 import com.linkedin.metadata.ESTestConfiguration;
 import com.linkedin.metadata.TestEntityUtil;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.graph.EntityLineageResult;
 import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.graph.LineageDirection;
@@ -91,13 +92,16 @@ public void setup() {
 
   private void resetService(boolean withCache) {
     CachingEntitySearchService cachingEntitySearchService = new CachingEntitySearchService(_cacheManager, _elasticSearchService, 100, true);
+    EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration();
+    entityDocCountCacheConfiguration.setTtlSeconds(600L);
     _lineageSearchService = new LineageSearchService(
         new SearchService(
-            new EntityDocCountCache(_entityRegistry, _elasticSearchService),
+            new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration),
             cachingEntitySearchService,
             new CachingAllEntitiesSearchAggregator(
                 _cacheManager,
-                new AllEntitiesSearchAggregator(_entityRegistry, _elasticSearchService, cachingEntitySearchService, new SimpleRanker()),
+                new AllEntitiesSearchAggregator(_entityRegistry, _elasticSearchService, cachingEntitySearchService,
+                    new SimpleRanker(), entityDocCountCacheConfiguration),
                 100,
                 true),
             new SimpleRanker()),
SearchService test (file path not shown)
@@ -8,6 +8,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.data.template.StringArray;
 import com.linkedin.metadata.ESTestConfiguration;
+import com.linkedin.metadata.config.EntityDocCountCacheConfiguration;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
 import com.linkedin.metadata.query.SearchFlags;
@@ -82,16 +83,19 @@ private void resetSearchService() {
         _elasticSearchService,
         100,
         true);
+
+    EntityDocCountCacheConfiguration entityDocCountCacheConfiguration = new EntityDocCountCacheConfiguration();
+    entityDocCountCacheConfiguration.setTtlSeconds(600L);
     _searchService = new SearchService(
-        new EntityDocCountCache(_entityRegistry, _elasticSearchService),
+        new EntityDocCountCache(_entityRegistry, _elasticSearchService, entityDocCountCacheConfiguration),
         cachingEntitySearchService,
         new CachingAllEntitiesSearchAggregator(
             _cacheManager,
             new AllEntitiesSearchAggregator(
                 _entityRegistry,
                 _elasticSearchService,
                 cachingEntitySearchService,
-                new SimpleRanker()),
+                new SimpleRanker(), entityDocCountCacheConfiguration),
             100,
             true),
         new SimpleRanker());
@@ -161,6 +165,9 @@ public void testSearchService() throws Exception {
     assertEquals(searchResult.getEntities().get(0).getEntity(), urn2);
     clearCache();
 
+    long docCount = _elasticSearchService.docCount(ENTITY_NAME);
+    assertEquals(docCount, 2L);
+
     _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString());
     _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString());
     syncAfterWrite(_bulkProcessor);
CacheConfig.java
@@ -21,10 +21,10 @@
 @Configuration
 public class CacheConfig {
 
-  @Value("${CACHE_TTL_SECONDS:600}")
+  @Value("${cache.primary.ttlSeconds:600}")
   private int cacheTtlSeconds;
 
-  @Value("${CACHE_MAX_SIZE:10000}")
+  @Value("${cache.primary.maxSize:10000}")
   private int cacheMaxSize;
 
   @Value("${searchService.cache.hazelcast.serviceName:hazelcast-service}")
ConfigurationProvider.java
@@ -3,6 +3,7 @@
 import com.datahub.authentication.AuthenticationConfiguration;
 import com.datahub.authorization.AuthorizationConfiguration;
 import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
+import com.linkedin.metadata.config.CacheConfiguration;
 import com.linkedin.metadata.config.DataHubConfiguration;
 import com.linkedin.metadata.config.ElasticSearchConfiguration;
 import com.linkedin.metadata.config.IngestionConfiguration;
@@ -69,4 +70,9 @@ public class ConfigurationProvider {
    * System Update configurations
    */
   private SystemUpdateConfiguration systemUpdate;
+
+  /**
+   * Configuration for caching
+   */
+  private CacheConfiguration cache;
 }
com/linkedin/gms/factory/search/AllEntitiesSearchAggregatorFactory.java
@@ -1,5 +1,6 @@
 package com.linkedin.gms.factory.search;
 
+import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.EntitySearchService;
@@ -38,11 +39,12 @@ public class AllEntitiesSearchAggregatorFactory {
   @Bean(name = "allEntitiesSearchAggregator")
   @Primary
   @Nonnull
-  protected AllEntitiesSearchAggregator getInstance() {
+  protected AllEntitiesSearchAggregator getInstance(ConfigurationProvider configurationProvider) {
     return new AllEntitiesSearchAggregator(
         entityRegistry,
         entitySearchService,
         cachingEntitySearchService,
-        searchRanker);
+        searchRanker,
+        configurationProvider.getCache().getHomepage().getEntityCounts());
   }
 }
com/linkedin/gms/factory/search/SearchServiceFactory.java
@@ -1,5 +1,6 @@
 package com.linkedin.gms.factory.search;
 
+import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.search.EntitySearchService;
@@ -44,9 +45,10 @@ public class SearchServiceFactory {
   @Bean(name = "searchService")
   @Primary
   @Nonnull
-  protected SearchService getInstance() {
+  protected SearchService getInstance(ConfigurationProvider configurationProvider) {
     return new SearchService(
-        new EntityDocCountCache(entityRegistry, entitySearchService),
+        new EntityDocCountCache(entityRegistry, entitySearchService, configurationProvider.getCache()
+            .getHomepage().getEntityCounts()),
         cachingEntitySearchService,
         cachingAllEntitiesSearchAggregator,
         searchRanker);
metadata-service/factories/src/main/resources/application.yml (8 additions, 0 deletions)
@@ -255,3 +255,11 @@ entityClient:
 usageClient:
   retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
   numRetries: ${USAGE_CLIENT_NUM_RETRIES:3}
+
+cache:
+  primary:
+    ttlSeconds: ${CACHE_TTL_SECONDS:600}
+    maxSize: ${CACHE_MAX_SIZE:10000}
+  homepage:
+    entityCounts:
+      ttlSeconds: ${CACHE_ENTITY_COUNTS_TTL_SECONDS:600}
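Because each new YAML key interpolates the pre-existing environment variable, deployments that already set CACHE_TTL_SECONDS or CACHE_MAX_SIZE keep their current behavior, while CACHE_ENTITY_COUNTS_TTL_SECONDS is the new knob for the homepage counts. A sketch of how the bound values surface in code (in GMS the provider is injected by Spring rather than passed around by hand):

```java
import com.linkedin.gms.factory.config.ConfigurationProvider;

final class CacheSettingsExample {
  static void printSettings(ConfigurationProvider provider) {
    long primaryTtl = provider.getCache().getPrimary().getTtlSeconds();  // CACHE_TTL_SECONDS
    long primaryMax = provider.getCache().getPrimary().getMaxSize();     // CACHE_MAX_SIZE
    long countsTtl = provider.getCache().getHomepage()
        .getEntityCounts().getTtlSeconds();                              // CACHE_ENTITY_COUNTS_TTL_SECONDS
    System.out.printf("primary ttl=%ds maxSize=%d, homepage counts ttl=%ds%n",
        primaryTtl, primaryMax, countsTtl);
  }
}
```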
com/linkedin/metadata/utils/ConcurrencyUtils.java
@@ -1,11 +1,13 @@
 package com.linkedin.metadata.utils;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
@@ -15,32 +17,48 @@ public class ConcurrencyUtils {
   private ConcurrencyUtils() {
   }
 
+  public static <O, T> List<T> transformAndCollectAsync(List<O> originalList, Function<O, T> transformer) {
+    return transformAndCollectAsync(originalList, transformer, Collectors.toList());
+  }
+
   /**
    * Transforms original list into the final list using the function transformer in an asynchronous fashion
    * i.e. each element transform is run as a separate CompleteableFuture and then joined at the end
    */
-  public static <O, T> List<T> transformAndCollectAsync(List<O> originalList, Function<O, T> transformer) {
-    return originalList.stream()
+  public static <O, T, OUTPUT> OUTPUT transformAndCollectAsync(Collection<O> originalCollection,
+      Function<O, T> transformer, Collector<T, ?, OUTPUT> collector) {
+    return originalCollection.stream()
         .map(element -> CompletableFuture.supplyAsync(() -> transformer.apply(element)))
         .collect(Collectors.collectingAndThen(Collectors.toList(),
            completableFutureList -> completableFutureList.stream().map(CompletableFuture::join)))
-        .collect(Collectors.toList());
+        .collect(collector);
   }
 
 
   /**
    * Transforms original list into the final list using the function transformer in an asynchronous fashion
    * with exceptions handled by the input exceptionHandler
    * i.e. each element transform is run as a separate CompleteableFuture and then joined at the end
    */
   public static <O, T> List<T> transformAndCollectAsync(List<O> originalList, Function<O, T> transformer,
       BiFunction<O, Throwable, ? extends T> exceptionHandler) {
-    return originalList.stream()
+    return transformAndCollectAsync(originalList, transformer, exceptionHandler, Collectors.toList());
+  }
+
+  /**
+   * Transforms original list into the final list using the function transformer in an asynchronous fashion
+   * with exceptions handled by the input exceptionHandler
+   * i.e. each element transform is run as a separate CompleteableFuture and then joined at the end
+   */
+  public static <O, T, OUTPUT> OUTPUT transformAndCollectAsync(Collection<O> originalCollection,
+      Function<O, T> transformer, BiFunction<O, Throwable, ? extends T> exceptionHandler, Collector<T, ?, OUTPUT> collector) {
+    return originalCollection.stream()
         .map(element -> CompletableFuture.supplyAsync(() -> transformer.apply(element))
            .exceptionally(e -> exceptionHandler.apply(element, e)))
         .filter(Objects::nonNull)
         .collect(Collectors.collectingAndThen(Collectors.toList(),
            completableFutureList -> completableFutureList.stream().map(CompletableFuture::join)))
-        .collect(Collectors.toList());
+        .collect(collector);
   }
 
   /**
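The new Collection/Collector overloads are what let EntityDocCountCache above collect straight into a Map. A usage sketch — expensiveLookup is a hypothetical stand-in for a per-element remote call; note that work placed in the transformer is what runs inside the per-element futures:

```java
import com.linkedin.metadata.utils.ConcurrencyUtils;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class TransformAndCollectExample {
  // Placeholder for something slow, e.g. one search round-trip per entity.
  static long expensiveLookup(String name) {
    return name.length();
  }

  static Map<String, Long> countAll(Set<String> names) {
    return ConcurrencyUtils.transformAndCollectAsync(
        names,
        name -> Map.entry(name, expensiveLookup(name)), // runs in its own CompletableFuture
        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```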
