From 9207057959b3712492da7d8caa4ade3edea861d0 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 Jan 2022 03:11:25 -0800 Subject: [PATCH] fix: get ksql compiling again by removing all usages of the internal StreamsMetadataImpl.NOT_AVAILABLE variable which was just deleted upstream in AK --- .../ksql/physical/scalablepush/locator/AllHostsLocator.java | 1 - .../physical/scalablepush/locator/AllHostsLocatorTest.java | 5 ++--- .../ksql/rest/server/resources/ClusterStatusResource.java | 4 +--- .../io/confluent/ksql/rest/util/DiscoverRemoteHostsUtil.java | 2 -- 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java index b5c66f19460a..afcef03f17d0 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocator.java @@ -62,7 +62,6 @@ public List locate() { .map(QueryMetadata::getAllMetadata) .filter(Objects::nonNull) .flatMap(Collection::stream) - .filter(streamsMetadata -> !(streamsMetadata.equals(StreamsMetadataImpl.NOT_AVAILABLE))) .map(StreamsMetadata::hostInfo) .map(hi -> new Node(isLocalhost(hi), buildLocation(hi))) .distinct() diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocatorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocatorTest.java index 5fb899c9a4a1..2b499e806198 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocatorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/locator/AllHostsLocatorTest.java @@ -9,6 +9,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import java.net.MalformedURLException; import java.net.URL; +import java.util.Collections; import java.util.List; import org.apache.kafka.streams.StreamsMetadata; import org.apache.kafka.streams.KafkaStreams.State; @@ -18,8 +19,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import static org.apache.kafka.streams.state.internals.StreamsMetadataImpl.NOT_AVAILABLE; - @RunWith(MockitoJUnitRunner.class) public class AllHostsLocatorTest { @@ -44,7 +43,7 @@ public void shouldLocate() throws MalformedURLException { when(metadata1.getAllMetadata()) .thenReturn(ImmutableList.of(streamsMetadata1, streamsMetadata2)); when(metadata2.getAllMetadata()) - .thenReturn(ImmutableList.of(streamsMetadata3, NOT_AVAILABLE)); + .thenReturn(Collections.emptyList()); when(streamsMetadata1.hostInfo()) .thenReturn(new HostInfo("abc", 101), new HostInfo("localhost", 8088)); when(streamsMetadata2.hostInfo()).thenReturn(new HostInfo("localhost", 8088)); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java index ff6f65604ff7..9706be376aac 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java @@ -38,7 +38,6 @@ import java.util.stream.Collectors; import org.apache.kafka.streams.StreamsMetadata; import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.internals.StreamsMetadataImpl; /** * Endpoint that reports the view of the cluster that this server has. @@ -103,8 +102,7 @@ private Map getActiveStandbyInformation( final Map perQueryMap = new HashMap<>(); for (PersistentQueryMetadata persistentQueryMetadata: engine.getPersistentQueries()) { for (StreamsMetadata streamsMetadata: persistentQueryMetadata.getAllMetadata()) { - if (streamsMetadata.equals(StreamsMetadataImpl.NOT_AVAILABLE) - || !streamsMetadata.hostInfo().equals(asHostInfo(ksqlHostInfo))) { + if (streamsMetadata.hostInfo().equals(asHostInfo(ksqlHostInfo))) { continue; } final QueryIdAndStreamsMetadata queryIdAndStreamsMetadata = new QueryIdAndStreamsMetadata( diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/DiscoverRemoteHostsUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/DiscoverRemoteHostsUtil.java index 3f51408fdad7..7c607e4fcca3 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/DiscoverRemoteHostsUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/DiscoverRemoteHostsUtil.java @@ -25,7 +25,6 @@ import java.util.stream.Collectors; import org.apache.kafka.streams.StreamsMetadata; import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.internals.StreamsMetadataImpl; public final class DiscoverRemoteHostsUtil { @@ -43,7 +42,6 @@ public static Set getRemoteHosts( .map(QueryMetadata::getAllMetadata) .filter(Objects::nonNull) .flatMap(Collection::stream) - .filter(streamsMetadata -> !(streamsMetadata.equals(StreamsMetadataImpl.NOT_AVAILABLE))) .map(StreamsMetadata::hostInfo) .filter(hostInfo -> !(hostInfo.host().equals(localHost.host()) && hostInfo.port() == (localHost.port())))