Skip to content

Commit

Permalink
feat: Adds the ability have internal endpoints listen on ksql.interna…
Browse files Browse the repository at this point in the history
…l.listener (#5212)

* feat: Adds the ability have internal endpoints listen on ksql.internal.listener
  • Loading branch information
AlanConfluent authored May 14, 2020
1 parent 1ad497c commit 46acb73
Show file tree
Hide file tree
Showing 17 changed files with 653 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,20 @@ The corresponding environment variable in the
[ksqlDB Server image](https://hub.docker.com/r/confluentinc/ksqldb-server/)
is `KSQL_LISTENERS`.

### ksql.advertised.listener

This is the url used for inter-node communication. Unlike `listeners` or `ksql.internal.listener`, this configuration doesn't create a listener.

### ksql.internal.listener

The `ksql.internal.listener ` setting controls the address bound for use by internal,
intra-cluster endpoints. These include forwarded pull queries, heartbeating, and lag reporting.

If not set, the system will use `listeners` to expose internal endpoints.

This setting is most often useful in a IaaS environment to separate external-facing
trafic from internal traffic.

### ksql.metrics.tags.custom

A list of tags to be included with emitted
Expand Down
80 changes: 80 additions & 0 deletions docs/operate-and-deploy/installation/server-config/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,83 @@ Start the ksqlDB server with the configuration file specified.
<path-to-confluent>/bin/ksql-server-start <path-to-confluent>/etc/ksqldb/ksql-server.properties
```

Configuring Listeners for a ksqlDB Cluster
--------------------------------

Multiple hosts are required to scale ksqlDB processing power and to do that, they must
form a cluster. ksqlDB requires all hosts of a cluster to use the same `ksql.service.id`.

```properties
bootstrap.servers=localhost:9092
ksql.service.id=my_application_
```

Once formed, many operations can be run using the client APIs exposed on `listeners`.

In order to utilize pull queries and their high availability functionality,
the hosts within the cluster must be configured to speak to each other via their
internal endpoints. The following describes how to configure listeners depending on
the nature of your environment.

### Listener Uses a Routable IP Address

If you want to configure your listener with an expicit IP address that is reachable
within the cluster, you might do the following:

```properties
listeners=http://192.168.1.101:8088
```

This listener will bind the client and internal endpoints to the given address, which
will also be utilized by inter-node communication.

### Listener Uses a Special or Unroutable Address

It's common to setup a service using special hostnames such as `localhost` or special
addresses such as `0.0.0.0`. Both have special meanings and are not appropriate for
inter-node commmunication. Similarly, if your network is setup such that the IP you
bind isn't routeable, it's also not appropriate. If you choose to use such a listener,
you must set `ksql.advertised.listener` specifying which address to use:

```properties
listeners=http://0.0.0.0:8088
ksql.advertised.listener=http://host1.internal.example.com:8088
```

This listener will bind the client and internal endpoints to the given address. Now
inter-node communication will explicity use the other address
`http://host1.internal.example.com:8088`.

### Listener Binds Client and Internal Listeners Separately

If ksqlDB is being run in an environment where you require more security, you first
want to enable [authentication and other security measures](security.md). Secondly,
you may choose to configure internal endpoints to be bound using a separate listener
from the client endpoints. This allows for port filtering, to make internal endpoints
unreachable beyond the internal network of the cluster, based on the port bound.
Similarly, it allows for the use of dual network interfaces where one is public facing,
and the other is cluster facing. Both measures can used to add additional security to
your configuration. To do this, you can set a separate listener for inter-node
communication `ksql.internal.listener`:

```properties
listeners=https://192.168.1.101:8088
# Port 8099 is available only to the trusted internal network
ksql.internal.listener=https://192.168.1.101:8099
```

Now, `listener` will bind the client endpoints, while `ksql.internal.listener` binds
the internal ones. Also, inter-node communication will now utilize the addess from
`ksql.internal.listener`.


### Listener Binds Client and Internal Listeners Separately With Special Addresses

This combines two of the previous configurations:

```properties
listeners=https://0.0.0.0:8088
# Port 8099 is available only to the trusted internal network
ksql.internal.listener=https://0.0.0.0:8099
ksql.advertised.listener=http://host1.internal.example.com:8099
```
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private ErrorCodes() {
public static final int ERROR_FAILED_AUTHENTICATION = 9;
public static final int ERROR_FAILED_AUTHORIZATION = 10;
public static final int ERROR_HTTP2_ONLY = 11;
public static final int ERROR_CODE_INTERNAL_ONLY = 12;

public static final int ERROR_CODE_INTERNAL_ERROR = 100;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.server;

import com.google.common.collect.ImmutableSet;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
import java.util.Set;

public class InternalEndpointHandler implements Handler<RoutingContext> {
private static final Set<String> INTERNAL_PATHS = ImmutableSet.of(
"/heartbeat", "/lag");

private final boolean isFromInternalListener;

public InternalEndpointHandler(final boolean isFromInternalListener) {
this.isFromInternalListener = isFromInternalListener;
}


@Override
public void handle(final RoutingContext routingContext) {
if (INTERNAL_PATHS.contains(routingContext.normalisedPath())
&& !isFromInternalListener) {
routingContext.fail(HttpResponseStatus.BAD_REQUEST.code(),
new KsqlApiException("Can't call internal endpoint on public listener",
ErrorCodes.ERROR_CODE_INTERNAL_ONLY));
} else {
routingContext.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class Server {
private final Optional<AuthenticationPlugin> authenticationPlugin;
private final ServerState serverState;
private final List<URI> listeners = new ArrayList<>();
private URI internalListener;
private WorkerExecutor workerExecutor;

public Server(final Vertx vertx, final KsqlRestConfig config, final Endpoints endpoints,
Expand All @@ -95,21 +96,24 @@ public synchronized void start() {
log.debug("Deploying " + options.getInstances() + " instances of server verticle");

final List<URI> listenUris = parseListeners(config);
final Optional<URI> internalListenUri = parseInternalListener(config, listenUris);
final List<URI> allListenUris = new ArrayList<>(listenUris);
internalListenUri.ifPresent(allListenUris::add);

final int instances = config.getInt(KsqlRestConfig.VERTICLE_INSTANCES);

final List<CompletableFuture<String>> deployFutures = new ArrayList<>();

final Map<URI, URI> uris = new ConcurrentHashMap<>();

for (URI listener : listenUris) {
for (URI listener : allListenUris) {
final Optional<Boolean> isInternalListener =
internalListenUri.map(uri -> uri.equals(listener));

for (int i = 0; i < instances; i++) {
final VertxCompletableFuture<String> vcf = new VertxCompletableFuture<>();
final ServerVerticle serverVerticle = new ServerVerticle(endpoints,
createHttpServerOptions(config, listener.getHost(), listener.getPort(),
listener.getScheme().equalsIgnoreCase("https")),
this);
this, isInternalListener);
vertx.deployVerticle(serverVerticle, vcf);
final int index = i;
final CompletableFuture<String> deployFuture = vcf.thenApply(s -> {
Expand Down Expand Up @@ -142,6 +146,9 @@ public synchronized void start() {
for (URI uri : listenUris) {
listeners.add(uris.get(uri));
}
if (internalListenUri.isPresent()) {
internalListener = uris.get(internalListenUri.get());
}
log.info("API server started");
}

Expand Down Expand Up @@ -224,6 +231,10 @@ public synchronized List<URI> getListeners() {
return ImmutableList.copyOf(listeners);
}

public synchronized Optional<URI> getInternalListener() {
return Optional.ofNullable(internalListener);
}

private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ksqlRestConfig,
final String host, final int port, final boolean tls) {

Expand Down Expand Up @@ -265,8 +276,30 @@ private static HttpServerOptions createHttpServerOptions(final KsqlRestConfig ks

private static List<URI> parseListeners(final KsqlRestConfig config) {
final List<String> sListeners = config.getList(KsqlRestConfig.LISTENERS_CONFIG);
return parseListenerStrings(config, sListeners);
}

private static Optional<URI> parseInternalListener(
final KsqlRestConfig config,
final List<URI> listenUris
) {
if (config.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG) == null) {
return Optional.empty();
}
final URI uri = parseListenerStrings(config,
ImmutableList.of(config.getString(KsqlRestConfig.INTERNAL_LISTENER_CONFIG))).get(0);
if (listenUris.contains(uri)) {
return Optional.empty();
} else {
return Optional.of(uri);
}
}

private static List<URI> parseListenerStrings(
final KsqlRestConfig config,
final List<String> stringListeners) {
final List<URI> listeners = new ArrayList<>();
for (String listenerName : sListeners) {
for (String listenerName : stringListeners) {
try {
final URI uri = new URI(listenerName);
final String scheme = uri.getScheme();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
*/
// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class ServerVerticle extends AbstractVerticle {

// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private static final Logger log = LoggerFactory.getLogger(ServerVerticle.class);

Expand All @@ -62,12 +61,17 @@ public class ServerVerticle extends AbstractVerticle {
private final Server server;
private ConnectionQueryManager connectionQueryManager;
private HttpServer httpServer;
private final Optional<Boolean> isInternalListener;

public ServerVerticle(final Endpoints endpoints, final HttpServerOptions httpServerOptions,
final Server server) {
public ServerVerticle(
final Endpoints endpoints,
final HttpServerOptions httpServerOptions,
final Server server,
final Optional<Boolean> isInternalListener) {
this.endpoints = Objects.requireNonNull(endpoints);
this.httpServerOptions = Objects.requireNonNull(httpServerOptions);
this.server = Objects.requireNonNull(server);
this.isInternalListener = Objects.requireNonNull(isInternalListener);
}

@Override
Expand Down Expand Up @@ -111,6 +115,9 @@ private Router setupRouter() {

router.route().failureHandler(ServerVerticle::failureHandler);

isInternalListener.ifPresent(isInternal ->
router.route().handler(new InternalEndpointHandler(isInternal)));

setupAuthHandlers(router);

router.route().handler(new ServerStateHandler(server.getServerState()));
Expand Down Expand Up @@ -273,5 +280,4 @@ private static void chcHandler(final RoutingContext routingContext) {
routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE.toString(), "application/json")
.end(new JsonObject().toBuffer());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,16 @@ List<URL> getListeners() {
}).collect(Collectors.toList());
}

Optional<URL> getInternalListener() {
return apiServer.getInternalListener().map(uri -> {
try {
return uri.toURL();
} catch (MalformedURLException e) {
throw new KsqlException(e);
}
});
}

public static KsqlRestApplication buildApplication(final KsqlRestConfig restConfig) {
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
Expand Down
Loading

0 comments on commit 46acb73

Please sign in to comment.