diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java index b6af30d0970e..8eff959468ab 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java @@ -223,6 +223,7 @@ private void processHeartbeats(final long windowStart, final long windowEnd) { } return status; }); + notifyListeners(); return; } @@ -253,8 +254,16 @@ private void processHeartbeats(final long windowStart, final long windowEnd) { .withHostAlive(isAlive).withLastStatusUpdateMs(windowEnd)); } } + notifyListeners(); + } + + private void notifyListeners() { for (HostStatusListener listener : hostStatusListeners) { - listener.onHostStatusUpdated(getHostsStatus()); + try { + listener.onHostStatusUpdated(getHostsStatus()); + } catch (Throwable t) { + LOG.error("Error while notifying listener", t); + } } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MaximumLagFilter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MaximumLagFilter.java index 6dd785312661..6e17765f3753 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MaximumLagFilter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MaximumLagFilter.java @@ -66,8 +66,8 @@ public boolean filter(final KsqlHostInfo hostInfo) { final long offsetLag = Math.max(endOffset - hostLag.getCurrentOffsetPosition(), 0); return offsetLag <= allowedOffsetLag; }) - // If we don't have lag info, we'll be conservative and not include the host - .orElse(false); + // If we don't have lag info, we'll be conservative and include the host + .orElse(true); } /** diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java index ec64c99934fe..7d7ce40cbb20 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/HeartbeatAgentTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; @@ -27,6 +28,7 @@ import io.confluent.ksql.rest.server.HeartbeatAgent.Builder; import io.confluent.ksql.rest.server.HeartbeatAgent.CheckHeartbeatService; import io.confluent.ksql.rest.server.HeartbeatAgent.DiscoverClusterService; +import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatusListener; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.HostStatus; import io.confluent.ksql.util.KsqlHostInfo; @@ -39,6 +41,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -55,6 +58,8 @@ public class HeartbeatAgentTest { private ServiceContext serviceContext; @Mock private KsqlEngine ksqlEngine; + @Mock + private HostStatusListener hostStatusListener; private HeartbeatAgent heartbeatAgent; private HostInfo localHostInfo; @@ -63,6 +68,7 @@ public class HeartbeatAgentTest { private KsqlHostInfo remoteHost; private List allMetadata0; private List allMetadata1; + private Map hostsStatus; private static final String LOCALHOST_URL = "http://localhost:8088"; @Before @@ -76,12 +82,12 @@ public void setUp() { heartbeatAgent = builder .heartbeatSendInterval(1) .heartbeatMissedThreshold(2) + .addHostStatusListener(hostStatusListener) .build(ksqlEngine, serviceContext); heartbeatAgent.setLocalAddress(LOCALHOST_URL); - Map hostsStatus = ImmutableMap + hostsStatus = ImmutableMap .of(localHost, new HostStatus(true, 0L), remoteHost, new HostStatus(true, 0L)); - heartbeatAgent.setHostsStatus(hostsStatus); allMetadata0 = ImmutableList.of(streamsMetadata0); allMetadata1 = ImmutableList.of(streamsMetadata1); } @@ -89,6 +95,7 @@ public void setUp() { @Test public void shouldDiscoverServersInCluster() { // Given: + heartbeatAgent.setHostsStatus(hostsStatus); when(ksqlEngine.getPersistentQueries()).thenReturn(ImmutableList.of(query0, query1)); when(query0.getAllMetadata()).thenReturn(allMetadata0); @@ -106,11 +113,30 @@ public void shouldDiscoverServersInCluster() { assertThat(heartbeatAgent.getHostsStatus(), hasKey(remoteHost)); } + @Test + public void localhostOnlyShouldMarkServerAsUp() { + // Given: + long windowStart = 0; + long windowEnd = 5; + hostsStatus = ImmutableMap.of(localHost, new HostStatus(true, 0L)); + heartbeatAgent.setHostsStatus(hostsStatus); + CheckHeartbeatService processService = heartbeatAgent.new CheckHeartbeatService(); + + // When: + processService.runWithWindow(windowStart, windowEnd); + + // Then: + assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(1)); + assertThat(heartbeatAgent.getHostsStatus().get(localHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); + } + @Test public void shouldMarkServerAsUpNoMissingHeartbeat() { // Given: long windowStart = 0; long windowEnd = 5; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 1L); heartbeatAgent.receiveHeartbeat(remoteHost, 2L); @@ -125,6 +151,7 @@ public void shouldMarkServerAsUpNoMissingHeartbeat() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -132,6 +159,7 @@ public void shouldMarkServerAsUpMissOneHeartbeat() { // Given: long windowStart = 1; long windowEnd = 10; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 2L); heartbeatAgent.receiveHeartbeat(remoteHost, 4L); @@ -146,6 +174,7 @@ public void shouldMarkServerAsUpMissOneHeartbeat() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -153,6 +182,7 @@ public void shouldMarkServerAsUpMissAtBeginning() { // Given: long windowStart = 0; long windowEnd = 10; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 8L); CheckHeartbeatService processService = heartbeatAgent.new CheckHeartbeatService(); @@ -162,6 +192,7 @@ public void shouldMarkServerAsUpMissAtBeginning() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -169,6 +200,7 @@ public void shouldMarkServerAsUpMissInterleaved() { // Given: long windowStart = 0; long windowEnd = 10; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 2L); heartbeatAgent.receiveHeartbeat(remoteHost, 5L); @@ -181,6 +213,7 @@ public void shouldMarkServerAsUpMissInterleaved() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -188,6 +221,7 @@ public void shouldMarkServerAsUpOutOfOrderHeartbeats() { // Given: long windowStart = 0; long windowEnd = 10; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 8L); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 5L); @@ -200,6 +234,7 @@ public void shouldMarkServerAsUpOutOfOrderHeartbeats() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -207,6 +242,7 @@ public void shouldMarkServerAsDownMissAtEnd() { // Given: long windowStart = 0; long windowEnd = 10; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 2L); heartbeatAgent.receiveHeartbeat(remoteHost, 4L); @@ -220,6 +256,7 @@ public void shouldMarkServerAsDownMissAtEnd() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -227,6 +264,7 @@ public void shouldMarkServerAsDownIgnoreHeartbeatsOutOfWindow() { // Given: long windowStart = 5; long windowEnd = 8; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); heartbeatAgent.receiveHeartbeat(remoteHost, 1L); heartbeatAgent.receiveHeartbeat(remoteHost, 2L); @@ -242,6 +280,7 @@ public void shouldMarkServerAsDownIgnoreHeartbeatsOutOfWindow() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } @Test @@ -249,6 +288,7 @@ public void shouldMarkServerAsDownOutOfOrderHeartbeats() { // Given: long windowStart = 5; long windowEnd = 8; + heartbeatAgent.setHostsStatus(hostsStatus); heartbeatAgent.receiveHeartbeat(remoteHost, 10L); heartbeatAgent.receiveHeartbeat(remoteHost, 9L); heartbeatAgent.receiveHeartbeat(remoteHost, 0L); @@ -264,5 +304,6 @@ public void shouldMarkServerAsDownOutOfOrderHeartbeats() { // Then: assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2)); assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false)); + verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus())); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MaximumLagFilterTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MaximumLagFilterTest.java index d9ca4ada8001..5caf3ea1ee21 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MaximumLagFilterTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MaximumLagFilterTest.java @@ -94,7 +94,7 @@ public void filter_hostNoLag() { PARTITION).get(); // Then: - assertFalse(filter.filter(HOST1)); + assertTrue(filter.filter(HOST1)); } @Test