Skip to content

Commit

Permalink
feat: add extra log messages for pull queries (#4909)
Browse files Browse the repository at this point in the history
* extra log messages

* fixed test

* forgot to add
  • Loading branch information
vpapavas authored May 29, 2020
1 parent 0b1162c commit d622ecc
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ private void processHeartbeats(final long windowStart, final long windowEnd) {
// 1. remove heartbeats older than window
heartbeats.headMap(windowStart).clear();
copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true));
LOG.debug("Process heartbeats: {} of host: {}, window start: {}, window end: {}",
copy, ksqlHostInfo, windowStart, windowEnd);
}
// 2. count consecutive missed heartbeats and mark as alive or dead
final boolean isAlive = decideStatus(ksqlHostInfo, windowStart, windowEnd, copy);
Expand Down Expand Up @@ -285,6 +287,9 @@ private boolean decideStatus(
}
if (ts - config.heartbeatSendIntervalMs > prev) {
missedCount = (ts - prev - 1) / config.heartbeatSendIntervalMs;
LOG.debug("Host: {} missed: {} heartbeats, current heartbeat: {}, previous heartbeat: {},"
+ " send interval: {}.",
ksqlHostInfo, missedCount, ts, prev, config.heartbeatSendIntervalMs);
} else {
//Reset missed count when we receive heartbeat
missedCount = 0;
Expand All @@ -294,6 +299,9 @@ private boolean decideStatus(
// Check frame from last received heartbeat to window end
if (windowEnd - prev - 1 > 0) {
missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs;
LOG.debug("Host: {} missed: {} heartbeats, window end: {}, previous heartbeat: {},"
+ " send interval: {}.",
ksqlHostInfo, missedCount, windowEnd, prev, config.heartbeatSendIntervalMs);
}
LOG.debug("Host: {} has {} missing heartbeats", ksqlHostInfo, missedCount);
return (missedCount < config.heartbeatMissedThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ private TableRowsEntity handlePullQuery(
);

if (filteredAndOrderedNodes.isEmpty()) {
throw new MaterializationException("All nodes are dead or exceed max allowed lag.");
LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.",
statement.getStatementText());
throw new MaterializationException(String.format(
"Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.",
statement.getStatementText()));
}

// Nodes are ordered by preference: active is first if alive then standby nodes in
Expand All @@ -268,8 +272,8 @@ private TableRowsEntity handlePullQuery(
try {
return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext);
} catch (Exception t) {
LOG.error("Error routing query {} to host {} at timestamp {}",
statement.getStatementText(), node, System.currentTimeMillis(), t);
LOG.debug("Error routing query {} to host {} at timestamp {} with exception {}",
statement.getStatementText(), node, System.currentTimeMillis(), t);
}
}
throw new MaterializationException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void shouldFilterLaggyServers() throws Exception {
KsqlErrorMessage errorMessage = makePullQueryRequestWithError(clusterFormation.router.right,
sql, LAG_FILTER_25);
Assert.assertEquals(40001, errorMessage.getErrorCode());
Assert.assertEquals("All nodes are dead or exceed max allowed lag.", errorMessage.getMessage());
Assert.assertTrue(errorMessage.getMessage().contains("All nodes are dead or exceed max allowed lag."));
}

private void sendLagReportingMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public List<KsqlNode> locate(
"KeyQueryMetadata not available for state store %s and key %s", stateStoreName, key));
}

LOG.debug("Handling pull query for key {} in partition {} of state store {}.",
key, metadata.getPartition(), stateStoreName);

final HostInfo activeHost = metadata.getActiveHost();
final Set<HostInfo> standByHosts = metadata.getStandbyHosts();

Expand Down

0 comments on commit d622ecc

Please sign in to comment.