Skip to content

Commit

Permalink
addressed some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Dec 30, 2019
1 parent 02cc960 commit 6fb716f
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -124,8 +126,14 @@ public Topology getTopology() {
return topology;
}

public KafkaStreams getKafkaStreams() {
return kafkaStreams;
/**
 * Returns the result of {@code kafkaStreams.allMetadata()} for this query.
 *
 * <p>If that call throws an {@link IllegalStateException}, the failure is logged together
 * with its stack trace and {@code null} is returned instead of propagating the exception.
 *
 * @return the collection reported by {@code kafkaStreams.allMetadata()}, or {@code null}
 *     if the call failed; callers must null-check the result.
 */
public Collection<StreamsMetadata> getAllMetadata() {
  try {
    return kafkaStreams.allMetadata();
  } catch (IllegalStateException e) {
    // Hand the exception itself to the logger: logging only e.getMessage() discards the
    // stack trace and can produce an empty log entry when the message is null.
    LOG.error("Failed to retrieve Kafka Streams metadata: " + e.getMessage(), e);
    return null;
  }
}

public Map<String, Object> getStreamsProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,23 @@ RestResponse<List<StreamedRow>> makeQueryRequest(
String sql
);

/**
* Sends a heartbeat to a remote Ksql server.
*
* @param serverEndPoint the remote destination.
* @param host the host information of the sender.
* @param timestamp the timestamp at which the heartbeat is sent.
* @return response indicating whether the request was successful or not.
*/
RestResponse<HeartbeatResponse> makeHeartbeatRequest(
URI serverEndPoint,
HostInfo host,
long timestamp
);

/**
* Asks a remote Ksql server for its view of the status of the cluster.
*
* @param serverEndPoint the remote destination.
* @return response containing the cluster status.
*/
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.client.KsqlRestClientException;
import io.confluent.ksql.rest.entity.HostInfoEntity;
import io.confluent.ksql.rest.entity.HostStatusEntity;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,17 +44,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The heartbeat mechanism consists of three periodic tasks running at configurable time intervals:
* 1. Send heartbeats: Broadcast heartbeats to remote Ksql servers.
* 2. Process received heartbeats: Determine which remote server is up or down.
* 3. Cluster membership: Discover the Ksql servers that are part of the cluster.
*/
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public final class HeartbeatHandler {

private final KsqlEngine engine;
private final ServiceContext serviceContext;
private final ConfigurationParameters configs;
private final ScheduledExecutorService scheduledExecutorService;
private final ConcurrentHashMap<HostInfo, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<String, Boolean> aliveHosts;
private final ConcurrentHashMap<URI, Boolean> remoteHostsURI;
private final ConcurrentHashMap<HostInfo, Boolean> remoteHostsInfo;
private final ConcurrentHashMap<String, TreeMap<Long, HeartbeatInfo>> receivedHeartbeats;
private final ConcurrentHashMap<String, HostStatusEntity> aliveHosts;
private final HostInfo localHostInfo;
private final String localHostString;
private final URL localURL;

private static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class);
Expand All @@ -68,19 +77,22 @@ private HeartbeatHandler(KsqlEngine engine,
this.engine = requireNonNull(engine, "engine");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.configs = requireNonNull(configs, "configuration parameters");
requireNonNull(localServerURI, "localServerURI");
this.scheduledExecutorService = Executors.newScheduledThreadPool(configs.threadPoolSize);
this.receivedHeartbeats = new ConcurrentHashMap<>();
this.aliveHosts = new ConcurrentHashMap<>();
this.remoteHostsURI = new ConcurrentHashMap<>();
this.remoteHostsInfo = new ConcurrentHashMap<>();
this.localHostInfo = new HostInfo(localServerURI.getHost(), localServerURI.getPort());
this.localHostString = localHostInfo.toString();
try {
this.localURL = localServerURI.toURL();
} catch (final Exception e) {
throw new IllegalStateException("Failed to convert local host URI to URL."
+ " local host URL: " + localServerURI);
}
this.aliveHosts.putIfAbsent(localHostInfo.toString(), true);
this.aliveHosts.putIfAbsent(localHostString, new HostStatusEntity(
new HostInfoEntity(localHostInfo.host(), localHostInfo.port()),
true,
System.currentTimeMillis()));
}

void scheduleHeartbeatRelatedTasks() {
Expand All @@ -100,25 +112,23 @@ void shutDown() {
scheduledExecutorService.shutdown();
}

public ConcurrentHashMap<HostInfo, TreeMap<Long, HeartbeatInfo>> getReceivedHeartbeats() {
return receivedHeartbeats;
}

/**
* Stores the heartbeat received from a remote Ksql server.
* @param hostInfo The host information of the remote Ksql server.
* @param timestamp The timestamp the heartbeat was sent.
*/
public void registerHeartbeat(HostInfo hostInfo, long timestamp) {
receivedHeartbeats.putIfAbsent(hostInfo, new TreeMap<>());
TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(hostInfo);
String hostKey = hostInfo.toString();
receivedHeartbeats.putIfAbsent(hostKey, new TreeMap<>());
TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(hostKey);

synchronized (heartbeats) {
heartbeats.put(timestamp, new HeartbeatInfo(timestamp));
}
}

ConcurrentHashMap<URI, Boolean> getRemoteHostsURI() {
return remoteHostsURI;
}

public Map<String, Boolean> getAliveHosts() {
return aliveHosts;
public Map<String, HostStatusEntity> getAliveHosts() {
return Collections.unmodifiableMap(aliveHosts);
}

class CheckHeartbeatTask implements Runnable {
Expand All @@ -137,40 +147,47 @@ void runWithWindow(long windowStart, long windowEnd) {

private void processHeartbeats(final long windowStart, final long windowEnd) {
if (receivedHeartbeats.isEmpty()) {
// If a server has been previously discovered, then mark it as down
aliveHosts.forEach((host, status) -> {
if (!host.equals(localHostString)) {
status.setHostAlive(false);
}
});
}

remoteHostsInfo.forEach((host, bool) ->
aliveHosts.compute(host.toString(), (key, val) -> false));
// For previously discovered servers, if no heartbeats have been received from them, mark them down
for (String host: aliveHosts.keySet()) {
if (!host.equals(localHostString) && (receivedHeartbeats.get(host) == null
|| receivedHeartbeats.get(host).isEmpty())) {
aliveHosts.get(host).setHostAlive(false);
}
}
for (Entry<HostInfo, TreeMap<Long, HeartbeatInfo>> entry : receivedHeartbeats.entrySet()) {
HostInfo host = entry.getKey();

for (Entry<String, TreeMap<Long, HeartbeatInfo>> entry : receivedHeartbeats.entrySet()) {
String host = entry.getKey();
TreeMap<Long, HeartbeatInfo> heartbeats = entry.getValue();
synchronized (heartbeats) {
// 1. remove heartbeats older than window
while (!heartbeats.isEmpty()) {
Entry<Long, HeartbeatInfo> nestedEntry = heartbeats.firstEntry();
long timestamp = nestedEntry.getKey();
if (timestamp < windowStart) {
heartbeats.pollFirstEntry();
} else {
break;
}
}
heartbeats.headMap(windowStart).clear();
// 2. count consecutive missed heartbeats and mark the host as up or down
downPolicy(host, windowStart, windowEnd, heartbeats);
}
}
}


private void downPolicy(final HostInfo host, final long windowStart, final long windowEnd,
private void downPolicy(final String host, final long windowStart, final long windowEnd,
final TreeMap<Long, HeartbeatInfo> heartbeats) {
long missedCount = 0;
long prev = windowStart;
// No heartbeat received in this window
if (heartbeats.isEmpty()) {
aliveHosts.put(host.toString(), false);
aliveHosts.get(host).setHostAlive(false);
return;
}
// We want to count consecutive missed heartbeats and reset the count whenever a heartbeat is
// received. Counting the total number of heartbeats missed in the window is not enough: a
// server may have missed more than THRESHOLD heartbeats overall, but unless they were missed
// consecutively the server should not be considered down.
for (long ts : heartbeats.keySet()) {

//Don't count heartbeats after window end
Expand All @@ -189,29 +206,48 @@ private void downPolicy(final HostInfo host, final long windowStart, final long
missedCount = (windowEnd - prev - 1) / configs.heartbeatSendIntervalMs;
}

HostStatusEntity status = aliveHosts.get(host);
if (missedCount < configs.heartbeatMissedThreshold) {
aliveHosts.put(host.toString() , true);

status.setHostAlive(true);
status.setLastAliveTsMs(windowEnd);
} else {
aliveHosts.put(host.toString(), false);
status.setHostAlive(false);
}
}
}

/**
* Broadcast heartbeats to remote servers that are alive. This is a blocking operation.
*/
class SendHeartbeatTask implements Runnable {

@Override
public void run() {
//broadcast heartbeat
remoteHostsURI.forEach((uri, bool) -> {
aliveHosts.values().forEach((status) -> {
try {
serviceContext.getKsqlClient().makeHeartbeatRequest(
uri, localHostInfo, System.currentTimeMillis());
} catch (KsqlRestClientException e) {
log.error("Request to server: " + uri + " failed with exception: " + e.getMessage());
if (status.getHostAlive()) {
URI remoteUri = buildLocation(localURL, status.getHostInfoEntity().getHost(),
status.getHostInfoEntity().getPort());
serviceContext.getKsqlClient().makeHeartbeatRequest(remoteUri, localHostInfo,
System.currentTimeMillis());
}
} catch (Exception e) {
log.error("Request to server: " + status.getHostInfoEntity().getHost() + ":"
+ status.getHostInfoEntity().getPort()
+ " failed with exception: " + e.getMessage());
}
});
}

/**
 * Builds the URI of a remote server from its host and port, reusing the protocol of the
 * local server's URL and {@code "/"} as the path.
 *
 * @param localHost the local server URL; only its protocol is read.
 * @param host the remote host name.
 * @param port the remote port.
 * @return the URI built from the local protocol, the given host and port, and path "/".
 * @throws IllegalStateException if the URL cannot be constructed or converted to a URI;
 *     the underlying exception is attached as the cause.
 */
private URI buildLocation(final URL localHost, final String host, final int port) {
  try {
    return new URL(localHost.getProtocol(), host, port, "/").toURI();
  } catch (final Exception e) {
    // Chain the caught exception as the cause so the underlying failure is not lost.
    throw new IllegalStateException("Failed to convert remote host info to URL."
        + " remoteInfo: " + host + ":" + port, e);
  }
}
}

class DiscoverClusterTask implements Runnable {
Expand All @@ -225,34 +261,22 @@ public void run() {
}

Set<HostInfo> uniqueHosts = currentQueries.stream()
.map(queryMetadata -> ((QueryMetadata)queryMetadata).getKafkaStreams().allMetadata())
.map(queryMetadata -> ((QueryMetadata)queryMetadata).getAllMetadata())
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.map(StreamsMetadata::hostInfo)
.filter(hostInfo -> !(hostInfo.host().equals(localHostInfo.host())
&& hostInfo.port() == (localHostInfo.port())))
.collect(Collectors.toSet());

for (HostInfo hostInfo : uniqueHosts) {
// First time discovered
if (remoteHostsInfo.putIfAbsent(hostInfo, true) == null) {
URI remoteUri = buildLocation(localURL, hostInfo);
remoteHostsURI.put(remoteUri, true);
aliveHosts.put(hostInfo.toString(), true);
}
}
}

private URI buildLocation(final URL localHost, final HostInfo remoteInfo) {
try {
return new URL(
localHost.getProtocol(),
remoteInfo.host(),
remoteInfo.port(),
"/"
).toURI();
} catch (final Exception e) {
throw new IllegalStateException("Failed to convert remote host info to URL."
+ " remoteInfo: " + remoteInfo);
// First time discovered. Design decision to optimistically consider every newly discovered
// server as alive to avoid situations of unavailability until the cluster gets discovered.
aliveHosts.putIfAbsent(hostInfo.toString(),
new HostStatusEntity(
new HostInfoEntity(hostInfo.host(), hostInfo.port()),
true,
System.currentTimeMillis()));
}
}
}
Expand Down Expand Up @@ -312,7 +336,7 @@ public HeartbeatHandler build(KsqlEngine engine,
}
}

private static class ConfigurationParameters {
static class ConfigurationParameters {
private final int threadPoolSize;
private final long heartbeatSendIntervalMs;
private final long heartbeatCheckIntervalMs;
Expand Down
Loading

0 comments on commit 6fb716f

Please sign in to comment.