Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove any rowtime or rowkey columns from query schema (MINOR) (Fixes 3039) #3043

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,6 @@ private static List<FieldInfo> buildTestSchema(final Schema... fieldTypes) {
dataSourceBuilder.field("f_" + idx, fieldTypes[idx]);
}

return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()));
return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ public QueryDescription(
private QueryDescription(
final String id,
final QueryMetadata queryMetadata,
final Set<String> sinks) {
final Set<String> sinks,
final boolean valueSchemaOnly
) {
this(
new EntityQueryId(id),
queryMetadata.getStatementString(),
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()),
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly),
queryMetadata.getSourceNames(),
sinks,
queryMetadata.getTopologyDescription(),
Expand All @@ -77,11 +79,15 @@ private QueryDescription(

public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadata) {
if (queryMetadata instanceof PersistentQueryMetadata) {
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata;
return new QueryDescription(
((PersistentQueryMetadata) queryMetadata).getQueryId().getId(), queryMetadata,
((PersistentQueryMetadata) queryMetadata).getSinkNames());
persistentQuery.getQueryId().getId(),
persistentQuery,
persistentQuery.getSinkNames(),
false
);
}
return new QueryDescription("", queryMetadata, Collections.emptySet());
return new QueryDescription("", queryMetadata, Collections.emptySet(), true);
}

