-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implementation of heartbeat mechanism as part of KLIP-12 #4173
Conversation
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HeartbeatResource.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vpapavas! I think the bones are there, but the complexity and synchronization models can be improved IMO. Let me know if you have any questions :)
ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java
Outdated
Show resolved
Hide resolved
ksql-execution/src/main/java/io/confluent/ksql/services/SimpleKsqlClient.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HeartbeatRequest.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HeartbeatRequest.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HeartbeatRequest.java
Outdated
Show resolved
Hide resolved
HostInfo hostInfo = new HostInfo(hostInfoEntity.getHost(), hostInfoEntity.getPort()); | ||
long timestamp = request.getTimestamp(); | ||
|
||
heartbeatHandler.getReceivedHeartbeats().putIfAbsent(hostInfo, new TreeMap<>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might make sense to split the producer/consumer data structures of heartbeats. I would have the heartbeat handler maintain a circular buffer of heartbeats that can have many writers and then whenever you runOneIteration
(see comment about AbstractScheduledService
) you would flush that buffer into the tree map locally. This would probably result in fewer tree rebalances and a much simpler concurrency model. It also makes this method non-blocking (in the case that heartbeats is locked)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I had privately suggested all three design choices and we decided to go with the simplest and measure runtime/throughput and then go from there. The requirements to keep in mind are:
- The heartbeats per hosts needs to be ordered for processing.
- We have multiple producers with out-of-order writes.
- We have one consumer.
The choices in order of (reasoning) complexity are:
- Synchronize with ConcurrentHashMap + TreeMap
- ConcurrentHashMap + ConcurrentNavigableMap
- Use read/write locks and unordered concurrent list implementation. It will get ordered and drained by
processHeartbeat
thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline - I agree that the criteria for choosing a solution should be simplicity, I just think that option 3 is the simplest. You don't have to worry about concurrency on any complicated (map) data structure. The critical section is really stupid, just append at end and poll from start. It's the same idea as Kafka 😂
EDIT: also my suggestion is not to bother with R/W locks - appending to a list is fast enough we can just hard lock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave this to @vpapavas .
The critical section is really stupid, just append at end and poll from start.
No comments. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in issue #4277.
I feel the first solution might the best because:
With TreeMap
registerHeartbeat: lock, put into TreeMap, unlock (critical section takes O(logN))
processHeartbeat: lock, copy subMap, clear initial map, unlock (critical section takes O(N))
With List
registerHeartbeat: lock, add to List, unlock (critical section takes O(1))
processHeartbeat: lock, order, copy to new list, clear initial list, unlock (critical section takes O(NlogN))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vpapavas three points:
- With the second option, if you're okay with a double-copy, then the critical section becomes
O(N)
. You just drain the list in the critical section and sort it outside of it. - The
N
s aren't necessarily the same - theN
in the list approach is bounded to the number of heartbeats that come in in one interval, theN
in the big TreeMap approach is bounded to all of the heartbeats that you've seen - There's also the frequency argument, hypothetically you're receiving heartbeats way more often than you're registering heartbeats (probably a write-heavy system).
But I digress, I'm not sure perf should be the driving factor here but rather ease of implementation. If you feel that the current implementation is simpler, then we should stick to that. It would be a fun exercise to write them both and compare.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. May be one more spin and we are home!
There are good comments here from @agavra . Lets file issues for these, so we can track them for later, even if don't do this right away?
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HeartbeatRequest.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HeartbeatRequest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One real question and bunch of cleanup suggestions.. Great job @vpapavas ! Otherwise LGTM
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java
Show resolved
Hide resolved
ksql-rest-client/src/main/java/io/confluent/ksql/rest/client/KsqlTarget.java
Outdated
Show resolved
Hide resolved
final URI remoteUri = buildLocation(localURL, status.getHostInfoEntity().getHost(), | ||
status.getHostInfoEntity().getPort()); | ||
serviceContext.getKsqlClient().makeHeartbeatRequest(remoteUri, localHostInfo, | ||
System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the heartbeat timestamp is sent in the system's default time zone.. so the UTC issue above would be a real problem..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaa good catch!
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
prev = ts; | ||
} | ||
// Check frame from last received heartbeat to window end | ||
if (windowEnd - prev - 1 > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay fair.. I still have some gaps in understanding... The if- check here, seems to check if the difference between the last heartbeat (which can be windowStart or the real last heartbeat) and the windowEnd is non-zero.. I think this will mostly pass, in normal mode of operation right? There will be some non-zero gap between heartbeat and windowEnd, as I can imagine..
The assignment below, resets the missedCount
(presumably since we are interested in consecutive misses only).. I am wondering if thats the right thing to do.. could it be possible that we already accumulated a missedCount in the for loop above and we could reset this to 0? if say heartbeatSendIntervalMs was 10ms and heartbeats arrive perfectly (let say), then windowEnd-prev is say 10ms and we set missedCount = 10-1/10
as 0? even though we detected enough consecutive missed above?
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatHandler.java
Outdated
Show resolved
Hide resolved
c5ce119
to
86fe4c8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly LGTM - thanks @vpapavas! This is a big step forward in HA :) my comments are all about the code (nothing structural) and a few bugs (I think) that I identified.
ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java
Outdated
Show resolved
Hide resolved
@@ -0,0 +1,496 @@ | |||
/* | |||
* Copyright 2019 Confluent Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 2020 🎉 (all of the new files)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The files were created in 2019 :)
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java
Show resolved
Hide resolved
} | ||
|
||
@Test(timeout = 60000) | ||
public void shouldMarkRemoteServerAsUpThenDownThenUp() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can do this in a follow-up PR, but I'm worried that these tests will add a non-trivial amount of time to our testing suite without necessarily adding much coverage. this is, if i'm understanding correctly, trying to cover the algorithm that computes missing intervals. the HeartbeatAgentTest
already does this. so while this test is great during development to make sure that things work, I don't know if we need it in our testing suite
also any test that involves timing is prone to flakiness - can you please run this test at least 50 times and make sure it passes 100% of the time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests don't actually check the algorithm for missing heartbeats but rather the whole heartbeating process. It uses the /clusterStatus
endpoint and all three services to check whether servers are reported alive or dead correctly. It tests the interplay of all three services
...-rest-app/src/test/java/io/confluent/ksql/rest/integration/HeartbeatAgentFunctionalTest.java
Outdated
Show resolved
Hide resolved
when(query1.getAllMetadata()).thenReturn(allMetadata1); | ||
when(streamsMetadata1.hostInfo()).thenReturn(remoteHostInfo); | ||
|
||
DiscoverClusterService discoverService = heartbeatAgent.new DiscoverClusterService(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
woah I've never seen this syntax before 😂 that's kinda cool.
if (shouldRetry(readTimeoutMs, e)) { | ||
postAsync(path, jsonEntity, calcReadTimeout(readTimeoutMs)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's usually better to retry in a loop instead of in the exception block. can you use a framework like Retryer
or our RetryUtil
to handle this?
Also even if it eventually succeeds, won't it throw because we still fall through...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I followed what the prior code did here
fixed tests use application_server config to determine local host address fixed compile issues added extra tests test fixed failing test added debug logging, made critical section smaller addressed almogs comments added return
Description
Initial implementation of heartbeat mechanism as part of #3959
Added two new endpoints,
/heartbeat
used for registering received heartbeats and/clusterStatus
used for reporting current cluster status. All the logic is inHeartbeatHandler
.Determining whether server is UP or Down
Currently, the policy to decide whether a node is UP or DOWN is simplistic and will need to evolve: If a server misses more than
KSQL_HEARTBEAT_MISSED_THRESHOLD_CONFIG
it is marked as DOWN. However, if it receives even one heartbeat before the window closes, it will be marked as UP. So, it may miss all heartbeats except the last and still be marked as UP. We want to update this by counting the received heartbeats as well and having a threshold on them as well. We will update the policy after performing real-world tests.Cluster membership
Currently, the cluster membership is done via the persistent queries
KStreamsMetadata
.Testing done
Unit tests for the resources and functional test for the
HeartbeatHandler
.I have a functional test that has two servers that send heartbeats and check the cluster status response.
Manual testing: Three KSQL servers (
A
,B
andC
) on EC2.Correctness:
A
, see if the other servers (B
,C
) report it as down by checking the result ofcurl -sX GET "http://localhost:8088/clusterStatus"
. Bring the serverA
up, check again the result of curl to see if the rest of the server see it now as up.A
toB
. Use curl to check ifB
marksA
as down. Check ifC
marksA
as up (sinceA
stills ends traffic toC
). Restore traffic toB
, check ifA
is marked up.Time to detect change in status:
300ms to detect
A
is down. 150 ms to detect it is up again.Reviewer checklist