Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: correctly capture rows returned for scalable push query metrics #8230

Merged
merged 1 commit into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.confluent.ksql.physical.scalablepush.PushQueryPreparer;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.CompletionHandler;
import io.confluent.ksql.query.LimitHandler;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants.QuerySourceType;
import io.confluent.ksql.util.KsqlConstants.RoutingNodeType;
Expand All @@ -39,7 +39,7 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata {
private volatile boolean closed = false;
private final LogicalSchema logicalSchema;
private final QueryId queryId;
private final BlockingRowQueue rowQueue;
private final TransientQueryQueue transientQueryQueue;
private final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics;
private final ResultType resultType;
private final PushQueryQueuePopulator pushQueryQueuePopulator;
Expand All @@ -59,7 +59,7 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata {
public ScalablePushQueryMetadata(
final LogicalSchema logicalSchema,
final QueryId queryId,
final BlockingRowQueue blockingRowQueue,
final TransientQueryQueue transientQueryQueue,
final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics,
final ResultType resultType,
final PushQueryQueuePopulator pushQueryQueuePopulator,
Expand All @@ -70,7 +70,7 @@ public ScalablePushQueryMetadata(
) {
this.logicalSchema = logicalSchema;
this.queryId = queryId;
this.rowQueue = blockingRowQueue;
this.transientQueryQueue = transientQueryQueue;
this.scalablePushQueryMetrics = scalablePushQueryMetrics;
this.resultType = resultType;
this.pushQueryQueuePopulator = pushQueryQueuePopulator;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void start() {

@Override
public void close() {
rowQueue.close();
transientQueryQueue.close();
startFuture.thenApply(handle -> {
handle.close();
return null;
Expand All @@ -123,18 +123,18 @@ public boolean isRunning() {

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public BlockingRowQueue getRowQueue() {
return rowQueue;
public TransientQueryQueue getRowQueue() {
return transientQueryQueue;
}

@Override
public void setLimitHandler(final LimitHandler limitHandler) {
rowQueue.setLimitHandler(limitHandler);
transientQueryQueue.setLimitHandler(limitHandler);
}

@Override
public void setCompletionHandler(final CompletionHandler completionHandler) {
rowQueue.setCompletionHandler(completionHandler);
transientQueryQueue.setCompletionHandler(completionHandler);
}

@Override
Expand Down Expand Up @@ -186,7 +186,7 @@ public RoutingNodeType getRoutingNodeType() {
}

public long getTotalRowsReturned() {
return rowQueue.size();
return transientQueryQueue.getTotalRowsQueued();
}

public long getTotalRowsProcessed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PushQueryMetadata.ResultType;
import java.util.Optional;
Expand All @@ -45,7 +45,7 @@ public class ScalablePushQueryMetadataTest {
@Mock
private LogicalSchema logicalSchema;
@Mock
private BlockingRowQueue blockingRowQueue;
private TransientQueryQueue transientQueryQueue;
@Mock
private PushQueryQueuePopulator populator;
@Mock
Expand All @@ -64,7 +64,7 @@ public void setUp() {
query = new ScalablePushQueryMetadata(
logicalSchema,
new QueryId("queryid"),
blockingRowQueue,
transientQueryQueue,
metrics,
ResultType.STREAM,
populator,
Expand Down