Skip to content

Commit

Permalink
fix: remove any rowtime or rowkey columns from query schema (MINOR) (…
Browse files Browse the repository at this point in the history
…Fixes 3039) (#3043)

* fix(3039): explain of transient queries should only return value fields.

Transient queries only stream back the value fields. So the `EXPLAIN` command should only return the value fields, i.e. it should not return `ROWTIME` or `ROWKEY` unless they have been copied into the value schema:

The following _should_ include ROWTIME and ROWKEY:

```SELECT * FROM x;```
```SELECT x, y, ROWTIME, z, ROWKEY FROM x;```

The following should not:

```SELECT x, y, z FROM x;```
  • Loading branch information
big-andy-coates authored Jul 5, 2019
1 parent 29dea47 commit 0346933
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,6 @@ private static List<FieldInfo> buildTestSchema(final Schema... fieldTypes) {
dataSourceBuilder.field("f_" + idx, fieldTypes[idx]);
}

return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()));
return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ public QueryDescription(
private QueryDescription(
final String id,
final QueryMetadata queryMetadata,
final Set<String> sinks) {
final Set<String> sinks,
final boolean valueSchemaOnly
) {
this(
new EntityQueryId(id),
queryMetadata.getStatementString(),
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()),
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly),
queryMetadata.getSourceNames(),
sinks,
queryMetadata.getTopologyDescription(),
Expand All @@ -77,11 +79,15 @@ private QueryDescription(

public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadata) {
if (queryMetadata instanceof PersistentQueryMetadata) {
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata;
return new QueryDescription(
((PersistentQueryMetadata) queryMetadata).getQueryId().getId(), queryMetadata,
((PersistentQueryMetadata) queryMetadata).getSinkNames());
persistentQuery.getQueryId().getId(),
persistentQuery,
persistentQuery.getSinkNames(),
false
);
}
return new QueryDescription("", queryMetadata, Collections.emptySet());
return new QueryDescription("", queryMetadata, Collections.emptySet(), true);
}

public EntityQueryId getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SourceDescription(
dataSource.getName(),
readQueries,
writeQueries,
EntityUtil.buildSourceSchemaEntity(dataSource.getSchema()),
EntityUtil.buildSourceSchemaEntity(dataSource.getSchema(), false),
dataSource.getDataSourceType().getKsqlType(),
dataSource.getKeyField().name().orElse(""),
Optional.ofNullable(dataSource.getTimestampExtractionPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void onComplete() {
public void onSchema(final LogicalSchema schema) {
try {
session.getBasicRemote().sendText(
mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema))
mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema, true))
);
} catch (final IOException e) {
log.error("Error sending schema", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ public final class EntityUtil {
private EntityUtil() {
}

public static List<FieldInfo> buildSourceSchemaEntity(final LogicalSchema schema) {
public static List<FieldInfo> buildSourceSchemaEntity(
final LogicalSchema schema,
final boolean valueSchemaOnly
) {

final List<FieldInfo> allFields = new ArrayList<>();
allFields.addAll(getFields(schema.metaFields(), "implicit"));
allFields.addAll(getFields(schema.keyFields(), "key"));
if (!valueSchemaOnly) {
allFields.addAll(getFields(schema.metaFields(), "implicit"));
allFields.addAll(getFields(schema.keyFields(), "key"));
}
allFields.addAll(getFields(schema.valueFields(), "value"));

return allFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package io.confluent.ksql.rest.entity;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTopic;
import io.confluent.ksql.physical.LimitHandler;
import io.confluent.ksql.physical.QuerySchemas;
Expand All @@ -32,19 +31,14 @@
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -59,46 +53,149 @@
@RunWith(MockitoJUnitRunner.class)
public class QueryDescriptionTest {

private static final LogicalSchema SCHEMA = LogicalSchema.of(
private static final LogicalSchema SOME_SCHEMA = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

private static final List<FieldInfo> EXPECTED_FIELDS = Arrays.asList(
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)));

private static final String STATEMENT = "statement";
private static final Map<String, Object> STREAMS_PROPS = Collections.singletonMap("k1", "v1");
private static final Map<String, Object> PROP_OVERRIDES = Collections.singletonMap("k2", "v2");
private static final QueryId QUERY_ID = new QueryId("query_id");
private static final ImmutableSet<String> SOURCE_NAMES = ImmutableSet.of("s1, s2");
private static final String SQL_TEXT = "test statement";
private static final String TOPOLOGY_TEXT = "Topology Text";

@Mock
private Consumer<QueryMetadata> queryCloseCallback;
@Mock
private KafkaStreams queryStreams;
@Mock
private Topology topology;
@Mock
@Mock(name = TOPOLOGY_TEXT)
private TopologyDescription topologyDescription;
@Mock
private Consumer<LimitHandler> limitHandler;
@Mock
private KsqlTopic sinkTopic;
private QueryMetadata transientQuery;
private PersistentQueryMetadata persistentQuery;
private QueryDescription transientQueryDescription;
private QueryDescription persistentQueryDescription;

@Before
public void setUp() {
when(topology.describe()).thenReturn(topologyDescription);

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
queryStreams,
SOME_SCHEMA,
SOURCE_NAMES,
limitHandler,
"execution plan",
new LinkedBlockingQueue<>(),
DataSourceType.KSTREAM,
"app id",
topology,
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);

transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);

persistentQuery = new PersistentQueryMetadata(
SQL_TEXT,
queryStreams,
PhysicalSchema.from(SOME_SCHEMA, SerdeOption.none()),
SOURCE_NAMES,
"sink Name",
"execution plan",
QUERY_ID,
DataSourceType.KSTREAM,
"app id",
sinkTopic,
topology,
QuerySchemas.of(new LinkedHashMap<>()),
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);

persistentQueryDescription = QueryDescription.forQueryMetadata(persistentQuery);
}

@Test
public void shouldHaveEmptyQueryIdFromTransientQuery() {
assertThat(transientQueryDescription.getId().getId(), is(isEmptyString()));
}

@Test
public void shouldSetFieldsCorrectlyForQueryMetadata() {
public void shouldHaveQueryIdForPersistentQuery() {
assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId()));
}

@Test
public void shouldExposeExecutionPlan() {
assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan"));
assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan"));
}

@Test
public void shouldExposeSources() {
assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES));
assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES));
}

