Skip to content

Commit

Permalink
Adds test cases for callback
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Feb 10, 2020
1 parent f90bd2d commit ba8c6c9
Showing 1 changed file with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
Expand All @@ -27,6 +28,7 @@
import io.confluent.ksql.rest.server.HeartbeatAgent.Builder;
import io.confluent.ksql.rest.server.HeartbeatAgent.CheckHeartbeatService;
import io.confluent.ksql.rest.server.HeartbeatAgent.DiscoverClusterService;
import io.confluent.ksql.rest.server.HeartbeatAgent.HostStatusListener;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
Expand All @@ -39,6 +41,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -55,6 +58,8 @@ public class HeartbeatAgentTest {
private ServiceContext serviceContext;
@Mock
private KsqlEngine ksqlEngine;
@Mock
private HostStatusListener hostStatusListener;

private HeartbeatAgent heartbeatAgent;
private HostInfo localHostInfo;
Expand All @@ -63,6 +68,7 @@ public class HeartbeatAgentTest {
private KsqlHostInfo remoteHost;
private List<StreamsMetadata> allMetadata0;
private List<StreamsMetadata> allMetadata1;
private Map<KsqlHostInfo, HostStatus> hostsStatus;
private static final String LOCALHOST_URL = "http://localhost:8088";

@Before
Expand All @@ -76,19 +82,20 @@ public void setUp() {
heartbeatAgent = builder
.heartbeatSendInterval(1)
.heartbeatMissedThreshold(2)
.addHostStatusListener(hostStatusListener)
.build(ksqlEngine, serviceContext);
heartbeatAgent.setLocalAddress(LOCALHOST_URL);
Map<KsqlHostInfo, HostStatus> hostsStatus = ImmutableMap
hostsStatus = ImmutableMap
.of(localHost, new HostStatus(true, 0L),
remoteHost, new HostStatus(true, 0L));
heartbeatAgent.setHostsStatus(hostsStatus);
allMetadata0 = ImmutableList.of(streamsMetadata0);
allMetadata1 = ImmutableList.of(streamsMetadata1);
}

@Test
public void shouldDiscoverServersInCluster() {
// Given:
heartbeatAgent.setHostsStatus(hostsStatus);
when(ksqlEngine.getPersistentQueries()).thenReturn(ImmutableList.of(query0, query1));

when(query0.getAllMetadata()).thenReturn(allMetadata0);
Expand All @@ -106,11 +113,30 @@ public void shouldDiscoverServersInCluster() {
assertThat(heartbeatAgent.getHostsStatus(), hasKey(remoteHost));
}

@Test
public void localhostOnlyShouldMarkServerAsUp() {
// Given:
long windowStart = 0;
long windowEnd = 5;
hostsStatus = ImmutableMap.of(localHost, new HostStatus(true, 0L));
heartbeatAgent.setHostsStatus(hostsStatus);
CheckHeartbeatService processService = heartbeatAgent.new CheckHeartbeatService();

// When:
processService.runWithWindow(windowStart, windowEnd);

// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(1));
assertThat(heartbeatAgent.getHostsStatus().get(localHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsUpNoMissingHeartbeat() {
// Given:
long windowStart = 0;
long windowEnd = 5;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 1L);
heartbeatAgent.receiveHeartbeat(remoteHost, 2L);
Expand All @@ -125,13 +151,15 @@ public void shouldMarkServerAsUpNoMissingHeartbeat() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsUpMissOneHeartbeat() {
// Given:
long windowStart = 1;
long windowEnd = 10;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 2L);
heartbeatAgent.receiveHeartbeat(remoteHost, 4L);
Expand All @@ -146,13 +174,15 @@ public void shouldMarkServerAsUpMissOneHeartbeat() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsUpMissAtBeginning() {
// Given:
long windowStart = 0;
long windowEnd = 10;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 8L);
CheckHeartbeatService processService = heartbeatAgent.new CheckHeartbeatService();

Expand All @@ -162,13 +192,15 @@ public void shouldMarkServerAsUpMissAtBeginning() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsUpMissInterleaved() {
// Given:
long windowStart = 0;
long windowEnd = 10;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 2L);
heartbeatAgent.receiveHeartbeat(remoteHost, 5L);
Expand All @@ -181,13 +213,15 @@ public void shouldMarkServerAsUpMissInterleaved() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsUpOutOfOrderHeartbeats() {
// Given:
long windowStart = 0;
long windowEnd = 10;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 8L);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 5L);
Expand All @@ -200,13 +234,15 @@ public void shouldMarkServerAsUpOutOfOrderHeartbeats() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(true));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsDownMissAtEnd() {
// Given:
long windowStart = 0;
long windowEnd = 10;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 2L);
heartbeatAgent.receiveHeartbeat(remoteHost, 4L);
Expand All @@ -220,13 +256,15 @@ public void shouldMarkServerAsDownMissAtEnd() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsDownIgnoreHeartbeatsOutOfWindow() {
// Given:
long windowStart = 5;
long windowEnd = 8;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
heartbeatAgent.receiveHeartbeat(remoteHost, 1L);
heartbeatAgent.receiveHeartbeat(remoteHost, 2L);
Expand All @@ -242,13 +280,15 @@ public void shouldMarkServerAsDownIgnoreHeartbeatsOutOfWindow() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}

@Test
public void shouldMarkServerAsDownOutOfOrderHeartbeats() {
// Given:
long windowStart = 5;
long windowEnd = 8;
heartbeatAgent.setHostsStatus(hostsStatus);
heartbeatAgent.receiveHeartbeat(remoteHost, 10L);
heartbeatAgent.receiveHeartbeat(remoteHost, 9L);
heartbeatAgent.receiveHeartbeat(remoteHost, 0L);
Expand All @@ -264,5 +304,6 @@ public void shouldMarkServerAsDownOutOfOrderHeartbeats() {
// Then:
assertThat(heartbeatAgent.getHostsStatus().entrySet(), hasSize(2));
assertThat(heartbeatAgent.getHostsStatus().get(remoteHost).isHostAlive(), is(false));
verify(hostStatusListener).onHostStatusUpdated(Mockito.eq(heartbeatAgent.getHostsStatus()));
}
}

0 comments on commit ba8c6c9

Please sign in to comment.