Skip to content

Commit

Permalink
feat: Makes timeout for owner lookup in StaticQueryExecutor and rebal…
Browse files Browse the repository at this point in the history
…ancing in KsStateStore configurable
  • Loading branch information
AlanConfluent committed Nov 15, 2019
1 parent 537d23d commit 4f448b7
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public final class ImmutableProperties {
.add(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)
.add(KsqlConfig.KSQL_EXT_DIR)
.add(KsqlConfig.KSQL_ACTIVE_PERSISTENT_QUERY_LIMIT_CONFIG)
.add(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)
.add(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG)
.addAll(KsqlConfig.SSL_CONFIG_NAMES)
.build();

Expand Down
36 changes: 30 additions & 6 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,22 @@ public class KsqlConfig extends AbstractConfig {
+ "whether the Kafka cluster supports the required API, and enables the validator if "
+ "it does.";

public static final String KSQL_PULL_QUERIES_ENABLE_CONFIG = "ksql.pull.queries.enable";
public static final String KSQL_PULL_QUERIES_ENABLE_DOC =
public static final String KSQL_QUERY_PULL_ENABLE_CONFIG = "ksql.query.pull.enable";
public static final String KSQL_QUERY_PULL_ENABLE_DOC =
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_PULL_QUERIES_ENABLE_DEFAULT = true;
public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;

public static final String KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG =
"ksql.query.pull.owner.timeout.ms";
public static final Long KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DEFAULT = 30000L;
public static final String KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DOC = "Timeout in milliseconds "
+ "when waiting for the lookup of the owner of a row key";

public static final String KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.streamsstore.rebalancing.timeout.ms";
public static final Long KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT = 10000L;
public static final String KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC = "Timeout in "
+ "milliseconds when waiting for rebalancing of the stream store during a write";

public static final Collection<CompatibilityBreakingConfigDef> COMPATIBLY_BREAKING_CONFIG_DEFS
= ImmutableList.of(
Expand Down Expand Up @@ -566,11 +578,23 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
METRIC_REPORTER_CLASSES_DOC
).define(
KSQL_PULL_QUERIES_ENABLE_CONFIG,
KSQL_QUERY_PULL_ENABLE_CONFIG,
Type.BOOLEAN,
KSQL_PULL_QUERIES_ENABLE_DEFAULT,
KSQL_QUERY_PULL_ENABLE_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_ENABLE_DOC
).define(
KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_DOC
).define(
KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_PULL_QUERIES_ENABLE_DOC
KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ private Optional<MaterializationProvider> buildMaterializationProvider(
info.getStateStoreSchema(),
keySerializer,
keyFormat.getWindowType(),
streamsProperties
streamsProperties,
ksqlConfig
);

return ksMaterialization.map(ksMat -> (queryId, contextStacker) -> ksqlMaterializationFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void setup() {
when(materializationBuilder.build()).thenReturn(materializationInfo);
when(materializationInfo.getStateStoreSchema()).thenReturn(aggregationSchema);
when(materializationInfo.stateStoreName()).thenReturn(STORE_NAME);
when(ksMaterializationFactory.create(any(), any(), any(), any(), any(), any()))
when(ksMaterializationFactory.create(any(), any(), any(), any(), any(), any(), any()))
.thenReturn(Optional.of(ksMaterialization));
when(ksqlMaterializationFactory.create(any(), any(), any(), any())).thenReturn(materialization);
when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
Expand Down Expand Up @@ -301,7 +301,8 @@ public void shouldCreateKSMaterializationCorrectly() {
same(aggregationSchema),
any(),
eq(Optional.empty()),
eq(properties)
eq(properties),
eq(ksqlConfig)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
Expand All @@ -94,7 +93,6 @@
public final class StaticQueryExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Duration OWNERSHIP_TIMEOUT = Duration.ofSeconds(30);
private static final Set<Type> VALID_WINDOW_BOUNDS_TYPES = ImmutableSet.of(
Type.EQUAL,
Type.GREATER_THAN,
Expand All @@ -117,11 +115,11 @@ public static void validate(
) {
final Query queryStmt = statement.getStatement();

if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG)) {
if (!statement.getConfig().getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG)) {
throw new KsqlRestException(
Errors.badStatement(
"Pull queries are disabled on this KSQL server - please set "
+ KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG + "=true to enable this feature. "
+ KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG + "=true to enable this feature. "
+ "If you intended to issue a push query, resubmit the query with the "
+ "EMIT CHANGES clause.",
statement.getStatementText()));
Expand Down Expand Up @@ -162,7 +160,8 @@ public static TableRowsEntity execute(

final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema());

final KsqlNode owner = getOwner(rowKey, mat);
final KsqlConfig ksqlConfig = statement.getConfig();
final KsqlNode owner = getOwner(ksqlConfig, rowKey, mat);
if (!owner.isLocal()) {
return proxyTo(owner, statement, serviceContext);
}
Expand Down Expand Up @@ -638,10 +637,16 @@ private static SourceName getSourceName(final Analysis analysis) {
return source.getName();
}

private static KsqlNode getOwner(final Struct rowKey, final Materialization mat) {
private static KsqlNode getOwner(
final KsqlConfig ksqlConfig,
final Struct rowKey,
final Materialization mat
) {
final Locator locator = mat.locator();

final long threshold = System.currentTimeMillis() + OWNERSHIP_TIMEOUT.toMillis();
final long timeoutMs =
ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG);
final long threshold = System.currentTimeMillis() + timeoutMs;
while (System.currentTimeMillis() < threshold) {
final Optional<KsqlNode> owner = locator.locate(rowKey);
if (owner.isPresent()) {
Expand All @@ -650,7 +655,8 @@ private static KsqlNode getOwner(final Struct rowKey, final Materialization mat)
}

throw new MaterializationTimeOutException(
"The owner of the key could not be determined within the configured timeout"
"The owner of the key could not be determined within the configured timeout: "
+ timeoutMs + "ms, config: " + KsqlConfig.KSQL_QUERY_PULL_OWNER_TIMEOUT_MS_CONFIG
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class StaticQueryExecutorTest {
public static class Disabled {
@Rule
public final TemporaryEngine engine = new TemporaryEngine()
.withConfigs(ImmutableMap.of(KsqlConfig.KSQL_PULL_QUERIES_ENABLE_CONFIG, false));
.withConfigs(ImmutableMap.of(KsqlConfig.KSQL_QUERY_PULL_ENABLE_CONFIG, false));

@Rule
public final ExpectedException expectedException = ExpectedException.none();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.ksql.execution.streams.materialization.Locator;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
Expand Down Expand Up @@ -74,7 +75,8 @@ public Optional<KsMaterialization> create(
final LogicalSchema schema,
final Serializer<Struct> keySerializer,
final Optional<WindowType> windowType,
final Map<String, ?> streamsProperties
final Map<String, ?> streamsProperties,
final KsqlConfig ksqlConfig
) {
final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (appServer == null) {
Expand All @@ -93,7 +95,8 @@ public Optional<KsMaterialization> create(
final KsStateStore stateStore = storeFactory.create(
stateStoreName,
kafkaStreams,
schema
schema,
ksqlConfig
);

final KsMaterialization materialization = materializationFactory.create(
Expand Down Expand Up @@ -133,7 +136,8 @@ interface StateStoreFactory {
KsStateStore create(
String stateStoreName,
KafkaStreams kafkaStreams,
LogicalSchema schema
LogicalSchema schema,
KsqlConfig ksqlConfig
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializationTimeOutException;
import io.confluent.ksql.execution.streams.materialization.NotRunningException;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.time.Duration;
import io.confluent.ksql.util.KsqlConfig;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
Expand All @@ -33,34 +33,33 @@
*/
class KsStateStore {

private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(10);

private final String stateStoreName;
private final KafkaStreams kafkaStreams;
private final LogicalSchema schema;
private final Duration timeout;
private final KsqlConfig ksqlConfig;
private final Supplier<Long> clock;

KsStateStore(
final String stateStoreName,
final KafkaStreams kafkaStreams,
final LogicalSchema schema
final LogicalSchema schema,
final KsqlConfig ksqlConfig
) {
this(stateStoreName, kafkaStreams, schema, DEFAULT_TIMEOUT, System::currentTimeMillis);
this(stateStoreName, kafkaStreams, schema, ksqlConfig, System::currentTimeMillis);
}

@VisibleForTesting
KsStateStore(
final String stateStoreName,
final KafkaStreams kafkaStreams,
final LogicalSchema schema,
final Duration timeout,
final KsqlConfig ksqlConfig,
final Supplier<Long> clock
) {
this.kafkaStreams = requireNonNull(kafkaStreams, "kafkaStreams");
this.stateStoreName = requireNonNull(stateStoreName, "stateStoreName");
this.schema = requireNonNull(schema, "schema");
this.timeout = requireNonNull(timeout, "timeout");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.clock = requireNonNull(clock, "clock");
}

Expand All @@ -84,11 +83,14 @@ <T> T store(final QueryableStoreType<T> queryableStoreType) {
}

private void awaitRunning() {
final long threshold = clock.get() + timeout.toMillis();
final long timeoutMs =
ksqlConfig.getLong(KsqlConfig.KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG);
final long threshold = clock.get() + timeoutMs;
while (kafkaStreams.state() == State.REBALANCING) {
if (clock.get() > threshold) {
throw new MaterializationTimeOutException("Store failed to rebalance within the configured "
+ "timeout. timeout: " + timeout.toMillis() + "ms");
+ "timeout. timeout: " + timeoutMs + "ms, config: "
+ KsqlConfig.KSQL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG);
}

Thread.yield();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
Expand Down Expand Up @@ -73,6 +74,8 @@ public class KsMaterializationFactoryTest {
private MaterializationFactory materializationFactory;
@Mock
private KsMaterialization materialization;
@Mock
private KsqlConfig ksqlConfig;
private KsMaterializationFactory factory;
private final Map<String, Object> streamsProperties = new HashMap<>();

Expand All @@ -85,7 +88,7 @@ public void setUp() {
);

when(locatorFactory.create(any(), any(), any(), any())).thenReturn(locator);
when(storeFactory.create(any(), any(), any())).thenReturn(stateStore);
when(storeFactory.create(any(), any(), any(), any())).thenReturn(stateStore);
when(materializationFactory.create(any(), any(), any())).thenReturn(materialization);

streamsProperties.clear();
Expand All @@ -108,7 +111,8 @@ public void shouldReturnEmptyIfAppServerNotConfigured() {

// When:
final Optional<KsMaterialization> result = factory
.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties);
.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties,
ksqlConfig);

// Then:
assertThat(result, is(Optional.empty()));
Expand All @@ -117,7 +121,8 @@ public void shouldReturnEmptyIfAppServerNotConfigured() {
@Test
public void shouldBuildLocatorWithCorrectParams() {
// When:
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties);
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties,
ksqlConfig);

// Then:
verify(locatorFactory).create(
Expand All @@ -131,13 +136,15 @@ public void shouldBuildLocatorWithCorrectParams() {
@Test
public void shouldBuildStateStoreWithCorrectParams() {
// When:
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties);
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties,
ksqlConfig);

// Then:
verify(storeFactory).create(
STORE_NAME,
kafkaStreams,
SCHEMA
SCHEMA,
ksqlConfig
);
}

Expand All @@ -147,7 +154,8 @@ public void shouldBuildMaterializationWithCorrectParams() {
final Optional<WindowType> windowType = Optional.of(WindowType.SESSION);

// When:
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowType, streamsProperties);
factory.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, windowType, streamsProperties,
ksqlConfig);

// Then:
verify(materializationFactory).create(
Expand All @@ -161,7 +169,8 @@ public void shouldBuildMaterializationWithCorrectParams() {
public void shouldReturnMaterialization() {
// When:
final Optional<KsMaterialization> result = factory
.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties);
.create(STORE_NAME, kafkaStreams, SCHEMA, keySerializer, Optional.empty(), streamsProperties,
ksqlConfig);

// Then:
assertThat(result, is(Optional.of(materialization)));
Expand Down
Loading

0 comments on commit 4f448b7

Please sign in to comment.