Skip to content

Commit

Permalink
fix: shared runtimes calculate cache size for validation properly (#8923
Browse files Browse the repository at this point in the history
)

* fix: shared runtimes calculate cache size for validation properly
  • Loading branch information
wcarlson5 authored Apr 12, 2022
1 parent fac288a commit 979d4a5
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,25 @@

import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ExecutionPlan;
import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamsQueryValidator implements QueryValidator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsQueryValidator.class);

@Override
public void validateQuery(
final SessionConfig config,
Expand Down Expand Up @@ -69,10 +76,43 @@ private void validateCacheBytesUsage(
return;
}
final long configured = getCacheMaxBytesBuffering(config);
final long usedByRunning = running.stream()
.mapToLong(r -> new StreamsConfig(r.getStreamsProperties())
.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
.sum();
long usedByRunning;
if (!config.getConfig(true).getBoolean(KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED)) {
usedByRunning = running.stream()
.mapToLong(r -> new StreamsConfig(r.getStreamsProperties())
.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
.sum();
} else {
usedByRunning = running.stream()
.filter(t -> !(t instanceof BinPackedPersistentQueryMetadataImpl))
.mapToLong(r -> new StreamsConfig(r.getStreamsProperties())
.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
.sum();
final Set<String> runtimes = new HashSet<>();
long cacheSizeBytesPerRuntime = -1L;
for (final QueryMetadata queryMetadata : running) {
if (queryMetadata instanceof BinPackedPersistentQueryMetadataImpl) {
if (cacheSizeBytesPerRuntime == -1L) {
cacheSizeBytesPerRuntime = (long) queryMetadata.getStreamsProperties()
.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
}
if (!runtimes.contains(queryMetadata.getQueryApplicationId())) {
runtimes.add(queryMetadata.getQueryApplicationId());
usedByRunning += (long) queryMetadata.getStreamsProperties()
.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
} else {
if (cacheSizeBytesPerRuntime == (long) queryMetadata.getStreamsProperties()
.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
LOG.warn("Inconsistent "
+ StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
+ " in shared runtimes {} and {}", cacheSizeBytesPerRuntime,
queryMetadata.getStreamsProperties()
.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
}
}
}
}
}
if (configured + usedByRunning > limit) {
throw new KsqlException(String.format(
"Configured cache usage (cache.max.bytes.buffering=%d) would put usage over the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.util.BinPackedPersistentQueryMetadataImpl;
import io.confluent.ksql.execution.ExecutionPlan;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -52,6 +53,10 @@ public class KafkaStreamsQueryValidatorTest {
@Mock
private PersistentQueryMetadata persistentQueryMetadata2;
@Mock
private BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadata;
@Mock
private BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadata2;
@Mock
private ExecutionPlan plan;

private Collection<QueryMetadata> queries;
Expand All @@ -64,11 +69,17 @@ public void setup() {
when(transientQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(20));
when(persistentQueryMetadata1.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10));
when(persistentQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(20));
when(binPackedPersistentQueryMetadata.getQueryApplicationId()).thenReturn("runtime 1");
when(binPackedPersistentQueryMetadata2.getQueryApplicationId()).thenReturn("runtime 1");
when(binPackedPersistentQueryMetadata.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10));
when(binPackedPersistentQueryMetadata2.getStreamsProperties()).thenReturn(streamPropsWithCacheLimit(10));
queries = ImmutableList.of(
transientQueryMetadata1,
transientQueryMetadata2,
persistentQueryMetadata1,
persistentQueryMetadata2
persistentQueryMetadata2,
binPackedPersistentQueryMetadata,
binPackedPersistentQueryMetadata2
);
}

Expand All @@ -84,7 +95,7 @@ public void shouldLimitBufferCacheUsage() {
@Test
public void shouldNotThrowIfUnderLimit() {
// Given:
final SessionConfig config = configWithLimits(5, OptionalLong.of(40));
final SessionConfig config = configWithLimits(5, OptionalLong.of(60));

// When/Then (no throw)
queryValidator.validateQuery(config, plan, queries);
Expand Down Expand Up @@ -138,6 +149,38 @@ public void shouldIgnoreGlobalLimitSetInOverrides() {
);
}

@Test
public void shouldValidateSharedRuntimes() {
// Given:
final SessionConfig config = SessionConfig.of(
new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 50,
KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true)),
ImmutableMap.of(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10)
);

// When/Then:
queryValidator.validateQuery(config, plan, queries);
}

@Test
public void shouldNotValidateSharedRuntimesWhenCreatingAnewRuntimeWouldGoOverTheLimit() {
// Given:
final SessionConfig config = SessionConfig.of(
new KsqlConfig(ImmutableMap.of(KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 50,
KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, true)),
ImmutableMap.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50,
KsqlConfig.KSQL_TOTAL_CACHE_MAX_BYTES_BUFFERING, 500
)
);

// When/Then:
assertThrows(
KsqlException.class,
() -> queryValidator.validateQuery(config, plan, queries)
);
}

private SessionConfig configWithLimits(
final long queryLimit,
final OptionalLong globalLimit
Expand Down

0 comments on commit 979d4a5

Please sign in to comment.