public EntityQueryId getId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SourceDescription(
dataSource.getName(),
readQueries,
writeQueries,
EntityUtil.buildSourceSchemaEntity(dataSource.getSchema()),
EntityUtil.buildSourceSchemaEntity(dataSource.getSchema(), false),
dataSource.getDataSourceType().getKsqlType(),
dataSource.getKeyField().name().orElse(""),
Optional.ofNullable(dataSource.getTimestampExtractionPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void onComplete() {
public void onSchema(final LogicalSchema schema) {
try {
session.getBasicRemote().sendText(
mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema))
mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema, true))
);
} catch (final IOException e) {
log.error("Error sending schema", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ public final class EntityUtil {
private EntityUtil() {
}

public static List<FieldInfo> buildSourceSchemaEntity(final LogicalSchema schema) {
public static List<FieldInfo> buildSourceSchemaEntity(
final LogicalSchema schema,
final boolean valueSchemaOnly
) {

final List<FieldInfo> allFields = new ArrayList<>();
allFields.addAll(getFields(schema.metaFields(), "implicit"));
allFields.addAll(getFields(schema.keyFields(), "key"));
if (!valueSchemaOnly) {
allFields.addAll(getFields(schema.metaFields(), "implicit"));
allFields.addAll(getFields(schema.keyFields(), "key"));
}
allFields.addAll(getFields(schema.valueFields(), "value"));

return allFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package io.confluent.ksql.rest.entity;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyString;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTopic;
import io.confluent.ksql.physical.LimitHandler;
import io.confluent.ksql.physical.QuerySchemas;
Expand All @@ -32,19 +31,14 @@
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -59,46 +53,149 @@
@RunWith(MockitoJUnitRunner.class)
public class QueryDescriptionTest {

private static final LogicalSchema SCHEMA = LogicalSchema.of(
private static final LogicalSchema SOME_SCHEMA = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

private static final List<FieldInfo> EXPECTED_FIELDS = Arrays.asList(
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)));

private static final String STATEMENT = "statement";
private static final Map<String, Object> STREAMS_PROPS = Collections.singletonMap("k1", "v1");
private static final Map<String, Object> PROP_OVERRIDES = Collections.singletonMap("k2", "v2");
private static final QueryId QUERY_ID = new QueryId("query_id");
private static final ImmutableSet<String> SOURCE_NAMES = ImmutableSet.of("s1, s2");
private static final String SQL_TEXT = "test statement";
private static final String TOPOLOGY_TEXT = "Topology Text";

@Mock
private Consumer<QueryMetadata> queryCloseCallback;
@Mock
private KafkaStreams queryStreams;
@Mock
private Topology topology;
@Mock
@Mock(name = TOPOLOGY_TEXT)
private TopologyDescription topologyDescription;
@Mock
private Consumer<LimitHandler> limitHandler;
@Mock
private KsqlTopic sinkTopic;
private QueryMetadata transientQuery;
private PersistentQueryMetadata persistentQuery;
private QueryDescription transientQueryDescription;
private QueryDescription persistentQueryDescription;

@Before
public void setUp() {
  // Stub the topology before any description is built from it.
  when(topology.describe()).thenReturn(topologyDescription);

  // Build both kinds of query metadata over the same schema, sources and properties.
  transientQuery = new TransientQueryMetadata(
      SQL_TEXT,
      queryStreams,
      SOME_SCHEMA,
      SOURCE_NAMES,
      limitHandler,
      "execution plan",
      new LinkedBlockingQueue<>(),
      DataSourceType.KSTREAM,
      "app id",
      topology,
      STREAMS_PROPS,
      PROP_OVERRIDES,
      queryCloseCallback);

  persistentQuery = new PersistentQueryMetadata(
      SQL_TEXT,
      queryStreams,
      PhysicalSchema.from(SOME_SCHEMA, SerdeOption.none()),
      SOURCE_NAMES,
      "sink Name",
      "execution plan",
      QUERY_ID,
      DataSourceType.KSTREAM,
      "app id",
      sinkTopic,
      topology,
      QuerySchemas.of(new LinkedHashMap<>()),
      STREAMS_PROPS,
      PROP_OVERRIDES,
      queryCloseCallback);

  // Derive the descriptions under test from the metadata built above.
  transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);
  persistentQueryDescription = QueryDescription.forQueryMetadata(persistentQuery);
}

@Test
public void shouldHaveEmptyQueryIdFromTransientQuery() {
  // When:
  final String queryId = transientQueryDescription.getId().getId();

  // Then: the description of a transient query carries an empty id.
  assertThat(queryId, is(isEmptyString()));
}

@Test
public void shouldSetFieldsCorrectlyForQueryMetadata() {
public void shouldHaveQueryIdForPersistentQuery() {
assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId()));
}

@Test
public void shouldExposeExecutionPlan() {
  // Both descriptions were built from metadata created with this plan text.
  final String expectedPlan = "execution plan";

  assertThat(transientQueryDescription.getExecutionPlan(), is(expectedPlan));
  assertThat(persistentQueryDescription.getExecutionPlan(), is(expectedPlan));
}

@Test
public void shouldExposeSources() {
  // Both descriptions must report the source names passed to the query metadata.
  final ImmutableSet<String> expectedSources = SOURCE_NAMES;

  assertThat(transientQueryDescription.getSources(), is(expectedSources));
  assertThat(persistentQueryDescription.getSources(), is(expectedSources));
}

@Test
public void shouldExposeStatementText() {
  // Both descriptions must echo the SQL text the query metadata was created with.
  final String expectedText = SQL_TEXT;

  assertThat(transientQueryDescription.getStatementText(), is(expectedText));
  assertThat(persistentQueryDescription.getStatementText(), is(expectedText));
}

@Test
public void shouldExposeTopology() {
  // The topology description mock is named TOPOLOGY_TEXT, which is the text expected here.
  final String expectedTopology = TOPOLOGY_TEXT;

  assertThat(transientQueryDescription.getTopology(), is(expectedTopology));
  assertThat(persistentQueryDescription.getTopology(), is(expectedTopology));
}

@Test
public void shouldExposeOverridenProperties() {
  // Both descriptions must surface the property overrides given to the query metadata.
  final Map<String, Object> expectedOverrides = PROP_OVERRIDES;

  assertThat(transientQueryDescription.getOverriddenProperties(), is(expectedOverrides));
  assertThat(persistentQueryDescription.getOverriddenProperties(), is(expectedOverrides));
}

@Test
public void shouldExposeValueFieldsForTransientQueries() {
  // Given: only the two value columns of SOME_SCHEMA are expected, in declaration order.
  final FieldInfo field1 =
      new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null));
  final FieldInfo field2 =
      new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null));

  // Then:
  assertThat(transientQueryDescription.getFields(), contains(field1, field2));
}

