From 979d4a5392543e861114afe1e0415eb2ebb8b4c6 Mon Sep 17 00:00:00 2001 From: Walker Carlson <18128741+wcarlson5@users.noreply.github.com> Date: Tue, 12 Apr 2022 10:09:34 -0500 Subject: [PATCH] fix: shared runtimes calculate cache size for validation properly (#8923) * fix: shared runtimes calculate cache size for validation properly --- .../query/KafkaStreamsQueryValidator.java | 48 +++++++++++++++++-- .../query/KafkaStreamsQueryValidatorTest.java | 47 +++++++++++++++++- 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/query/KafkaStreamsQueryValidator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/query/KafkaStreamsQueryValidator.java index b47d28a62405..f38525bede1d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/query/KafkaStreamsQueryValidator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/query/KafkaStreamsQueryValidator.java @@ -16,6 +16,7 @@ import io.confluent.ksql.config.SessionConfig; import io.confluent.ksql.execution.ExecutionPlan; +import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -23,11 +24,17 @@ import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaStreamsQueryValidator implements QueryValidator { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsQueryValidator.class); + @Override public void validateQuery( final SessionConfig config, @@ -69,10 +76,43 @@ private void validateCacheBytesUsage( return; } final long configured = getCacheMaxBytesBuffering(config); - final long usedByRunning = running.stream() - .mapToLong(r -> new StreamsConfig(r.getStreamsProperties()) - .getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) - .sum(); + long usedByRunning; + if (!config.getConfig(true).getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) { + usedByRunning = running.stream() + .mapToLong(r -> new StreamsConfig(r.getStreamsProperties()) + .getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) + .sum(); + } else { + usedByRunning = running.stream() + .filter(t -> !(t instanceof BinPackedPersistentQueryMetadataImpl)) + .mapToLong(r -> new StreamsConfig(r.getStreamsProperties()) + .getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) + .sum(); + final Set runtimes = new HashSet<>(); + long cacheSizeBytesPerRuntime = -1L; + for (final QueryMetadata queryMetadata : running) { + if (queryMetadata instanceof BinPackedPersistentQueryMetadataImpl) { + if (cacheSizeBytesPerRuntime == -1L) { + cacheSizeBytesPerRuntime = (long) queryMetadata.getStreamsProperties() + .get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + } + if (!runtimes.contains(queryMetadata.getQueryApplicationId())) { + runtimes.add(queryMetadata.getQueryApplicationId()); + usedByRunning += (long) queryMetadata.getStreamsProperties() + .get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + } else { + if (cacheSizeBytesPerRuntime == (long) queryMetadata.getStreamsProperties() + .get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) { + LOG.warn("Inconsistent " + + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG + + " in shared runtimes {} and {}", cacheSizeBytesPerRuntime, + queryMetadata.getStreamsProperties() + .get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)); + } + } + } + } + } if (configured + usedByRunning > limit) { throw new KsqlException(String.format( "Configured cache usage (cache.max.bytes.buffering=%d) would put usage over the " diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/query/KafkaStreamsQueryValidatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/query/KafkaStreamsQueryValidatorTest.java index 351304866503..21f94abefdd5 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/query/KafkaStreamsQueryValidatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/query/KafkaStreamsQueryValidatorTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.config.SessionConfig; +import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl; import io.confluent.ksql.execution.ExecutionPlan; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -52,6 +53,10 @@ public class KafkaStreamsQueryValidatorTest { @Mock private PersistentQueryMetadata persistentQueryMetadata2; @Mock + private BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadata; + @Mock + private BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadata2; + @Mock private ExecutionPlan plan; private Collection queries; @@ -64,11 +69,17 @@ public void setup() { when(transientQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(20)); when(persistentQueryMetadata1.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10)); when(persistentQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(20)); + when(binPackedPersistentQueryMetadata.getQueryApplicationId()).thenReturn("runtime 1"); + when(binPackedPersistentQueryMetadata2.getQueryApplicationId()).thenReturn("runtime 1"); + when(binPackedPersistentQueryMetadata.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10)); + when(binPackedPersistentQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10)); queries = ImmutableList.of( transientQueryMetadata1, transientQueryMetadata2, persistentQueryMetadata1, - persistentQueryMetadata2 + persistentQueryMetadata2, + binPackedPersistentQueryMetadata, + binPackedPersistentQueryMetadata2 ); } @@ -84,7 +95,7 @@ public void shouldLimitBufferCacheUsage() { @Test public void shouldNotThrowIfUnderLimit() { // Given: - final SessionConfig config = configWithLimits(5, OptionalLong.of(40)); + final SessionConfig config = configWithLimits(5, OptionalLong.of(60)); // When/Then (no throw) queryValidator.validateQuery(config, plan, queries); @@ -138,6 +149,38 @@ public void shouldIgnoreGlobalLimitSetInOverrides() { ); } + @Test + public void shouldValidateSharedRuntimes() { + // Given: + final SessionConfig config = SessionConfig.of( + new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 50, + KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true)), + ImmutableMap.of(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10) + ); + + // When/Then: + queryValidator.validateQuery(config, plan, queries); + } + + @Test + public void shouldNotValidateSharedRuntimesWhenCreatingAnewRuntimeWouldGoOverTheLimit() { + // Given: + final SessionConfig config = SessionConfig.of( + new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 50, + KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true)), + ImmutableMap.of( + StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50, + KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 500 + ) + ); + + // When/Then: + assertThrows( + KsqlException.class, + () -> queryValidator.validateQuery(config, plan, queries) + ); + } + private SessionConfig configWithLimits( final long queryLimit, final OptionalLong globalLimit