diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index e56469b69d6f..a2d853d98a62 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -1007,6 +1007,6 @@ private static List buildTestSchema(final Schema... fieldTypes) { dataSourceBuilder.field("f_" + idx, fieldTypes[idx]); } - return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build())); + return EntityUtil.buildSourceSchemaEntity(LogicalSchema.of(dataSourceBuilder.build()), false); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java index 668b227d4ebb..ba684061a6ef 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java @@ -63,11 +63,13 @@ public QueryDescription( private QueryDescription( final String id, final QueryMetadata queryMetadata, - final Set sinks) { + final Set sinks, + final boolean valueSchemaOnly + ) { this( new EntityQueryId(id), queryMetadata.getStatementString(), - EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()), + EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly), queryMetadata.getSourceNames(), sinks, queryMetadata.getTopologyDescription(), @@ -77,11 +79,15 @@ private QueryDescription( public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadata) { if (queryMetadata instanceof PersistentQueryMetadata) { + final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata; return new QueryDescription( - ((PersistentQueryMetadata) queryMetadata).getQueryId().getId(), queryMetadata, - ((PersistentQueryMetadata) queryMetadata).getSinkNames()); + persistentQuery.getQueryId().getId(), + persistentQuery, + persistentQuery.getSinkNames(), + false + ); } - return new QueryDescription("", queryMetadata, Collections.emptySet()); + return new QueryDescription("", queryMetadata, Collections.emptySet(), true); } public EntityQueryId getId() { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java index a8e1c803c42c..e54aca4cf1d8 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java @@ -97,7 +97,7 @@ public SourceDescription( dataSource.getName(), readQueries, writeQueries, - EntityUtil.buildSourceSchemaEntity(dataSource.getSchema()), + EntityUtil.buildSourceSchemaEntity(dataSource.getSchema(), false), dataSource.getDataSourceType().getKsqlType(), dataSource.getKeyField().name().orElse(""), Optional.ofNullable(dataSource.getTimestampExtractionPolicy()) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriber.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriber.java index e3091dcd408d..c0670ff8b87d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriber.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriber.java @@ -93,7 +93,7 @@ public void onComplete() { public void onSchema(final LogicalSchema schema) { try { session.getBasicRemote().sendText( - mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema)) + mapper.writeValueAsString(EntityUtil.buildSourceSchemaEntity(schema, true)) ); } catch (final IOException e) { log.error("Error sending schema", e); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java index 6bcc0163ae99..4a5e8e2fe3e3 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/EntityUtil.java @@ -31,11 +31,16 @@ public final class EntityUtil { private EntityUtil() { } - public static List buildSourceSchemaEntity(final LogicalSchema schema) { + public static List buildSourceSchemaEntity( + final LogicalSchema schema, + final boolean valueSchemaOnly + ) { final List allFields = new ArrayList<>(); - allFields.addAll(getFields(schema.metaFields(), "implicit")); - allFields.addAll(getFields(schema.keyFields(), "key")); + if (!valueSchemaOnly) { + allFields.addAll(getFields(schema.metaFields(), "implicit")); + allFields.addAll(getFields(schema.keyFields(), "key")); + } allFields.addAll(getFields(schema.valueFields(), "value")); return allFields; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java index e49222c5be3f..bf2f5fdb9319 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java @@ -16,14 +16,13 @@ package io.confluent.ksql.rest.entity; import static io.confluent.ksql.metastore.model.DataSource.DataSourceType; -import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isEmptyString; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; -import io.confluent.ksql.metastore.model.KeyField; -import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.physical.LimitHandler; import io.confluent.ksql.physical.QuerySchemas; @@ -32,19 +31,14 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; -import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; -import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.streams.KafkaStreams; @@ -59,21 +53,19 @@ @RunWith(MockitoJUnitRunner.class) public class QueryDescriptionTest { - private static final LogicalSchema SCHEMA = LogicalSchema.of( + private static final LogicalSchema SOME_SCHEMA = LogicalSchema.of( SchemaBuilder.struct() .field("field1", Schema.OPTIONAL_INT32_SCHEMA) .field("field2", Schema.OPTIONAL_STRING_SCHEMA) .build()); - private static final List EXPECTED_FIELDS = Arrays.asList( - new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)), - new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)), - new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), - new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null))); - private static final String STATEMENT = "statement"; private static final Map STREAMS_PROPS = Collections.singletonMap("k1", "v1"); private static final Map PROP_OVERRIDES = Collections.singletonMap("k2", "v2"); + private static final QueryId QUERY_ID = new QueryId("query_id"); + private static final ImmutableSet SOURCE_NAMES = ImmutableSet.of("s1, s2"); + private static final String SQL_TEXT = "test statement"; + private static final String TOPOLOGY_TEXT = "Topology Text"; @Mock private Consumer queryCloseCallback; @@ -81,24 +73,129 @@ public class QueryDescriptionTest { private KafkaStreams queryStreams; @Mock private Topology topology; - @Mock + @Mock(name = TOPOLOGY_TEXT) private TopologyDescription topologyDescription; @Mock private Consumer limitHandler; + @Mock + private KsqlTopic sinkTopic; + private QueryMetadata transientQuery; + private PersistentQueryMetadata persistentQuery; + private QueryDescription transientQueryDescription; + private QueryDescription persistentQueryDescription; @Before public void setUp() { when(topology.describe()).thenReturn(topologyDescription); + + transientQuery = new TransientQueryMetadata( + SQL_TEXT, + queryStreams, + SOME_SCHEMA, + SOURCE_NAMES, + limitHandler, + "execution plan", + new LinkedBlockingQueue<>(), + DataSourceType.KSTREAM, + "app id", + topology, + STREAMS_PROPS, + PROP_OVERRIDES, + queryCloseCallback); + + transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery); + + persistentQuery = new PersistentQueryMetadata( + SQL_TEXT, + queryStreams, + PhysicalSchema.from(SOME_SCHEMA, SerdeOption.none()), + SOURCE_NAMES, + "sink Name", + "execution plan", + QUERY_ID, + DataSourceType.KSTREAM, + "app id", + sinkTopic, + topology, + QuerySchemas.of(new LinkedHashMap<>()), + STREAMS_PROPS, + PROP_OVERRIDES, + queryCloseCallback); + + persistentQueryDescription = QueryDescription.forQueryMetadata(persistentQuery); + } + + @Test + public void shouldHaveEmptyQueryIdFromTransientQuery() { + assertThat(transientQueryDescription.getId().getId(), is(isEmptyString())); } @Test - public void shouldSetFieldsCorrectlyForQueryMetadata() { + public void shouldHaveQueryIdForPersistentQuery() { + assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId())); + } + + @Test + public void shouldExposeExecutionPlan() { + assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan")); + assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan")); + } + + @Test + public void shouldExposeSources() { + assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES)); + assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES)); + } + + @Test + public void shouldExposeStatementText() { + assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT)); + assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT)); + } + + @Test + public void shouldExposeTopology() { + assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); + assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); + } + + @Test + public void shouldExposeOverridenProperties() { + assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + } + + @Test + public void shouldExposeValueFieldsForTransientQueries() { + assertThat(transientQueryDescription.getFields(), contains( + new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), + new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); + } + + @Test + public void shouldExposeAllFieldsForPersistentQueries() { + assertThat(persistentQueryDescription.getFields(), contains( + new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)), + new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)), + new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), + new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); + } + + @Test + public void shouldHandleRowTimeInValueSchemaForTransientQuery() { // Given: - final QueryMetadata queryMetadata = new TransientQueryMetadata( - "test statement", + final LogicalSchema schema = LogicalSchema.of( + SchemaBuilder.struct() + .field("field1", Schema.OPTIONAL_INT32_SCHEMA) + .field("ROWTIME", Schema.OPTIONAL_INT64_SCHEMA) + .field("field2", Schema.OPTIONAL_STRING_SCHEMA) + .build()); + + transientQuery = new TransientQueryMetadata( + SQL_TEXT, queryStreams, - SCHEMA, - ImmutableSet.of("s1, s2"), + schema, + SOURCE_NAMES, limitHandler, "execution plan", new LinkedBlockingQueue<>(), @@ -110,57 +207,47 @@ public void shouldSetFieldsCorrectlyForQueryMetadata() { queryCloseCallback); // When: - final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata); + transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery); // Then: - assertThat(queryDescription.getId().getId(), equalTo("")); - assertThat(queryDescription.getExecutionPlan(), equalTo("execution plan")); - assertThat(queryDescription.getSources(), equalTo(ImmutableSet.of("s1, s2"))); - assertThat(queryDescription.getStatementText(), equalTo("test statement")); - assertThat(queryDescription.getTopology(), equalTo(topologyDescription.toString())); - assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS)); - assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + assertThat(transientQueryDescription.getFields(), contains( + new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), + new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)), + new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); } @Test - public void shouldSetFieldsCorrectlyForPersistentQueryMetadata() { + public void shouldHandleRowKeyInValueSchemaForTransientQuery() { // Given: - final KsqlTopic sinkTopic = new KsqlTopic("fake_sink", "fake_sink", new KsqlJsonSerdeFactory(), true); - final KsqlStream fakeSink = new KsqlStream<>( - STATEMENT, - "fake_sink", - SCHEMA, - SerdeOption.none(), - KeyField.of(SCHEMA.valueFields().get(0).name(), SCHEMA.valueFields().get(0)), - new MetadataTimestampExtractionPolicy(), - sinkTopic, - Serdes::String - ); - - final PersistentQueryMetadata queryMetadata = new PersistentQueryMetadata( - "test statement", + final LogicalSchema schema = LogicalSchema.of( + SchemaBuilder.struct() + .field("field1", Schema.OPTIONAL_INT32_SCHEMA) + .field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA) + .field("field2", Schema.OPTIONAL_STRING_SCHEMA) + .build()); + + transientQuery = new TransientQueryMetadata( + SQL_TEXT, queryStreams, - PhysicalSchema.from(SCHEMA, SerdeOption.none()), - Collections.emptySet(), - fakeSink.getName(), + schema, + SOURCE_NAMES, + limitHandler, "execution plan", - new QueryId("query_id"), + new LinkedBlockingQueue<>(), DataSourceType.KSTREAM, "app id", - sinkTopic, topology, - QuerySchemas.of(new LinkedHashMap<>()), STREAMS_PROPS, PROP_OVERRIDES, queryCloseCallback); // When: - final QueryDescription queryDescription = QueryDescription.forQueryMetadata(queryMetadata); + transientQueryDescription = QueryDescription.forQueryMetadata(transientQuery); // Then: - assertThat(queryDescription.getId().getId(), equalTo("query_id")); - assertThat(queryDescription.getSinks(), equalTo(Collections.singleton("fake_sink"))); - assertThat(queryDescription.getFields(), equalTo(EXPECTED_FIELDS)); - assertThat(queryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + assertThat(transientQueryDescription.getFields(), contains( + new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), + new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)), + new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 5b55ec766493..a041d04ca61b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -133,6 +133,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.Sandbox; +import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import io.confluent.rest.RestConfig; @@ -1852,8 +1853,9 @@ private static void validateQueryDescription( assertThat(entity, instanceOf(QueryDescriptionEntity.class)); final QueryDescriptionEntity queryDescriptionEntity = (QueryDescriptionEntity) entity; final QueryDescription queryDescription = queryDescriptionEntity.getQueryDescription(); + final boolean valueSchemaOnly = queryMetadata instanceof TransientQueryMetadata; assertThat(queryDescription.getFields(), is( - EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()))); + EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema(), valueSchemaOnly))); assertThat(queryDescription.getOverriddenProperties(), is(overriddenProperties)); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriberTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriberTest.java index 65a7dfc2c967..5c31363b4ffd 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriberTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WebSocketSubscriberTest.java @@ -102,7 +102,6 @@ public void testStopSendingAfterClose() { EasyMock.verify(subscription, session, async); } - @Test public void testOnSchema() throws Exception { replayOnSubscribe(); @@ -128,10 +127,6 @@ public void testOnSchema() throws Exception { assertEquals( "[" + - "{\"name\":\"ROWTIME\"," + - "\"schema\":{\"type\":\"BIGINT\",\"fields\":null,\"memberSchema\":null}}," + - "{\"name\":\"ROWKEY\"," + - "\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}," + "{\"name\":\"currency\"," + "\"schema\":{\"type\":\"STRING\",\"fields\":null,\"memberSchema\":null}}," + "{\"name\":\"amount\"," + diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java index 9a3b14107b1e..b71162fc6139 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/EntityUtilTest.java @@ -16,9 +16,7 @@ package io.confluent.ksql.rest.util; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import io.confluent.ksql.rest.entity.FieldInfo; @@ -32,7 +30,6 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.Test; -@SuppressWarnings("unchecked") public class EntityUtilTest { private static final FieldInfo ROWTIME_FIELD = @@ -84,15 +81,14 @@ public void shouldBuildCorrectMapField() { .build()); // When: - final List entity = EntityUtil.buildSourceSchemaEntity(schema); + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); // Then: - final List valueFields = getValueFields(entity); - assertThat(valueFields, hasSize(1)); - assertThat(valueFields.get(0).getName(), equalTo("field")); - assertThat(valueFields.get(0).getSchema().getTypeName(), equalTo("MAP")); - assertThat(valueFields.get(0).getSchema().getFields(), equalTo(Optional.empty())); - assertThat(valueFields.get(0).getSchema().getMemberSchema().get().getTypeName(), + assertThat(fields, hasSize(1)); + assertThat(fields.get(0).getName(), equalTo("field")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("MAP")); + assertThat(fields.get(0).getSchema().getFields(), equalTo(Optional.empty())); + assertThat(fields.get(0).getSchema().getMemberSchema().get().getTypeName(), equalTo("INTEGER")); } @@ -108,15 +104,14 @@ public void shouldBuildCorrectArrayField() { .build()); // When: - final List entity = EntityUtil.buildSourceSchemaEntity(schema); + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); // Then: - final List valueFields = getValueFields(entity); - assertThat(valueFields, hasSize(1)); - assertThat(valueFields.get(0).getName(), equalTo("field")); - assertThat(valueFields.get(0).getSchema().getTypeName(), equalTo("ARRAY")); - assertThat(valueFields.get(0).getSchema().getFields(), equalTo(Optional.empty())); - assertThat(valueFields.get(0).getSchema().getMemberSchema().get().getTypeName(), + assertThat(fields, hasSize(1)); + assertThat(fields.get(0).getName(), equalTo("field")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("ARRAY")); + assertThat(fields.get(0).getSchema().getFields(), equalTo(Optional.empty())); + assertThat(fields.get(0).getSchema().getMemberSchema().get().getTypeName(), equalTo("BIGINT")); } @@ -135,17 +130,16 @@ public void shouldBuildCorrectStructField() { .build()); // When: - final List entity = EntityUtil.buildSourceSchemaEntity(schema); + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); // Then: - final List valueFields = getValueFields(entity); - assertThat(valueFields, hasSize(1)); - assertThat(valueFields.get(0).getName(), equalTo("field")); - assertThat(valueFields.get(0).getSchema().getTypeName(), equalTo("STRUCT")); - assertThat(valueFields.get(0).getSchema().getFields().get().size(), equalTo(1)); - final FieldInfo inner = valueFields.get(0).getSchema().getFields().get().get(0); + assertThat(fields, hasSize(1)); + assertThat(fields.get(0).getName(), equalTo("field")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("STRUCT")); + assertThat(fields.get(0).getSchema().getFields().get().size(), equalTo(1)); + final FieldInfo inner = fields.get(0).getSchema().getFields().get().get(0); assertThat(inner.getSchema().getTypeName(), equalTo("STRING")); - assertThat(valueFields.get(0).getSchema().getMemberSchema(), equalTo(Optional.empty())); + assertThat(fields.get(0).getSchema().getMemberSchema(), equalTo(Optional.empty())); } @Test @@ -158,15 +152,53 @@ public void shouldBuildMiltipleFieldsCorrectly() { .build()); // When: - final List entity = EntityUtil.buildSourceSchemaEntity(schema); + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); // Then: - final List valueFields = getValueFields(entity); - assertThat(valueFields, hasSize(2)); - assertThat(valueFields.get(0).getName(), equalTo("field1")); - assertThat(valueFields.get(0).getSchema().getTypeName(), equalTo("INTEGER")); - assertThat(valueFields.get(1).getName(), equalTo("field2")); - assertThat(valueFields.get(1).getSchema().getTypeName(), equalTo("BIGINT")); + assertThat(fields, hasSize(2)); + assertThat(fields.get(0).getName(), equalTo("field1")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("INTEGER")); + assertThat(fields.get(1).getName(), equalTo("field2")); + assertThat(fields.get(1).getSchema().getTypeName(), equalTo("BIGINT")); + } + + @Test + public void shouldSupportRowTimeAndKeyInValueSchema() { + // Given: + final LogicalSchema schema = LogicalSchema.of(SchemaBuilder + .struct() + .field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA) + .field("ROWTIME", Schema.OPTIONAL_INT32_SCHEMA) + .field("field1", Schema.OPTIONAL_INT32_SCHEMA) + .build()); + + // When: + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); + + // Then: + assertThat(fields, hasSize(3)); + assertThat(fields.get(0).getName(), equalTo("ROWKEY")); + assertThat(fields.get(1).getName(), equalTo("ROWTIME")); + } + + @Test + public void shouldSupportGettingFullSchema() { + // Given: + final LogicalSchema schema = LogicalSchema.of(SchemaBuilder + .struct() + .field("field1", Schema.OPTIONAL_INT32_SCHEMA) + .build()); + + // When: + final List fields = EntityUtil.buildSourceSchemaEntity(schema, false); + + // Then: + assertThat(fields, hasSize(3)); + assertThat(fields.get(0).getName(), equalTo("ROWTIME")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo("BIGINT")); + assertThat(fields.get(1).getName(), equalTo("ROWKEY")); + assertThat(fields.get(1).getSchema().getTypeName(), equalTo("STRING")); + assertThat(fields.get(2).getName(), equalTo("field1")); } private static void shouldBuildCorrectPrimitiveField( @@ -180,21 +212,13 @@ private static void shouldBuildCorrectPrimitiveField( .build()); // When: - final List entity = EntityUtil.buildSourceSchemaEntity(schema); + final List fields = EntityUtil.buildSourceSchemaEntity(schema, true); // Then: - final List valueFields = getValueFields(entity); - assertThat(valueFields.get(0).getName(), equalTo("field")); - assertThat(valueFields.get(0).getSchema().getTypeName(), equalTo(schemaName)); - assertThat(valueFields.get(0).getSchema().getFields(), equalTo(Optional.empty())); - assertThat(valueFields.get(0).getSchema().getMemberSchema(), equalTo(Optional.empty())); - } - - private static List getValueFields(final List entity) { - assertThat(entity, hasSize(greaterThan(2))); - assertThat(entity.get(0), is(ROWTIME_FIELD)); - assertThat(entity.get(1), is(ROWKEY_FIELD)); - return entity.subList(2, entity.size()); + assertThat(fields.get(0).getName(), equalTo("field")); + assertThat(fields.get(0).getSchema().getTypeName(), equalTo(schemaName)); + assertThat(fields.get(0).getSchema().getFields(), equalTo(Optional.empty())); + assertThat(fields.get(0).getSchema().getMemberSchema(), equalTo(Optional.empty())); } }