Skip to content

Commit

Permalink
enforce offsets only for sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed May 26, 2020
1 parent f55b82a commit 9c4aeb8
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 60 deletions.
37 changes: 20 additions & 17 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,23 +620,26 @@ private void printSourceDescription(final SourceDescription source) {
+ source.getTopic()
));
writer().println();
ConsumerGroupOffsets consumerGroupOffsets = source.getConsumerGroupOffsets();
writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic()));
writer().println("");
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withRows(consumerGroupOffsets.getOffsets()
.stream()
.map(offset -> ImmutableList.of(
String.valueOf(offset.getPartition()),
String.valueOf(offset.getLogStartOffset()),
String.valueOf(offset.getLogEndOffset()),
String.valueOf(offset.getConsumerOffset()),
String.valueOf(offset.getLogEndOffset() - offset.getConsumerOffset())
)))
.build();
taskTable.print(this);
Optional<ConsumerGroupOffsets> consumerGroupOffsetsOptional = source.getConsumerGroupOffsets();
if (consumerGroupOffsetsOptional.isPresent()) {
ConsumerGroupOffsets consumerGroupOffsets = consumerGroupOffsetsOptional.get();
writer().println(String.format("%-20s : %s", "Consumer Group", consumerGroupOffsets.getGroupId()));
writer().println(String.format("%-20s : %s", "Kafka topic", consumerGroupOffsets.getKafkaTopic()));
writer().println("");
final Table taskTable = new Table.Builder()
.withColumnHeaders(ImmutableList.of("Partition", "Start Offset", "End Offset", "Offset", "Lag"))
.withRows(consumerGroupOffsets.getOffsets()
.stream()
.map(offset -> ImmutableList.of(
String.valueOf(offset.getPartition()),
String.valueOf(offset.getLogStartOffset()),
String.valueOf(offset.getLogEndOffset()),
String.valueOf(offset.getConsumerOffset()),
String.valueOf(offset.getLogEndOffset() - offset.getConsumerOffset())
)))
.build();
taskTable.print(this);
}
}

private void printSourceDescriptionList(final SourceDescriptionList sourceDescriptionList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SimpleConnectorInfo;
import io.confluent.ksql.rest.entity.ConsumerGroupOffsets;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
Expand Down Expand Up @@ -142,7 +141,7 @@ public class ConsoleTest {
2,
1,
"statement",
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList()));
Optional.empty());

@Mock
private QueryStatusCount queryStatusCount;
Expand Down Expand Up @@ -501,7 +500,7 @@ public void testPrintSourceDescription() {
1,
1,
"sql statement",
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
Optional.empty()),
Collections.emptyList()
)
));
Expand Down Expand Up @@ -1150,7 +1149,7 @@ public void shouldPrintTopicDescribeExtended() {
"kadka-topic",
2, 1,
"sql statement text",
new ConsumerGroupOffsets("", "kadka-topic", Collections.emptyList())),
Optional.empty()),
Collections.emptyList()
))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.schema.utils.FormatOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -71,16 +70,10 @@ public static SourceDescription create(
topicDescription.map(td -> td.partitions().size()).orElse(0),
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
dataSource.getSqlExpression(),
topicDescription.map(td ->
consumerGroupDescription.map(
cg -> new ConsumerGroupOffsets(
cg.groupId(),
dataSource.getKafkaTopicName(),
consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets,
topicAndConsumerOffsets)))
.orElse(
new ConsumerGroupOffsets("", "", Collections.emptyList())))
.orElse(new ConsumerGroupOffsets("", "", Collections.emptyList())));
topicDescription.flatMap(td -> consumerGroupDescription.map(cg ->
new ConsumerGroupOffsets(cg.groupId(), td.name(),
consumerOffsets(td, topicAndStartOffsets, topicAndEndOffsets,
topicAndConsumerOffsets)))));
}