@Test
public void shouldExposeAllFieldsForPersistentQueries() {
  // Given: ROWTIME and ROWKEY are expected ahead of the two value columns of SOME_SCHEMA.
  final FieldInfo rowTime =
      new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null));
  final FieldInfo rowKey =
      new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null));
  final FieldInfo field1 =
      new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null));
  final FieldInfo field2 =
      new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null));

  // Then:
  assertThat(persistentQueryDescription.getFields(), contains(rowTime, rowKey, field1, field2));
}

@Test
public void shouldHandleRowTimeInValueSchemaForTransientQuery() {
// Given:
final QueryMetadata queryMetadata = new TransientQueryMetadata(
"test statement",
final LogicalSchema schema = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("ROWTIME", Schema.OPTIONAL_INT64_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
queryStreams,
SCHEMA,
ImmutableSet.of("s1, s2"),
schema,
SOURCE_NAMES,
limitHandler,
"execution plan",
new LinkedBlockingQueue<>(),
Expand All @@ -110,57 +207,47 @@ public void shouldSetFieldsCorrectlyForQueryMetadata() {
queryCloseCallback);

// When:
final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata);
transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);

// Then:
assertThat(queryDescription.getId().getId(), equalTo(""));
assertThat(queryDescription.getExecutionPlan(), equalTo("execution plan"));
assertThat(queryDescription.getSources(), equalTo(ImmutableSet.of("s1, s2")));
assertThat(queryDescription.getStatementText(), equalTo("test statement"));
assertThat(queryDescription.getTopology(), equalTo(topologyDescription.toString()));
assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS));
assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}

@Test
public void shouldSetFieldsCorrectlyForPersistentQueryMetadata() {
public void shouldHandleRowKeyInValueSchemaForTransientQuery() {
// Given:
final KsqlTopic sinkTopic = new KsqlTopic("fake_sink", "fake_sink", new KsqlJsonSerdeFactory(), true);
final KsqlStream<?> fakeSink = new KsqlStream<>(
STATEMENT,
"fake_sink",
SCHEMA,
SerdeOption.none(),
KeyField.of(SCHEMA.valueFields().get(0).name(), SCHEMA.valueFields().get(0)),
new MetadataTimestampExtractionPolicy(),
sinkTopic,
Serdes::String
);

final PersistentQueryMetadata queryMetadata = new PersistentQueryMetadata(
"test statement",
final LogicalSchema schema = LogicalSchema.of(
SchemaBuilder.struct()
.field("field1", Schema.OPTIONAL_INT32_SCHEMA)
.field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA)
.field("field2", Schema.OPTIONAL_STRING_SCHEMA)
.build());

transientQuery = new TransientQueryMetadata(
SQL_TEXT,
queryStreams,
PhysicalSchema.from(SCHEMA, SerdeOption.none()),
Collections.emptySet(),
fakeSink.getName(),
schema,
SOURCE_NAMES,
limitHandler,
"execution plan",
new QueryId("query_id"),
new LinkedBlockingQueue<>(),
DataSourceType.KSTREAM,
"app id",
sinkTopic,
topology,
QuerySchemas.of(new LinkedHashMap<>()),
STREAMS_PROPS,
PROP_OVERRIDES,
queryCloseCallback);

// When:
final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata);
transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery);

// Then:
assertThat(queryDescription.getId().getId(), equalTo("query_id"));
assertThat(queryDescription.getSinks(), equalTo(Collections.singleton("fake_sink")));
assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS));
assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES));
assertThat(transientQueryDescription.getFields(), contains(
new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)),
new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.Sandbox;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import io.confluent.rest.RestConfig;
Expand Down Expand Up @@ -1852,8 +1853,9 @@ private static void validateQueryDescription(
assertThat(entity, instanceOf(QueryDescriptionEntity.class));
final QueryDescriptionEntity queryDescriptionEntity = (QueryDescriptionEntity) entity;
final QueryDescription queryDescription = queryDescriptionEntity.getQueryDescription();
final boolean valueSchemaOnly = queryMetadata instanceof TransientQueryMetadata;
assertThat(queryDescription.getFields(), is(
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema())));
EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly)));
assertThat(queryDescription.getOverriddenProperties(), is(overriddenProperties));
}

Expand Down
Loading