Skip to content

Commit

Permalink
feat: Implementation of heartbeat mechanism as part of KLIP-12 (#4173)
Browse files Browse the repository at this point in the history
* Implementation of heartbeat mechanism
  • Loading branch information
vpapavas authored Jan 16, 2020
1 parent 3946f73 commit 37c1eaa
Show file tree
Hide file tree
Showing 23 changed files with 1,896 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@

package io.confluent.ksql.util;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.internal.QueryStateListener;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -129,6 +132,15 @@ public Topology getTopology() {
return topology;
}

public Collection<StreamsMetadata> getAllMetadata() {
try {
return kafkaStreams.allMetadata();
} catch (IllegalStateException e) {
LOG.error(e.getMessage());
}
return ImmutableList.of();
}

public Map<String, Object> getStreamsProperties() {
return streamsProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.confluent.ksql.services;

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import java.net.URI;
import java.util.List;
import org.apache.kafka.streams.state.HostInfo;

/**
* A KSQL client implementation for use when communication with other nodes is not supported.
Expand All @@ -45,5 +47,19 @@ public RestResponse<List<StreamedRow>> makeQueryRequest(
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}

@Override
public void makeAsyncHeartbeatRequest(
final URI serverEndPoint,
final HostInfo host,
final long timestamp
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}

@Override
public RestResponse<ClusterStatusResponse> makeClusterStatusRequest(final URI serverEndPoint) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package io.confluent.ksql.services;

import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import java.net.URI;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.streams.state.HostInfo;

@ThreadSafe
public interface SimpleKsqlClient {
Expand All @@ -34,4 +36,23 @@ RestResponse<List<StreamedRow>> makeQueryRequest(
URI serverEndPoint,
String sql
);

/**
* Send heartbeat to remote Ksql server.
* @param serverEndPoint the remote destination.
* @param host the host information of the sender.
* @param timestamp the timestamp the heartbeat is sent.
*/
void makeAsyncHeartbeatRequest(
URI serverEndPoint,
HostInfo host,
long timestamp
);

/**
* Send a request to remote Ksql server to inquire about its view of the status of the cluster.
* @param serverEndPoint the remote destination.
* @return response containing the cluster status.
*/
RestResponse<ClusterStatusResponse> makeClusterStatusRequest(URI serverEndPoint);
}
Loading

0 comments on commit 37c1eaa

Please sign in to comment.