@Test
public void shouldExposeStatementText() {
assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT));
assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT));
}

@Test
public void shouldExposeTopology() {
assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT));
}

@Test
public void shouldExposeOverridenProperties() {
assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
}

@Test
public void shouldExposeValueFieldsForTransientQueries() {
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldExposeAllFieldsForPersistentQueries() {
assertThat(persistentQueryDescription.getFields(), contains(
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
// Given:
final QueryMetadata queryMetadata = new TransientQueryMetadata(
"test statement",
final LogicalSchema schema = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("ROWTIME", Schema.OPTIONAL_INT64_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
queryStreams,
SCHEMA,
ImmutableSet.of("s1, s2"),
schema,
SOURCE_NAMES,
limitHandler,
"execution plan",
new LinkedBlockingQueue<>(),
Expand All @@ -110,57 +207,47 @@ public void shouldSetFieldsCorrectlyForQueryMetadata() {
queryCloseCallback);

// When:
final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata);
transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);

// Then:
assertThat(queryDescription.getId().getId(), equalTo(""));
assertThat(queryDescription.getExecutionPlan(), equalTo("execution plan"));
assertThat(queryDescription.getSources(), equalTo(ImmutableSet.of("s1, s2")));
assertThat(queryDescription.getStatementText(), equalTo("test statement"));
assertThat(queryDescription.getTopology(), equalTo(topologyDescription.toString()));
assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS));
assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldSetFieldsCorrectlyForPersistentQueryMetadata() {
public void shouldHandleRowKeyInValueSchemaForTransientQuery() {
// Given:
final KsqlTopic sinkTopic = new KsqlTopic("fake_sink", "fake_sink", new KsqlJsonSerdeFactory(), true);
final KsqlStream<?> fakeSink = new KsqlStream<>(
STATEMENT,
"fake_sink",
SCHEMA,
SerdeOption.none(),
KeyField.of(SCHEMA.valueFields().get(0).name(), SCHEMA.valueFields().get(0)),
new MetadataTimestampExtractionPolicy(),
sinkTopic,
Serdes::String
);

final PersistentQueryMetadata queryMetadata = new PersistentQueryMetadata(
"test statement",
final LogicalSchema schema = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
queryStreams,
PhysicalSchema.from(SCHEMA, SerdeOption.none()),
Collections.emptySet(),
fakeSink.getName(),
schema,
SOURCE_NAMES,
limitHandler,
"execution plan",
new QueryId("query_id"),
new LinkedBlockingQueue<>(),
DataSourceType.KSTREAM,
"app id",
sinkTopic,
topology,
QuerySchemas.of(new LinkedHashMap<>()),
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);

// When:
final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata);
transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);

// Then:
assertThat(queryDescription.getId().getId(), equalTo("query_id"));
assertThat(queryDescription.getSinks(), equalTo(Collections.singleton("fake_sink")));
assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS));
assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.Sandbox;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.confluent.rest.RestConfig;
Expand Down Expand Up @@ -1852,8 +1853,9 @@ private static void validateQueryDescription(
assertThat(entity, instanceOf(QueryDescriptionEntity.class));
final QueryDescriptionEntity queryDescriptionEntity = (QueryDescriptionEntity) entity;
final QueryDescription queryDescription = queryDescriptionEntity.getQueryDescription();
final boolean valueSchemaOnly = queryMetadata instanceof TransientQueryMetadata;
assertThat(queryDescription.getFields(), is(
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema())));
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly)));
assertThat(queryDescription.getOverriddenProperties(), is(overriddenProperties));
}

Expand Down
Loading

0 comments on commit 0346933

Please sign in to comment.