diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java index 77c3f5409523..ece1b7a08dc4 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/ImmutableProperties.java @@ -29,7 +29,7 @@ public final class ImmutableProperties { .add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG) .add(KsqlConfig.KSQL_EXT_DIR) .add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG) - .add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG) + .add(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG) .addAll(KsqlConfig.SSL_CONFIG_NAMES) .build(); diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 623ac5559758..c157f2af4309 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -196,10 +196,22 @@ public class KsqlConfig extends AbstractConfig { + "whether the Kafka cluster supports the required API, and enables the validator if " + "it does."; - public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable"; - public static final String KSQL_PULL_QUERIES_ENABLE_DOC = + public static final String KSQL_QUERY_PULL_ENABLE_CONFIG = "ksql.query.pull.enable"; + public static final String KSQL_QUERY_PULL_ENABLE_DOC = "Config to enable or disable transient pull queries on a specific KSQL server."; - public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true; + public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true; + + public static final String KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG = + "ksql.query.pull.owner.timeout.ms"; + public static final Long KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DEFAULT = 30000L; + public static final String KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DOC = "Timeout in milliseconds " + + "when waiting for the lookup of the owner of a row key"; + + public static final String KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG = + "ksql.streamsstore.rebalancing.timeout.ms"; + public static final Long KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT = 10000L; + public static final String KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC = "Timeout in " + + "milliseconds when waiting for rebalancing of the stream store during a write"; public static final Collection COMPATIBLY_BREAKING_CONFIG_DEFS = ImmutableList.of( @@ -566,11 +578,23 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, METRIC_REPORTER_CLASSES_DOC ).define( - KSQL_PULL_QUERIES_ENABLE_CONFIG, + KSQL_QUERY_PULL_ENABLE_CONFIG, Type.BOOLEAN, - KSQL_PULL_QUERIES_ENABLE_DEFAULT, + KSQL_QUERY_PULL_ENABLE_DEFAULT, + Importance.LOW, + KSQL_QUERY_PULL_ENABLE_DOC + ).define( + KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG, + ConfigDef.Type.LONG, + KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DEFAULT, + Importance.LOW, + KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DOC + ).define( + KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG, + ConfigDef.Type.LONG, + KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT, Importance.LOW, - KSQL_PULL_QUERIES_ENABLE_DOC + KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 638ca2d6748b..b694315293cc 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -363,7 +363,8 @@ private Optional buildMaterializationProvider( info.getStateStoreSchema(), keySerializer, keyFormat.getWindowType(), - streamsProperties + streamsProperties, + ksqlConfig ); return ksMaterialization.map(ksMat -> (queryId, contextStacker) -> ksqlMaterializationFactory diff --git a/ksql-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java index 224b248b8ed1..1c77ccfd55ad 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/query/QueryExecutorTest.java @@ -171,7 +171,7 @@ public void setup() { when(materializationBuilder.build()).thenReturn(materializationInfo); when(materializationInfo.getStateStoreSchema()).thenReturn(aggregationSchema); when(materializationInfo.stateStoreName()).thenReturn(STORE_NAME); - when(ksMaterializationFactory.create(any(), any(), any(), any(), any(), any())) + when(ksMaterializationFactory.create(any(), any(), any(), any(), any(), any(), any())) .thenReturn(Optional.of(ksMaterialization)); when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization); when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory); @@ -301,7 +301,8 @@ public void shouldCreateKSMaterializationCorrectly() { same(aggregationSchema), any(), eq(Optional.empty()), - eq(properties) + eq(properties), + eq(ksqlConfig) ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java index e558a05e3122..51d37ae6009a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java @@ -77,7 +77,6 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser; -import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Map; @@ -94,7 +93,6 @@ public final class StaticQueryExecutor { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling - private static final Duration OWNERSHIP_TIMEOUT = Duration.ofSeconds(30); private static final Set VALID_WINDOW_BOUNDS_TYPES = ImmutableSet.of( Type.EQUAL, Type.GREATER_THAN, @@ -117,11 +115,11 @@ public static void validate( ) { final Query queryStmt = statement.getStatement(); - if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) { + if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG)) { throw new KsqlRestException( Errors.badStatement( "Pull queries are disabled on this KSQL server - please set " - + KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. " + + KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG + "=true to enable this feature. " + "If you intended to issue a push query, resubmit the query with the " + "EMIT CHANGES clause.", statement.getStatementText())); @@ -162,7 +160,8 @@ public static TableRowsEntity execute( final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema()); - final KsqlNode owner = getOwner(rowKey, mat); + final KsqlConfig ksqlConfig = statement.getConfig(); + final KsqlNode owner = getOwner(ksqlConfig, rowKey, mat); if (!owner.isLocal()) { return proxyTo(owner, statement, serviceContext); } @@ -638,10 +637,16 @@ private static SourceName getSourceName(final Analysis analysis) { return source.getName(); } - private static KsqlNode getOwner(final Struct rowKey, final Materialization mat) { + private static KsqlNode getOwner( + final KsqlConfig ksqlConfig, + final Struct rowKey, + final Materialization mat + ) { final Locator locator = mat.locator(); - final long threshold = System.currentTimeMillis() + OWNERSHIP_TIMEOUT.toMillis(); + final long timeoutMs = + ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG); + final long threshold = System.currentTimeMillis() + timeoutMs; while (System.currentTimeMillis() < threshold) { final Optional owner = locator.locate(rowKey); if (owner.isPresent()) { @@ -650,7 +655,8 @@ private static KsqlNode getOwner(final Struct rowKey, final Materialization mat) } throw new MaterializationTimeOutException( - "The owner of the key could not be determined within the configured timeout" + "The owner of the key could not be determined within the configured timeout: " + + timeoutMs + "ms, config: " + KsqlConfig.KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG ); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java index bc13d2591dd5..a9a6743c27cf 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutorTest.java @@ -44,7 +44,7 @@ public class StaticQueryExecutorTest { public static class Disabled { @Rule public final TemporaryEngine engine = new TemporaryEngine() - .withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false)); + .withConfigs(ImmutableMap.of(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG, false)); @Rule public final ExpectedException expectedException = ExpectedException.none(); diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java index 97abc8344bd3..135d550978a4 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactory.java @@ -21,6 +21,7 @@ import io.confluent.ksql.execution.streams.materialization.Locator; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.util.KsqlConfig; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; @@ -74,7 +75,8 @@ public Optional create( final LogicalSchema schema, final Serializer keySerializer, final Optional windowType, - final Map streamsProperties + final Map streamsProperties, + final KsqlConfig ksqlConfig ) { final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG); if (appServer == null) { @@ -93,7 +95,8 @@ public Optional create( final KsStateStore stateStore = storeFactory.create( stateStoreName, kafkaStreams, - schema + schema, + ksqlConfig ); final KsMaterialization materialization = materializationFactory.create( @@ -133,7 +136,8 @@ interface StateStoreFactory { KsStateStore create( String stateStoreName, KafkaStreams kafkaStreams, - LogicalSchema schema + LogicalSchema schema, + KsqlConfig ksqlConfig ); } diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java index 8d942ef9d2ad..55532e80f69b 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStore.java @@ -22,7 +22,7 @@ import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException; import io.confluent.ksql.execution.streams.materialization.NotRunningException; import io.confluent.ksql.schema.ksql.LogicalSchema; -import java.time.Duration; +import io.confluent.ksql.util.KsqlConfig; import java.util.function.Supplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -33,20 +33,19 @@ */ class KsStateStore { - private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10); - private final String stateStoreName; private final KafkaStreams kafkaStreams; private final LogicalSchema schema; - private final Duration timeout; + private final KsqlConfig ksqlConfig; private final Supplier clock; KsStateStore( final String stateStoreName, final KafkaStreams kafkaStreams, - final LogicalSchema schema + final LogicalSchema schema, + final KsqlConfig ksqlConfig ) { - this(stateStoreName, kafkaStreams, schema, DEFAULT_TIMEOUT, System::currentTimeMillis); + this(stateStoreName, kafkaStreams, schema, ksqlConfig, System::currentTimeMillis); } @VisibleForTesting @@ -54,13 +53,13 @@ class KsStateStore { final String stateStoreName, final KafkaStreams kafkaStreams, final LogicalSchema schema, - final Duration timeout, + final KsqlConfig ksqlConfig, final Supplier clock ) { this.kafkaStreams = requireNonNull(kafkaStreams, "kafkaStreams"); this.stateStoreName = requireNonNull(stateStoreName, "stateStoreName"); this.schema = requireNonNull(schema, "schema"); - this.timeout = requireNonNull(timeout, "timeout"); + this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); this.clock = requireNonNull(clock, "clock"); } @@ -84,11 +83,14 @@ T store(final QueryableStoreType queryableStoreType) { } private void awaitRunning() { - final long threshold = clock.get() + timeout.toMillis(); + final long timeoutMs = + ksqlConfig.getLong(KsqlConfig.KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG); + final long threshold = clock.get() + timeoutMs; while (kafkaStreams.state() == State.REBALANCING) { if (clock.get() > threshold) { throw new MaterializationTimeOutException("Store failed to rebalance within the configured " - + "timeout. timeout: " + timeout.toMillis() + "ms"); + + "timeout. timeout: " + timeoutMs + "ms, config: " + + KsqlConfig.KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG); } Thread.yield(); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java index f4eace7a7590..32e3e64e1c38 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializationFactoryTest.java @@ -30,6 +30,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.KsqlConfig; import java.net.MalformedURLException; import java.net.URL; import java.util.HashMap; @@ -73,6 +74,8 @@ public class KsMaterializationFactoryTest { private MaterializationFactory materializationFactory; @Mock private KsMaterialization materialization; + @Mock + private KsqlConfig ksqlConfig; private KsMaterializationFactory factory; private final Map streamsProperties = new HashMap<>(); @@ -85,7 +88,7 @@ public void setUp() { ); when(locatorFactory.create(any(), any(), any(), any())).thenReturn(locator); - when(storeFactory.create(any(), any(), any())).thenReturn(stateStore); + when(storeFactory.create(any(), any(), any(), any())).thenReturn(stateStore); when(materializationFactory.create(any(), any(), any())).thenReturn(materialization); streamsProperties.clear(); @@ -108,7 +111,8 @@ public void shouldReturnEmptyIfAppServerNotConfigured() { // When: final Optional result = factory - .create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties); + .create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties, + ksqlConfig); // Then: assertThat(result, is(Optional.empty())); @@ -117,7 +121,8 @@ public void shouldReturnEmptyIfAppServerNotConfigured() { @Test public void shouldBuildLocatorWithCorrectParams() { // When: - factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties); + factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties, + ksqlConfig); // Then: verify(locatorFactory).create( @@ -131,13 +136,15 @@ public void shouldBuildLocatorWithCorrectParams() { @Test public void shouldBuildStateStoreWithCorrectParams() { // When: - factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties); + factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties, + ksqlConfig); // Then: verify(storeFactory).create( STORE_NAME, kafkaStreams, - SCHEMA + SCHEMA, + ksqlConfig ); } @@ -147,7 +154,8 @@ public void shouldBuildMaterializationWithCorrectParams() { final Optional windowType = Optional.of(WindowType.SESSION); // When: - factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowType, streamsProperties); + factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowType, streamsProperties, + ksqlConfig); // Then: verify(materializationFactory).create( @@ -161,7 +169,8 @@ public void shouldBuildMaterializationWithCorrectParams() { public void shouldReturnMaterialization() { // When: final Optional result = factory - .create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties); + .create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties, + ksqlConfig); // Then: assertThat(result, is(Optional.of(materialization))); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java index dd7a4f1e0d39..ee4b78120526 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsStateStoreTest.java @@ -33,6 +33,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.util.KsqlConfig; import java.time.Duration; import java.util.function.Supplier; import org.apache.kafka.streams.KafkaStreams; @@ -58,7 +59,7 @@ public class KsStateStoreTest { private static final String STORE_NAME = "someStore"; - private static final Duration TIMEOUT = Duration.ofMillis(10); + private static final Long TIMEOUT_MS = 10L; private static final LogicalSchema SCHEMA = LogicalSchema.builder() .keyColumn(ColumnName.of("k0"), SqlTypes.STRING) .keyColumn(ColumnName.of("v0"), SqlTypes.BIGINT) @@ -74,15 +75,19 @@ public class KsStateStoreTest { private KafkaStreams kafkaStreams; @Mock private Supplier clock; + @Mock + private KsqlConfig ksqlConfig; private KsStateStore store; @Before public void setUp() { - store = new KsStateStore(STORE_NAME, kafkaStreams, SCHEMA, TIMEOUT, clock); + store = new KsStateStore(STORE_NAME, kafkaStreams, SCHEMA, ksqlConfig, clock); when(clock.get()).thenReturn(0L); when(kafkaStreams.state()).thenReturn(State.RUNNING); + when(ksqlConfig.getLong(KsqlConfig.KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG)) + .thenReturn(TIMEOUT_MS); } @Test @@ -91,6 +96,7 @@ public void shouldThrowNPEs() { .setDefault(KafkaStreams.class, kafkaStreams) .setDefault(LogicalSchema.class, SCHEMA) .setDefault(Supplier.class, clock) + .setDefault(KsqlConfig.class, ksqlConfig) .testConstructors(KsStateStore.class, Visibility.PACKAGE); } @@ -117,7 +123,7 @@ public void shouldAwaitRunning() { public void shouldThrowIfDoesNotFinishRebalanceBeforeTimeout() { // Given: when(kafkaStreams.state()).thenReturn(State.REBALANCING); - when(clock.get()).thenReturn(0L, 5L, TIMEOUT.toMillis() + 1); + when(clock.get()).thenReturn(0L, 5L, TIMEOUT_MS + 1); // When: expectedException.expect(MaterializationTimeOutException.class);