feat: Implement pull query routing to standbys if active is down (#4398)
vpapavas authored Feb 6, 2020
1 parent 3c1114c commit ace23b1
Showing 49 changed files with 1,943 additions and 470 deletions.
30 changes: 20 additions & 10 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -167,11 +167,21 @@ public class KsqlConfig extends AbstractConfig {
"Config to enable or disable transient pull queries on a specific KSQL server.";
public static final boolean KSQL_QUERY_PULL_ENABLE_DEFAULT = true;

public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.routing.timeout.ms";
public static final Long KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT = 30000L;
public static final String KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC = "Timeout in milliseconds "
+ "when waiting for the lookup of the owner of a row key";
public static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS =
"ksql.query.pull.enable.stale.reads";
private static final String KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC =
"Config to enable/disable forwarding pull queries to standby hosts when the active is dead. "
+ "This means that stale values may be returned for these queries since standby hosts "
+ "receive updates from the changelog topic (to which the active writes) "
+ "asynchronously. Turning on this configuration effectively sacrifices "
+ "consistency for higher availability. "
+ "Possible values are \"true\", \"false\". Setting to \"true\" guarantees high "
+ "availability for pull queries. If set to \"false\", pull queries will fail when "
+ "the active is dead and until a new active is elected. Default value is \"false\". "
+ "To use this functionality, the server must be configured with "
+ "ksql.streams.num.standby.replicas >= 1";
public static final boolean KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DEFAULT = false;


public static final String KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG =
"ksql.query.pull.streamsstore.rebalancing.timeout.ms";
@@ -517,11 +527,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_ENABLE_DOC
).define(
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_ROUTING_TIMEOUT_MS_DOC
KSQL_QUERY_PULL_ENABLE_STANDBY_READS,
Type.BOOLEAN,
KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DEFAULT,
Importance.MEDIUM,
KSQL_QUERY_PULL_ENABLE_STANDBY_READS_DOC
).define(
KSQL_QUERY_PULL_STREAMSTORE_REBALANCING_TIMEOUT_MS_CONFIG,
ConfigDef.Type.LONG,
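As a rough usage sketch (not part of this commit), the new standby-reads flag and the standby-replica requirement called out in its doc string could be supplied when building a KsqlConfig. The Map-based constructor and getBoolean accessor are inherited from AbstractConfig; the values below are illustrative.

import io.confluent.ksql.util.KsqlConfig;
import java.util.HashMap;
import java.util.Map;

public final class StandbyReadsConfigExample {
  public static void main(final String[] args) {
    final Map<String, Object> props = new HashMap<>();
    // Allow pull queries to be served (possibly stale) from standby hosts when the active is down.
    props.put(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true);
    // Standby reads only help if each state store has at least one standby replica.
    props.put("ksql.streams.num.standby.replicas", 1);
    final KsqlConfig config = new KsqlConfig(props);
    System.out.println(config.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS));
  }
}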
In KsqlHostInfo.java (class renamed from KsqlHost):
@@ -18,17 +18,18 @@
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;


/**
* Immutable representation of {@link org.apache.kafka.streams.state.HostInfo HostInfo}
* from KStreams.
*/
@Immutable
public class KsqlHost {
public class KsqlHostInfo {

private final String host;
private final int port;

public KsqlHost(final String host, final int port) {
public KsqlHostInfo(final String host, final int port) {
this.host = host;
this.port = port;
}
@@ -42,8 +43,8 @@ public boolean equals(final Object o) {
return false;
}

final KsqlHost hostInfo = (KsqlHost) o;
return port == hostInfo.port && host.equals(hostInfo.host);
final KsqlHostInfo other = (KsqlHostInfo) o;
return this.host.equals(other.host) && port == other.port;
}

@Override
@@ -61,6 +62,6 @@ public int port() {

@Override
public String toString() {
return "KsqlHost{host='" + this.host + '\'' + ", port=" + this.port + '}';
return "KsqlHostInfo{host='" + this.host + '\'' + ", port=" + this.port + '}';
}
}
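The heartbeat agent further down keys its concurrent maps by this type, so the value-based equals matters: two instances describing the same endpoint must resolve to the same entry. A small sketch (host names are placeholders; a hashCode consistent with equals is assumed from the elided lines):

import io.confluent.ksql.util.KsqlHostInfo;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class KsqlHostInfoExample {
  public static void main(final String[] args) {
    final KsqlHostInfo a = new KsqlHostInfo("ksql-0.example.internal", 8088);
    final KsqlHostInfo b = new KsqlHostInfo("ksql-0.example.internal", 8088);
    final ConcurrentMap<KsqlHostInfo, Boolean> alive = new ConcurrentHashMap<>();
    alive.put(a, true);
    // b is equal to a by host and port, so it finds the entry stored under a.
    System.out.println(a.equals(b) + " " + alive.containsKey(b)); // prints "true true"
  }
}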
In the disabled (no-op) KSQL client implementation:
@@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.util.List;

@@ -52,7 +52,7 @@ public RestResponse<List<StreamedRow>> makeQueryRequest(
@Override
public void makeAsyncHeartbeatRequest(
final URI serverEndPoint,
final KsqlHost host,
final KsqlHostInfo host,
final long timestamp
) {
throw new UnsupportedOperationException("KSQL client is disabled");
In the KSQL client interface:
@@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import java.net.URI;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
@@ -46,7 +46,7 @@ RestResponse<List<StreamedRow>> makeQueryRequest(
*/
void makeAsyncHeartbeatRequest(
URI serverEndPoint,
KsqlHost host,
KsqlHostInfo host,
long timestamp
);

@@ -58,7 +58,8 @@ void makeAsyncHeartbeatRequest(
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);

/**
* Send lag information to remote Ksql server.
* Send a request to the remote Ksql server to inquire about which state stores the
* remote server maintains as active and standby.
* @param serverEndPoint the remote destination.
* @param lagReportingMessage the host lag data
*/
New file, ActiveHostFilter.java:
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.util.KsqlHostInfo;
import org.apache.kafka.streams.state.HostInfo;

/**
* Filters for the active host.
*/
public class ActiveHostFilter implements RoutingFilter {

public ActiveHostFilter() {
}

/**
* Returns true if the host is the active host for a particular state store.
* @param activeHost the active host for a particular state store
* @param host The host for which the status is checked
* @param storeName Ignored
* @param partition Ignored
* @return true if the host is the active, false otherwise
*/
@Override
public boolean filter(
final HostInfo activeHost,
final KsqlHostInfo host,
final String storeName,
final int partition) {

return host.host().equals(activeHost.host()) && host.port() == activeHost.port();
}
}
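A quick illustration of the filter contract (endpoints are placeholders; how the routing layer combines such filters is not shown in this file):

import io.confluent.ksql.execution.streams.RoutingFilter;
import io.confluent.ksql.rest.server.ActiveHostFilter;
import io.confluent.ksql.util.KsqlHostInfo;
import org.apache.kafka.streams.state.HostInfo;

public final class ActiveHostFilterExample {
  public static void main(final String[] args) {
    final RoutingFilter filter = new ActiveHostFilter();
    final HostInfo active = new HostInfo("ksql-0.example.internal", 8088);
    // Only the host whose endpoint matches the active passes; standbys are filtered out here.
    System.out.println(filter.filter(active, new KsqlHostInfo("ksql-0.example.internal", 8088), "store", 0)); // true
    System.out.println(filter.filter(active, new KsqlHostInfo("ksql-1.example.internal", 8088), "store", 0)); // false
  }
}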
In HeartbeatAgent.java:
@@ -25,7 +25,7 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHost;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.URI;
import java.net.URL;
@@ -80,12 +80,12 @@ public final class HeartbeatAgent {
private final ServiceContext serviceContext;
private final HeartbeatConfig config;
private final List<HostStatusListener> hostStatusListeners;
private final ConcurrentHashMap<KsqlHost, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<KsqlHost, HostStatus> hostsStatus;
private final ConcurrentHashMap<KsqlHostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<KsqlHostInfo, HostStatus> hostsStatus;
private final ScheduledExecutorService scheduledExecutorService;
private final ServiceManager serviceManager;
private final Clock clock;
private KsqlHost localHost;
private KsqlHostInfo localHost;
private URL localUrl;

public static HeartbeatAgent.Builder builder() {
@@ -114,7 +114,7 @@ private HeartbeatAgent(final KsqlEngine engine,
* @param hostInfo The host information of the remote Ksql server.
* @param timestamp The timestamp the heartbeat was sent.
*/
public void receiveHeartbeat(final KsqlHost hostInfo, final long timestamp) {
public void receiveHeartbeat(final KsqlHostInfo hostInfo, final long timestamp) {
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.computeIfAbsent(
hostInfo, key -> new TreeMap<>());
synchronized (heartbeats) {
@@ -127,12 +127,12 @@ public void receiveHeartbeat(final KsqlHost hostInfo, final long timestamp) {
* Returns the current view of the cluster containing all hosts discovered (whether alive or dead)
* @return status of discovered hosts
*/
public Map<KsqlHost, HostStatus> getHostsStatus() {
public Map<KsqlHostInfo, HostStatus> getHostsStatus() {
return Collections.unmodifiableMap(hostsStatus);
}

@VisibleForTesting
void setHostsStatus(final Map<KsqlHost, HostStatus> status) {
void setHostsStatus(final Map<KsqlHostInfo, HostStatus> status) {
hostsStatus.putAll(status);
}

@@ -156,7 +156,7 @@ void stopAgent() {

void setLocalAddress(final String applicationServer) {
final HostInfo hostInfo = ServerUtil.parseHostInfo(applicationServer);
this.localHost = new KsqlHost(hostInfo.host(), hostInfo.port());
this.localHost = new KsqlHostInfo(hostInfo.host(), hostInfo.port());
try {
this.localUrl = new URL(applicationServer);
} catch (final Exception e) {
@@ -226,30 +226,30 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
return;
}

for (Entry<KsqlHost, HostStatus> hostEntry: hostsStatus.entrySet()) {
final KsqlHost ksqlHost = hostEntry.getKey();
for (Entry<KsqlHostInfo, HostStatus> hostEntry: hostsStatus.entrySet()) {
final KsqlHostInfo ksqlHostInfo = hostEntry.getKey();
final HostStatus hostStatus = hostEntry.getValue();
if (ksqlHost.equals(localHost)) {
if (ksqlHostInfo.equals(localHost)) {
continue;
}
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(ksqlHost);
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(ksqlHostInfo);
//For previously discovered hosts from which no heartbeats have been received, mark them dead
if (heartbeats == null || heartbeats.isEmpty()) {
hostsStatus.computeIfPresent(ksqlHost, (host, status) -> status.withHostAlive(false));
hostsStatus.computeIfPresent(ksqlHostInfo, (host, status) -> status.withHostAlive(false));
} else {
final TreeMap<Long, HeartbeatInfo> copy;
synchronized (heartbeats) {
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, ksqlHost);
LOG.debug("Process heartbeats: {} of host: {}", heartbeats, ksqlHostInfo);
// 1. remove heartbeats older than window
heartbeats.headMap(windowStart).clear();
copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true));
}
// 2. count consecutive missed heartbeats and mark as alive or dead
final boolean isAlive = decideStatus(ksqlHost, windowStart, windowEnd, copy);
final boolean isAlive = decideStatus(ksqlHostInfo, windowStart, windowEnd, copy);
if (!isAlive) {
LOG.info("Host: {} marked as dead.", ksqlHost);
LOG.info("Host: {} marked as dead.", ksqlHostInfo);
}
hostsStatus.computeIfPresent(ksqlHost, (host, status) -> status
hostsStatus.computeIfPresent(ksqlHostInfo, (host, status) -> status
.withHostAlive(isAlive).withLastStatusUpdateMs(windowEnd));
}
}
@@ -259,7 +259,7 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
}

private boolean decideStatus(
final KsqlHost ksqlHost, final long windowStart, final long windowEnd,
final KsqlHostInfo ksqlHostInfo, final long windowStart, final long windowEnd,
final TreeMap<Long, HeartbeatInfo> heartbeats
) {
long missedCount = 0;
@@ -289,7 +289,7 @@ private boolean decideStatus(
if (windowEnd - prev - 1 > 0) {
missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs;
}
LOG.debug("Host: {} has {} missing heartbeats", ksqlHost, missedCount);
LOG.debug("Host: {} has {} missing heartbeats", ksqlHostInfo, missedCount);
return (missedCount < config.heartbeatMissedThreshold);
}
}
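To make the visible tail-of-window arithmetic concrete (the numbers are illustrative, not defaults shipped by this commit): with heartbeatSendIntervalMs = 100 and heartbeatMissedThreshold = 3, a last heartbeat received 450 ms before windowEnd yields missedCount = (450 - 1) / 100 = 4, so decideStatus returns false and the host is marked dead; a 250 ms gap yields missedCount = 2, which is below the threshold, and the host stays alive.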
@@ -306,8 +306,8 @@ class SendHeartbeatService extends AbstractScheduledService {

@Override
protected void runOneIteration() {
for (Entry<KsqlHost, HostStatus> hostStatusEntry: hostsStatus.entrySet()) {
final KsqlHost remoteHost = hostStatusEntry.getKey();
for (Entry<KsqlHostInfo, HostStatus> hostStatusEntry: hostsStatus.entrySet()) {
final KsqlHostInfo remoteHost = hostStatusEntry.getKey();
try {
if (!remoteHost.equals(localHost)) {
final URI remoteUri = ServerUtil.buildRemoteUri(
@@ -364,7 +364,7 @@ protected void runOneIteration() {
// Only add to map if it is the first time it is discovered. Design decision to
// optimistically consider every newly discovered server as alive to avoid situations of
// unavailability until the heartbeating kicks in.
final KsqlHost host = new KsqlHost(hostInfo.host(), hostInfo.port());
final KsqlHostInfo host = new KsqlHostInfo(hostInfo.host(), hostInfo.port());
hostsStatus.computeIfAbsent(host, key -> new HostStatus(true, clock.millis()));
}
} catch (Throwable t) {
@@ -492,6 +492,6 @@ public interface HostStatusListener {
* Call when the map of host statuses are updated
* @param hostsStatusMap The new host status map
*/
void onHostStatusUpdated(Map<KsqlHost, HostStatus> hostsStatusMap);
void onHostStatusUpdated(Map<KsqlHostInfo, HostStatus> hostsStatusMap);
}
}
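A minimal listener sketch to show the shape of the callback (the class is hypothetical and HeartbeatAgent's package is assumed; HostStatus's toString is relied on for the output):

import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import java.util.Map;

public final class LoggingHostStatusListener implements HeartbeatAgent.HostStatusListener {
  @Override
  public void onHostStatusUpdated(final Map<KsqlHostInfo, HostStatus> hostsStatusMap) {
    // Print the latest cluster view whenever the agent refreshes host statuses.
    hostsStatusMap.forEach((host, status) -> System.out.println(host + " -> " + status));
  }
}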
(The diffs for the remaining changed files are not shown here.)