Skip to content

Commit

Permalink
fix: replace appId mapping based on QueryExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Jun 3, 2020
1 parent c70a36e commit 1ef5cb3
Showing 1 changed file with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.parser.tree.ListStreams;
import io.confluent.ksql.parser.tree.ListTables;
import io.confluent.ksql.parser.tree.ShowColumns;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlWarning;
Expand All @@ -45,6 +46,7 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
Expand All @@ -69,13 +71,15 @@ private ListSourceExecutor() { }

private static Optional<KsqlEntity> sourceDescriptionList(
final ConfiguredStatement<?> statement,
final SessionProperties sessionProperties,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext,
final List<? extends DataSource> sources
) {
final List<SourceDescriptionWithWarnings> descriptions = sources.stream()
.map(
s -> describeSource(
sessionProperties,
executionContext,
serviceContext,
s.getName(),
Expand Down Expand Up @@ -104,6 +108,7 @@ public static Optional<KsqlEntity> streams(
if (listStreams.getShowExtended()) {
return sourceDescriptionList(
statement,
sessionProperties,
executionContext,
serviceContext,
ksqlStreams
Expand All @@ -129,7 +134,7 @@ public static Optional<KsqlEntity> tables(
if (listTables.getShowExtended()) {
return sourceDescriptionList(
statement,
executionContext,
sessionProperties, executionContext,
serviceContext,
ksqlTables
);
Expand All @@ -149,7 +154,7 @@ public static Optional<KsqlEntity> columns(
) {
final ShowColumns showColumns = statement.getStatement();
final SourceDescriptionWithWarnings descriptionWithWarnings = describeSource(
executionContext,
sessionProperties, executionContext,
serviceContext,
showColumns.getTable(),
showColumns.isExtended(),
Expand Down Expand Up @@ -187,6 +192,7 @@ private static List<KsqlStream<?>> getSpecificStreams(
}

private static SourceDescriptionWithWarnings describeSource(
final SessionProperties sessionProperties,
final KsqlExecutionContext ksqlEngine,
final ServiceContext serviceContext,
final SourceName name,
Expand Down Expand Up @@ -216,16 +222,21 @@ private static SourceDescriptionWithWarnings describeSource(
topicDescription = Optional.of(
serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName())
);
String serviceId = "default"; //FIXME not sure how to get this
if (sourceQueries.isEmpty()){
consumerGroupDescription = Optional.empty();
} else {
String queryId = sourceQueries.get(0).getId().toString();
String consumerGroupId = "_confluent-ksql-" + serviceId + "_" + KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT + queryId; //FIXME there should be a better way to build this
QueryId queryId = sourceQueries.get(0).getId();
final String persistenceQueryPrefix =
sessionProperties.getMutableScopedProperties().get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
final String applicationId = getQueryApplicationId(
getServiceId(sessionProperties.getMutableScopedProperties()),
persistenceQueryPrefix,
queryId
);
consumerGroupDescription = Optional.of(
serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).describedGroups().get(consumerGroupId).get()
serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(applicationId)).describedGroups().get(applicationId).get()
);
topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
topicAndConsumerOffsets = serviceContext.getAdminClient().listConsumerGroupOffsets(applicationId).partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> startRequest = new LinkedHashMap<>();
Map<TopicPartition, OffsetSpec> endRequest = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry: topicAndConsumerOffsets.entrySet()) {
Expand Down Expand Up @@ -256,6 +267,19 @@ private static SourceDescriptionWithWarnings describeSource(
);
}

private static String getServiceId(
Map<String, Object> mutableScopedProperties) {
return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX
+ mutableScopedProperties.get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
}

private static String getQueryApplicationId(
final String serviceId,
final String queryPrefix,
final QueryId queryId) {
return serviceId + queryPrefix + queryId;
}

private static List<RunningQuery> getQueries(
final KsqlExecutionContext ksqlEngine,
final Predicate<PersistentQueryMetadata> predicate
Expand Down

0 comments on commit 1ef5cb3

Please sign in to comment.