Skip to content

Commit

Permalink
fix: address Andy's comments
Browse files Browse the repository at this point in the history
- inject DenyListPropertyValidator to Resource classes
- add unit tests for WSQueryEndpoint
- validate only streamProperties (no requestProperties)
- minor fixes
  • Loading branch information
spena committed Aug 5, 2020
1 parent 71bbe6f commit 978c79a
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.properties;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.confluent.ksql.util.KsqlException;

import java.util.Collection;
Expand All @@ -37,14 +38,14 @@ public DenyListPropertyValidator(final Collection<String> immutableProps) {

/**
* Validates if a list of properties are part of the list of denied properties.
* @throws if a property is part of the denied list.
* @throws if at least one property is part of the denied list.
*/
public void validateAll(final Map<String, Object> properties) {
properties.forEach((name ,v) -> {
if (immutableProps.contains(name)) {
throw new KsqlException(String.format("A property override was set locally for a "
+ "property that the server prohibits overrides for: '%s'", name));
}
});
final Set<String> propsDenied = Sets.intersection(immutableProps, properties.keySet());
if (!propsDenied.isEmpty()) {
throw new KsqlException(String.format("One or more properties overrides set locally are "
+ "prohibited by the KSQL server (use UNSET to reset their default value): %s",
propsDenied));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ public class KsqlConfig extends AbstractConfig {

public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST =
"ksql.properties.overrides.denylist";
public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DEFAULT = "";
public static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of "
private static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of "
+ "properties that KSQL users cannot override.";

private enum ConfigGeneration {
Expand Down Expand Up @@ -771,7 +770,7 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
.define(
KSQL_PROPERTIES_OVERRIDES_DENYLIST,
Type.LIST,
KSQL_PROPERTIES_OVERRIDES_DENYLIST_DEFAULT,
"",
Importance.LOW,
KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class DenyListPropertyValidatorTest {
@Before
public void setUp() {
validator = new DenyListPropertyValidator(Arrays.asList(
"immutable-property"
"immutable-property-1",
"immutable-property-2"
));
}

Expand All @@ -41,21 +42,23 @@ public void shouldThrowOnDenyListedProperty() {
final KsqlException e = assertThrows(
KsqlException.class,
() -> validator.validateAll(ImmutableMap.of(
"immutable-property", "v1",
"anything", "v2"
"immutable-property-1", "v1",
"anything", "v2",
"immutable-property-2", "v3"
))
);

// Then:
assertThat(e.getMessage(), containsString(
"A property override was set locally for a property that the server prohibits "
+ "overrides for: 'immutable-property'"
"One or more properties overrides set locally are prohibited by the KSQL server "
+ "(use UNSET to reset their default value): "
+ "[immutable-property-1, immutable-property-2]"
));

}

@Test
public void shouldNotThrowOnConfigurableProp() {
public void shouldNotThrowOnAllowedProp() {
validator.validateAll(ImmutableMap.of(
"mutable-1", "v1",
"anything", "v2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.properties.DenyListPropertyValidator;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
Expand Down Expand Up @@ -183,6 +184,7 @@ public final class KsqlRestApplication implements Executable {
private Server apiServer = null;
private final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();
private final QueryMonitor queryMonitor;
private final DenyListPropertyValidator denyListPropertyValidator;

// The startup thread that can be interrupted if necessary during shutdown. This should only
// happen if startup hangs.
Expand Down Expand Up @@ -218,7 +220,8 @@ public static SourceName getCommandsStreamName() {
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx,
final QueryMonitor ksqlQueryMonitor
final QueryMonitor ksqlQueryMonitor,
final DenyListPropertyValidator denyListPropertyValidator
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand All @@ -245,6 +248,8 @@ public static SourceName getCommandsStreamName() {
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = requireNonNull(vertx, "vertx");
this.denyListPropertyValidator =
requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
if (heartbeatAgent.isPresent()) {
Expand Down Expand Up @@ -305,7 +310,8 @@ public void startAsync() {
KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);

startAsyncThreadRef.set(Thread.currentThread());
Expand Down Expand Up @@ -695,6 +701,9 @@ static KsqlRestApplication buildApplication(
final PullQueryExecutor pullQueryExecutor = new PullQueryExecutor(
ksqlEngine, routingFilterFactory, ksqlConfig);

final DenyListPropertyValidator denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
commandStore,
Expand All @@ -704,7 +713,8 @@ static KsqlRestApplication buildApplication(
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);

final KsqlResource ksqlResource = new KsqlResource(
Expand All @@ -713,7 +723,8 @@ static KsqlRestApplication buildApplication(
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
errorHandler
errorHandler,
denyListPropertyValidator
);

final List<String> managedTopics = new LinkedList<>();
Expand Down Expand Up @@ -773,7 +784,8 @@ static KsqlRestApplication buildApplication(
heartbeatAgent,
lagReportingAgent,
vertx,
queryMonitor
queryMonitor,
denyListPropertyValidator
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,21 @@ public class KsqlResource implements KsqlConfigurable {
private final ActivenessRegistrar activenessRegistrar;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final DenyListPropertyValidator denyListPropertyValidator;
private RequestValidator validator;
private RequestHandler handler;
private final Errors errorHandler;
private KsqlHostInfo localHost;
private URL localUrl;
private DenyListPropertyValidator denyListPropertyValidator;

public KsqlResource(
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
final Errors errorHandler,
final DenyListPropertyValidator denyListPropertyValidator
) {
this(
ksqlEngine,
Expand All @@ -120,7 +121,8 @@ public KsqlResource(
activenessRegistrar,
Injectors.DEFAULT,
authorizationValidator,
errorHandler
errorHandler,
denyListPropertyValidator
);
}

Expand All @@ -131,7 +133,8 @@ public KsqlResource(
final ActivenessRegistrar activenessRegistrar,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler
final Errors errorHandler,
final DenyListPropertyValidator denyListPropertyValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
Expand All @@ -143,6 +146,8 @@ public KsqlResource(
this.authorizationValidator = Objects
.requireNonNull(authorizationValidator, "authorizationValidator");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.denyListPropertyValidator =
Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
}

@Override
Expand Down Expand Up @@ -190,9 +195,6 @@ public void configure(final KsqlConfig config) {
distributedCmdResponseTimeout
)
);

this.denyListPropertyValidator = new DenyListPropertyValidator(
config.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
}

public EndpointResponse terminateCluster(
Expand Down Expand Up @@ -241,13 +243,11 @@ public EndpointResponse handleKsqlStatements(
request,
distributedCmdResponseTimeout);

final Map<String, Object> requestProperties = request.getRequestProperties();
denyListPropertyValidator.validateAll(requestProperties);

final Map<String, Object> configProperties = request.getConfigOverrides();
denyListPropertyValidator.validateAll(configProperties);

final KsqlRequestConfig requestConfig = new KsqlRequestConfig(requestProperties);
final KsqlRequestConfig requestConfig =
new KsqlRequestConfig(request.getRequestProperties());
final List<ParsedStatement> statements = ksqlEngine.parse(request.getKsql());

validator.validate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class StreamedQueryResource implements KsqlConfigurable {
private final PullQueryExecutor pullQueryExecutor;
private Optional<PullQueryExecutorMetrics> pullQueryMetrics;
private final Time time;
private DenyListPropertyValidator denyListPropertyValidator;
private final DenyListPropertyValidator denyListPropertyValidator;

public StreamedQueryResource(
final KsqlEngine ksqlEngine,
Expand All @@ -94,7 +94,8 @@ public StreamedQueryResource(
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final PullQueryExecutor pullQueryExecutor
final PullQueryExecutor pullQueryExecutor,
final DenyListPropertyValidator denyListPropertyValidator
) {
this(
ksqlEngine,
Expand All @@ -105,7 +106,8 @@ public StreamedQueryResource(
activenessRegistrar,
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);
}

Expand All @@ -121,7 +123,8 @@ public StreamedQueryResource(
final ActivenessRegistrar activenessRegistrar,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final PullQueryExecutor pullQueryExecutor
final PullQueryExecutor pullQueryExecutor,
final DenyListPropertyValidator denyListPropertyValidator
) {
this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
Expand All @@ -135,6 +138,8 @@ public StreamedQueryResource(
this.authorizationValidator = authorizationValidator;
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");
this.denyListPropertyValidator =
Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
this.time = Time.SYSTEM;
}

Expand All @@ -152,9 +157,6 @@ public void configure(final KsqlConfig config) {
ksqlEngine.getServiceId(),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)))
: empty();

this.denyListPropertyValidator = new DenyListPropertyValidator(
config.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
}

public EndpointResponse streamQuery(
Expand Down Expand Up @@ -219,9 +221,6 @@ private EndpointResponse handleStatement(
statement.getStatement())
);

final Map<String, Object> requestProperties = request.getRequestProperties();
denyListPropertyValidator.validateAll(requestProperties);

final Map<String, Object> configProperties = request.getConfigOverrides();
denyListPropertyValidator.validateAll(configProperties);

Expand All @@ -233,7 +232,7 @@ private EndpointResponse handleStatement(
securityContext.getServiceContext(),
queryStmt,
configProperties,
requestProperties,
request.getRequestProperties(),
isInternalRequest
);
if (pullQueryMetrics.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public class WSQueryEndpoint {
private final Optional<KsqlAuthorizationValidator> authorizationValidator;
private final Errors errorHandler;
private final PullQueryExecutor pullQueryExecutor;
private final DenyListPropertyValidator denyListPropertyValidator;

private WebSocketSubscriber<?> subscriber;
private DenyListPropertyValidator denyListPropertyValidator;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public WSQueryEndpoint(
Expand All @@ -85,7 +85,8 @@ public WSQueryEndpoint(
final Duration commandQueueCatchupTimeout,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final PullQueryExecutor pullQueryExecutor
final PullQueryExecutor pullQueryExecutor,
final DenyListPropertyValidator denyListPropertyValidator
) {
this(ksqlConfig,
statementParser,
Expand All @@ -99,7 +100,8 @@ public WSQueryEndpoint(
commandQueueCatchupTimeout,
authorizationValidator,
errorHandler,
pullQueryExecutor
pullQueryExecutor,
denyListPropertyValidator
);
}

Expand All @@ -118,7 +120,8 @@ public WSQueryEndpoint(
final Duration commandQueueCatchupTimeout,
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final PullQueryExecutor pullQueryExecutor
final PullQueryExecutor pullQueryExecutor,
final DenyListPropertyValidator denyListPropertyValidator
) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
Expand All @@ -137,9 +140,8 @@ public WSQueryEndpoint(
Objects.requireNonNull(authorizationValidator, "authorizationValidator");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.pullQueryExecutor = Objects.requireNonNull(pullQueryExecutor, "pullQueryExecutor");

this.denyListPropertyValidator = new DenyListPropertyValidator(
ksqlConfig.getList(KsqlConfig.KSQL_PROPERTIES_OVERRIDES_DENYLIST));
this.denyListPropertyValidator =
Objects.requireNonNull(denyListPropertyValidator, "denyListPropertyValidator");
}

public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap requestParams,
Expand Down Expand Up @@ -250,7 +252,6 @@ private PreparedStatement<?> parseStatement(final KsqlRequest request) {

private void handleQuery(final RequestContext info, final Query query) {
final Map<String, Object> clientLocalProperties = info.request.getConfigOverrides();
denyListPropertyValidator.validateAll(clientLocalProperties);

final WebSocketSubscriber<StreamedRow> streamSubscriber =
new WebSocketSubscriber<>(info.websocket);
Expand Down
Loading

0 comments on commit 978c79a

Please sign in to comment.