diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/plan.json new file mode 100644 index 000000000000..28cfb2f0fc41 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/plan.json @@ -0,0 +1,234 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE DATE) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` DATE", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE S2 AS SELECT\n TEST.ID ID,\n MIN(TEST.VALUE) MIN\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "S2", + "schema" : "`ID` BIGINT KEY, `MIN` DATE", + "topicName" : "S2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "S2", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "S2" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` DATE", + "pseudoColumnVersion" : 1 + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "MIN(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS MIN" ], + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "S2" + }, + "queryId" : "CTAS_S2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "metric.reporters" : "", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.connect.basic.auth.credentials.reload" : "false", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.variable.substitution.enable" : "true", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.alos.enabled" : "true", + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.connect.basic.auth.credentials.source" : "NONE", + "ksql.schema.registry.url" : "schema_registry.url:0", + "ksql.properties.overrides.denylist" : "", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.max.catchup.consumers" : "5", + "ksql.query.push.v2.enabled" : "false", + "ksql.transient.query.cleanup.service.enable" : "true", + "ksql.query.push.v2.metrics.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.endpoint.migrate.query" : "true", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.streams.num.stream.threads" : "4", + "ksql.metrics.tags.custom" : "", + "ksql.query.push.v2.catchup.consumer.msg.window" : "50", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.udf.collect.metrics" : "false", + "ksql.new.query.planner.enabled" : "false", + "ksql.connect.request.headers.plugin" : null, + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.headers.columns.enabled" : "true", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.connect.request.timeout.ms" : "5000", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.create.or.replace.enabled" : "true", + "ksql.shared.runtimes.count" : "2", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "true", + "ksql.transient.query.cleanup.service.period.seconds" : "600", + "ksql.suppress.enabled" : "false", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.connect.basic.auth.credentials.file" : "", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.query.pull.limit.clause.enabled" : "true", + "ksql.connect.error.handler" : null, + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.continuation.tokens.enabled" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/spec.json new file mode 100644 index 000000000000..f52c13db04e8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/spec.json @@ -0,0 +1,299 @@ +{ + "version" : "7.2.0", + "timestamp" : 1648094175528, + "path" : "query-validation-tests/min-group-by.json", + "schemas" : { + "CTAS_S2_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` DATE, `KSQL_AGG_VARIABLE_0` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.Project" : { + "schema" : "`ID` BIGINT KEY, `MIN` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.S2" : { + "schema" : "`ID` BIGINT KEY, `MIN` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "min date group by valid after 7.2.0", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : -2147483647 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 5 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 100 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 6 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 300 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 2000 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 100 + } + } ], + "outputs" : [ { + "topic" : "S2", + "key" : 1, + "value" : { + "MIN" : null + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 100 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 6 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 6 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + } ], + "topics" : [ { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "S2", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE DATE) WITH (kafka_topic='test_topic',value_format='AVRO');", "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "S2", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `MIN` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` DATE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_S2_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Date", + "logicalType" : "date" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "S2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "MIN", + "type" : [ "null", { + "type" : "int", + "logicalType" : "date" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/topology new file mode 100644 index 000000000000..fcfc8fb28665 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_date_group_by_valid_after_7.2.0/7.2.0_1648094175528/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: S2) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/plan.json new file mode 100644 index 000000000000..053b99191254 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/plan.json @@ -0,0 +1,234 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIME) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` TIME", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE S2 AS SELECT\n TEST.ID ID,\n MIN(TEST.VALUE) MIN\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "S2", + "schema" : "`ID` BIGINT KEY, `MIN` TIME", + "topicName" : "S2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "S2", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "S2" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` TIME", + "pseudoColumnVersion" : 1 + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "MIN(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS MIN" ], + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "S2" + }, + "queryId" : "CTAS_S2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "metric.reporters" : "", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.connect.basic.auth.credentials.reload" : "false", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.variable.substitution.enable" : "true", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.alos.enabled" : "true", + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.connect.basic.auth.credentials.source" : "NONE", + "ksql.schema.registry.url" : "schema_registry.url:0", + "ksql.properties.overrides.denylist" : "", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.max.catchup.consumers" : "5", + "ksql.query.push.v2.enabled" : "false", + "ksql.transient.query.cleanup.service.enable" : "true", + "ksql.query.push.v2.metrics.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.endpoint.migrate.query" : "true", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.streams.num.stream.threads" : "4", + "ksql.metrics.tags.custom" : "", + "ksql.query.push.v2.catchup.consumer.msg.window" : "50", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.udf.collect.metrics" : "false", + "ksql.new.query.planner.enabled" : "false", + "ksql.connect.request.headers.plugin" : null, + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.headers.columns.enabled" : "true", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.connect.request.timeout.ms" : "5000", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.create.or.replace.enabled" : "true", + "ksql.shared.runtimes.count" : "2", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "true", + "ksql.transient.query.cleanup.service.period.seconds" : "600", + "ksql.suppress.enabled" : "false", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.connect.basic.auth.credentials.file" : "", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.query.pull.limit.clause.enabled" : "true", + "ksql.connect.error.handler" : null, + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.continuation.tokens.enabled" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/spec.json new file mode 100644 index 000000000000..b7443d0574bd --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/spec.json @@ -0,0 +1,299 @@ +{ + "version" : "7.2.0", + "timestamp" : 1648094176457, + "path" : "query-validation-tests/min-group-by.json", + "schemas" : { + "CTAS_S2_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` TIME, `KSQL_AGG_VARIABLE_0` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.Project" : { + "schema" : "`ID` BIGINT KEY, `MIN` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.S2" : { + "schema" : "`ID` BIGINT KEY, `MIN` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "min time group by valid after 7.2.0", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 5 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 2647 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 100 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 6 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 300 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 1 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 100 + } + } ], + "outputs" : [ { + "topic" : "S2", + "key" : 1, + "value" : { + "MIN" : null + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 100 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 6 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 6 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : 1 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : 1 + } + } ], + "topics" : [ { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "S2", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIME) WITH (kafka_topic='test_topic',value_format='AVRO');", "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "S2", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `MIN` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` TIME", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_S2_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "int", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Time", + "logicalType" : "time-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "S2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "MIN", + "type" : [ "null", { + "type" : "int", + "logicalType" : "time-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/topology new file mode 100644 index 000000000000..fcfc8fb28665 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_time_group_by_valid_after_7.2.0/7.2.0_1648094176457/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: S2) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/plan.json new file mode 100644 index 000000000000..7c42e62e34cf --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/plan.json @@ -0,0 +1,234 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIMESTAMP) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` TIMESTAMP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE S2 AS SELECT\n TEST.ID ID,\n MIN(TEST.VALUE) MIN\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "S2", + "schema" : "`ID` BIGINT KEY, `MIN` TIMESTAMP", + "topicName" : "S2", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "S2", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "S2" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` TIMESTAMP", + "pseudoColumnVersion" : 1 + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "MIN(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS MIN" ], + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "S2" + }, + "queryId" : "CTAS_S2_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "metric.reporters" : "", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.connect.basic.auth.credentials.reload" : "false", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.variable.substitution.enable" : "true", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.alos.enabled" : "true", + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.connect.basic.auth.credentials.source" : "NONE", + "ksql.schema.registry.url" : "schema_registry.url:0", + "ksql.properties.overrides.denylist" : "", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.max.catchup.consumers" : "5", + "ksql.query.push.v2.enabled" : "false", + "ksql.transient.query.cleanup.service.enable" : "true", + "ksql.query.push.v2.metrics.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.endpoint.migrate.query" : "true", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.streams.num.stream.threads" : "4", + "ksql.metrics.tags.custom" : "", + "ksql.query.push.v2.catchup.consumer.msg.window" : "50", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.udf.collect.metrics" : "false", + "ksql.new.query.planner.enabled" : "false", + "ksql.connect.request.headers.plugin" : null, + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.headers.columns.enabled" : "true", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.connect.request.timeout.ms" : "5000", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.create.or.replace.enabled" : "true", + "ksql.shared.runtimes.count" : "2", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "true", + "ksql.transient.query.cleanup.service.period.seconds" : "600", + "ksql.suppress.enabled" : "false", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.connect.basic.auth.credentials.file" : "", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.query.pull.limit.clause.enabled" : "true", + "ksql.connect.error.handler" : null, + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.continuation.tokens.enabled" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/spec.json new file mode 100644 index 000000000000..b97aa007f4b1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/spec.json @@ -0,0 +1,299 @@ +{ + "version" : "7.2.0", + "timestamp" : 1648094176018, + "path" : "query-validation-tests/min-group-by.json", + "schemas" : { + "CTAS_S2_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` TIMESTAMP, `KSQL_AGG_VARIABLE_0` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.Project" : { + "schema" : "`ID` BIGINT KEY, `MIN` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.S2" : { + "schema" : "`ID` BIGINT KEY, `MIN` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_S2_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "min timestamp group by valid after 7.2.0", + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : -100 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 5 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 5 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 6 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 300 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : -2147483647 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 100 + } + } ], + "outputs" : [ { + "topic" : "S2", + "key" : 1, + "value" : { + "MIN" : null + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -100 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -100 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -100 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 100, + "value" : { + "MIN" : 5 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + }, { + "topic" : "S2", + "key" : 0, + "value" : { + "MIN" : -2147483647 + } + } ], + "topics" : [ { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "long", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Timestamp", + "logicalType" : "timestamp-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "S2", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIMESTAMP) WITH (kafka_topic='test_topic',value_format='AVRO');", "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "S2", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `MIN` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` TIMESTAMP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_S2_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", { + "type" : "long", + "logicalType" : "timestamp-millis" + } ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "long", + "logicalType" : "timestamp-millis" + } ], + "default" : null + } ] + } + }, { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", { + "type" : "long", + "connect.version" : 1, + "connect.name" : "org.apache.kafka.connect.data.Timestamp", + "logicalType" : "timestamp-millis" + } ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "S2", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "MIN", + "type" : [ "null", { + "type" : "long", + "logicalType" : "timestamp-millis" + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/topology new file mode 100644 index 000000000000..fcfc8fb28665 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/min-group-by_-_min_timestamp_group_by_valid_after_7.2.0/7.2.0_1648094176018/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: S2) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/min-group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/min-group-by.json index c8bc4d9fa844..280e2cb6c759 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/min-group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/min-group-by.json @@ -125,6 +125,102 @@ {"topic": "S2", "key": 0,"value": {"MIN": "-01.79"}}, {"topic": "S2", "key": 0,"value": {"MIN": "-01.79"}} ] + }, + { + "name": "min date group by valid after 7.2.0", + "versions": { + "min": "7.2.0" + }, + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE DATE) WITH (kafka_topic='test_topic',value_format='AVRO');", + "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1,"value": {"value": null}}, + {"topic": "test_topic", "key": 0,"value": {"value": -2147483647}}, + {"topic": "test_topic", "key": 0,"value": {"value": 5}}, + {"topic": "test_topic", "key": 0,"value": {"value": null}}, + {"topic": "test_topic", "key": 100,"value": {"value": 100}}, + {"topic": "test_topic", "key": 100,"value": {"value": 6}}, + {"topic": "test_topic", "key": 100,"value": {"value": 300}}, + {"topic": "test_topic", "key": 0,"value": {"value": 2000}}, + {"topic": "test_topic", "key": 0,"value": {"value": 100}} + ], + "outputs": [ + {"topic": "S2", "key": 1,"value": {"MIN": null}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}}, + {"topic": "S2", "key": 100,"value": {"MIN": 100}}, + {"topic": "S2", "key": 100,"value": {"MIN": 6}}, + {"topic": "S2", "key": 100,"value": {"MIN": 6}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}} + ] + }, + { + "name": "min timestamp group by valid after 7.2.0", + "versions": { + "min": "7.2.0" + }, + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIMESTAMP) WITH (kafka_topic='test_topic',value_format='AVRO');", + "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1,"value": {"value": null}}, + {"topic": "test_topic", "key": 0,"value": {"value": -100}}, + {"topic": "test_topic", "key": 0,"value": {"value": 5}}, + {"topic": "test_topic", "key": 0,"value": {"value": null}}, + {"topic": "test_topic", "key": 100,"value": {"value": 5}}, + {"topic": "test_topic", "key": 100,"value": {"value": 6}}, + {"topic": "test_topic", "key": 100,"value": {"value": 300}}, + {"topic": "test_topic", "key": 0,"value": {"value": -2147483647}}, + {"topic": "test_topic", "key": 0,"value": {"value": 100}} + ], + "outputs": [ + {"topic": "S2", "key": 1,"value": {"MIN": null}}, + {"topic": "S2", "key": 0,"value": {"MIN": -100}}, + {"topic": "S2", "key": 0,"value": {"MIN": -100}}, + {"topic": "S2", "key": 0,"value": {"MIN": -100}}, + {"topic": "S2", "key": 100,"value": {"MIN": 5}}, + {"topic": "S2", "key": 100,"value": {"MIN": 5}}, + {"topic": "S2", "key": 100,"value": {"MIN": 5}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}}, + {"topic": "S2", "key": 0,"value": {"MIN": -2147483647}} + ] + }, + { + "name": "min time group by valid after 7.2.0", + "versions": { + "min": "7.2.0" + }, + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE TIME) WITH (kafka_topic='test_topic',value_format='AVRO');", + "CREATE TABLE S2 as SELECT ID, min(value) as MIN FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 1,"value": {"value": null}}, + {"topic": "test_topic", "key": 0,"value": {"value": 5}}, + {"topic": "test_topic", "key": 0,"value": {"value": 2647}}, + {"topic": "test_topic", "key": 0,"value": {"value": null}}, + {"topic": "test_topic", "key": 100,"value": {"value": 100}}, + {"topic": "test_topic", "key": 100,"value": {"value": 6}}, + {"topic": "test_topic", "key": 100,"value": {"value": 300}}, + {"topic": "test_topic", "key": 0,"value": {"value": 1}}, + {"topic": "test_topic", "key": 0,"value": {"value": 100}} + ], + "outputs": [ + {"topic": "S2", "key": 1,"value": {"MIN": null}}, + {"topic": "S2", "key": 0,"value": {"MIN": 5}}, + {"topic": "S2", "key": 0,"value": {"MIN": 5}}, + {"topic": "S2", "key": 0,"value": {"MIN": 5}}, + {"topic": "S2", "key": 100,"value": {"MIN": 100}}, + {"topic": "S2", "key": 100,"value": {"MIN": 6}}, + {"topic": "S2", "key": 100,"value": {"MIN": 6}}, + {"topic": "S2", "key": 0,"value": {"MIN": 1}}, + {"topic": "S2", "key": 0,"value": {"MIN": 1}} + ] } ] } \ No newline at end of file