private static List<ConsumerOffset> consumerOffsets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,11 @@ private static SourceDescriptionWithWarnings describeSource(
serviceContext.getTopicClient().describeTopic(dataSource.getKafkaTopicName())
);
String serviceId = "default"; //FIXME not sure how to get this
String queryId = sourceQueries.isEmpty() ? sinkQueries.get(0).getId().toString() : sourceQueries.get(0).getId().toString();
String consumerGroupId = "_confluent-ksql-" + serviceId + "_" + KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT + queryId; //FIXME there should be a better way to build this
try {
if (sourceQueries.isEmpty()){
consumerGroupDescription = Optional.empty();
} else {
String queryId = sourceQueries.get(0).getId().toString();
String consumerGroupId = "_confluent-ksql-" + serviceId + "_" + KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_DEFAULT + queryId; //FIXME there should be a better way to build this
consumerGroupDescription = Optional.of(
serviceContext.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).describedGroups().get(consumerGroupId).get()
);
Expand All @@ -232,10 +234,8 @@ private static SourceDescriptionWithWarnings describeSource(
}
topicAndStartOffsets = serviceContext.getAdminClient().listOffsets(startRequest).all().get();
topicAndEndOffsets = serviceContext.getAdminClient().listOffsets(endRequest).all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
} catch (final KafkaException | KafkaResponseGetFailedException e) {
} catch (final KafkaException | KafkaResponseGetFailedException | InterruptedException | ExecutionException e) {
warnings.add(new KsqlWarning("Error from Kafka: " + e.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class SourceDescription {
private final int partitions;
private final int replication;
private final String statement;
private final ConsumerGroupOffsets consumerGroupOffsets;
private final Optional<ConsumerGroupOffsets> consumerGroupOffsets;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
@JsonCreator
Expand All @@ -68,7 +68,7 @@ public SourceDescription(
@JsonProperty("partitions") final int partitions,
@JsonProperty("replication") final int replication,
@JsonProperty("statement") final String statement,
@JsonProperty("consumerGroupOffsets") ConsumerGroupOffsets consumerGroupOffsets) {
@JsonProperty("consumerGroupOffsets") final Optional<ConsumerGroupOffsets> consumerGroupOffsets) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.name = Objects.requireNonNull(name, "name");
this.windowType = Objects.requireNonNull(windowType, "windowType");
Expand Down Expand Up @@ -156,7 +156,7 @@ public String getErrorStats() {
return errorStats;
}

public ConsumerGroupOffsets getConsumerGroupOffsets() {
public Optional<ConsumerGroupOffsets> getConsumerGroupOffsets() {
return consumerGroupOffsets;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public class SourceDescriptionTest {
private static final String SOME_STRING = "some string";
private static final int SOME_INT = 3;
private static final boolean SOME_BOOL = true;
private static final ConsumerGroupOffsets SOME_CG_OFFSETS =
new ConsumerGroupOffsets("group", "topic", new ArrayList<>());

@Mock
private RunningQuery query1;
Expand All @@ -56,117 +54,117 @@ public void shouldImplementHashCodeAndEqualsProperty() {
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS),
SOME_STRING, Optional.empty()),
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
"diff", Optional.of(WindowType.SESSION), readQueries, writeQueries, fields,
SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), ImmutableList.of(), writeQueries, fields,
SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, ImmutableList.of(), fields,
SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, ImmutableList.of(),
SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, "diff",
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
"diff", SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, "diff", SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, "diff",
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, "diff", SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
!SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, "diff", SOME_STRING, SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, "diff", SOME_INT, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT + 1, SOME_INT,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT + 1,
SOME_STRING, SOME_CG_OFFSETS)
SOME_STRING, Optional.empty())
)
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING,
SOME_STRING, SOME_STRING, SOME_STRING,
SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT,
"diff", SOME_CG_OFFSETS)
"diff", Optional.empty())
)
.testEquals();
}
Expand Down

0 comments on commit 9c4aeb8

Please sign in to comment.