Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed May 13, 2022
1 parent f4fb120 commit 9c12741
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,7 @@ private SharedKafkaStreamsRuntime getKafkaStreamsInstance(
metricCollectors,
config.getConfig(true),
processingLogContext
),
metricCollectors.getMetrics(),
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
)
);
} else {
stream = new SandboxedSharedKafkaStreamsRuntimeImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -55,9 +54,7 @@ public SharedKafkaStreamsRuntimeImpl(final KafkaStreamsBuilder kafkaStreamsBuild
final QueryErrorClassifier errorClassifier,
final int maxQueryErrorsQueueSize,
final long shutdownTimeoutConfig,
final Map<String, Object> streamsProperties,
final Metrics metrics,
final Map<String, String> metricsTags) {
final Map<String, Object> streamsProperties) {
super(
kafkaStreamsBuilder,
streamsProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -47,6 +48,7 @@
import java.util.Optional;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
Expand Down Expand Up @@ -101,6 +103,8 @@ public class PersistentQueryMetadataTest {
private Metrics metrics;
@Mock
private MeteredProcessingLoggerFactory processingLoggerFactory;
@Mock
private Sensor sensor;

private PersistentQueryMetadata query;

Expand All @@ -111,6 +115,7 @@ public void setUp() {
when(materializationProviderBuilder.apply(kafkaStreams, topology))
.thenReturn(Optional.of(materializationProvider));
when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING);
when(metrics.sensor(anyString())).thenReturn(sensor);

query = new PersistentQueryMetadataImpl(
KsqlConstants.PersistentQueryType.CREATE_AS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand All @@ -40,6 +41,7 @@
import java.util.function.Consumer;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.junit.Before;
Expand Down Expand Up @@ -93,6 +95,8 @@ public class SandboxedPersistentQueryMetadataImplTest {
private MeteredProcessingLoggerFactory processingLoggerFactory;
@Mock
private Metrics metrics;
@Mock
private Sensor sensor;

private SandboxedPersistentQueryMetadataImpl sandbox;

Expand All @@ -102,6 +106,7 @@ public void setUp() {
when(physicalSchema.logicalSchema()).thenReturn(mock(LogicalSchema.class));
when(materializationProviderBuilder.apply(kafkaStreams, topology))
.thenReturn(Optional.of(materializationProvider));
when(metrics.sensor(anyString())).thenReturn(sensor);

final PersistentQueryMetadataImpl query = new PersistentQueryMetadataImpl(
KsqlConstants.PersistentQueryType.CREATE_AS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -72,7 +73,9 @@ public class SharedKafkaStreamsRuntimeImplTest {
@Mock
private KafkaFuture<Void> future;
@Mock
private Sensor sensor;
private Sensor restartErrorSensorQuery1;
@Mock
private Sensor restartErrorSensorQuery2;

private final QueryId queryId = new QueryId("query-1");
private final QueryId queryId2= new QueryId("query-2");
Expand All @@ -89,11 +92,9 @@ public class SharedKafkaStreamsRuntimeImplTest {
new StreamsException("query down!", new TaskId(0, 0, "not-a-real-query"));

private SharedKafkaStreamsRuntimeImpl sharedKafkaStreamsRuntimeImpl;
private MetricCollectors metricCollectors;

@Before
public void setUp() throws Exception {
metricCollectors = new MetricCollectors();
when(kafkaStreamsBuilder.buildNamedTopologyWrapper(any())).thenReturn(kafkaStreamsNamedTopologyWrapper).thenReturn(kafkaStreamsNamedTopologyWrapper2);
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "runtime");
streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "old");
Expand All @@ -102,9 +103,7 @@ public void setUp() throws Exception {
queryErrorClassifier,
5,
300_000L,
streamProps,
metricCollectors.getMetrics(),
metricsTags
streamProps
);

when(kafkaStreamsNamedTopologyWrapper.getTopologyByName(any())).thenReturn(Optional.empty());
Expand All @@ -114,6 +113,8 @@ public void setUp() throws Exception {
when(binPackedPersistentQueryMetadata.getTopologyCopy(any())).thenReturn(namedTopology);
when(binPackedPersistentQueryMetadata.getQueryId()).thenReturn(queryId);
when(binPackedPersistentQueryMetadata2.getQueryId()).thenReturn(queryId2);
when(binPackedPersistentQueryMetadata.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery1));
when(binPackedPersistentQueryMetadata2.getRestartMetricsSensor()).thenReturn(Optional.of(restartErrorSensorQuery2));

sharedKafkaStreamsRuntimeImpl.register(
binPackedPersistentQueryMetadata
Expand Down Expand Up @@ -277,8 +278,8 @@ public void shouldRecordMetricForQuery1WhenError() {
sharedKafkaStreamsRuntimeImpl.uncaughtHandler(query1Exception);

//Then:
assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0));
assertThat(getMetricValue(queryId2.toString(), metricsTags), is(0.0));
verify(restartErrorSensorQuery1, times(1)).record();
verify(restartErrorSensorQuery2, never()).record();
}

@Test
Expand All @@ -293,25 +294,11 @@ public void shouldRecordMetricForAllQueriesWhenErrorWithNoTask() {
sharedKafkaStreamsRuntimeImpl.start(queryId);
sharedKafkaStreamsRuntimeImpl.start(queryId2);

sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask);
sharedKafkaStreamsRuntimeImpl.uncaughtHandler(runtimeExceptionWithNoTask);

//Then:
assertThat(getMetricValue(queryId.toString(), metricsTags), is(1.0));
assertThat(getMetricValue(queryId2.toString(), metricsTags), is(1.0));
}

private double getMetricValue(final String queryId, final Map<String, String> metricsTags) {
final Map<String, String> customMetricsTags = new HashMap<>(metricsTags);
customMetricsTags.put("query-id", queryId);
final Metrics metrics = metricCollectors.getMetrics();
return Double.parseDouble(
metrics.metric(
metrics.metricName(
QueryMetadataImpl.QUERY_RESTART_METRIC_NAME,
QueryMetadataImpl.QUERY_RESTART_METRIC_GROUP_NAME,
QueryMetadataImpl.QUERY_RESTART_METRIC_DESCRIPTION,
customMetricsTags)
).metricValue().toString()
);
verify(restartErrorSensorQuery1, times(2)).record();
verify(restartErrorSensorQuery2, times(2)).record();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Consumer;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
Expand Down Expand Up @@ -81,6 +82,8 @@ public class TransientQueryMetadataTest {
private MeteredProcessingLoggerFactory loggerFactory;
@Mock
private Metrics metrics;
@Mock
private Sensor sensor;

private TransientQueryMetadata query;

Expand All @@ -89,6 +92,7 @@ public void setUp() {
when(kafkaStreamsBuilder.build(any(), any())).thenReturn(kafkaStreams);
when(kafkaStreams.state()).thenReturn(State.NOT_RUNNING);
when(sourceNames.toArray()).thenReturn(new SourceName[0]);
when(metrics.sensor(any())).thenReturn(sensor);

query = new TransientQueryMetadata(
SQL,
Expand Down

0 comments on commit 9c12741

Please sign in to comment.