diff --git a/docs/developer-guide/ksqldb-reference/aggregate-functions.md b/docs/developer-guide/ksqldb-reference/aggregate-functions.md index 3e3167799f04..46139c5eacf8 100644 --- a/docs/developer-guide/ksqldb-reference/aggregate-functions.md +++ b/docs/developer-guide/ksqldb-reference/aggregate-functions.md @@ -221,6 +221,18 @@ Stream Return the minimum value for a given column and window. Rows that have `col1` set to null are ignored. +## `STDDEV_SAMP` + +Since: - 0.15.0 + +```sql +STDDEV_SAMP(col1) +``` + +Stream, Table + +Returns the sample standard deviation for the column. + ## `SUM` Since: - diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdaf.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdaf.java new file mode 100644 index 000000000000..3623a8fc89bc --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdaf.java @@ -0,0 +1,199 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udaf.stddev; + +import io.confluent.ksql.function.udaf.TableUdaf; +import io.confluent.ksql.function.udaf.UdafDescription; +import io.confluent.ksql.function.udaf.UdafFactory; +import io.confluent.ksql.util.KsqlConstants; + +import java.util.function.BiFunction; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +@UdafDescription(name = "STDDEV_SAMP", + description = "Returns the sample standard deviation of the column. " + + + "Applicable only to numeric types.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public final class StandardDeviationSampUdaf { + + private static final String COUNT = "COUNT"; + private static final String SUM = "SUM"; + private static final String M2 = "M2"; + private static final Schema STRUCT_LONG = SchemaBuilder.struct().optional() + .field(SUM, Schema.OPTIONAL_INT64_SCHEMA) + .field(COUNT, Schema.OPTIONAL_INT64_SCHEMA) + .field(M2, Schema.OPTIONAL_FLOAT64_SCHEMA) + .build(); + + private static final Schema STRUCT_INT = SchemaBuilder.struct().optional() + .field(SUM, Schema.OPTIONAL_INT32_SCHEMA) + .field(COUNT, Schema.OPTIONAL_INT64_SCHEMA) + .field(M2, Schema.OPTIONAL_FLOAT64_SCHEMA) + .build(); + + private static final Schema STRUCT_DOUBLE = SchemaBuilder.struct().optional() + .field(SUM, Schema.OPTIONAL_FLOAT64_SCHEMA) + .field(COUNT, Schema.OPTIONAL_INT64_SCHEMA) + .field(M2, Schema.OPTIONAL_FLOAT64_SCHEMA) + .build(); + + private StandardDeviationSampUdaf() { + } + + @UdafFactory(description = "Compute sample standard deviation of column with type Long.", + aggregateSchema = "STRUCT") + public static TableUdaf stdDevLong() { + return getStdDevImplementation( + 0L, + STRUCT_LONG, + (agg, newValue) -> newValue + agg.getInt64(SUM), + (agg, newValue) -> + Double.valueOf(newValue * (agg.getInt64(COUNT) + 1) - (agg.getInt64(SUM) + newValue)), + (agg1, agg2) -> + Double.valueOf( + agg1.getInt64(SUM) / agg1.getInt64(COUNT) + - agg2.getInt64(SUM) / agg2.getInt64(COUNT)), + (agg1, agg2) -> agg1.getInt64(SUM) + agg2.getInt64(SUM), + (agg, valueToRemove) -> agg.getInt64(SUM) - valueToRemove); + } + + @UdafFactory(description = "Compute sample standard deviation of column with type Integer.", + aggregateSchema = "STRUCT") + public static TableUdaf stdDevInt() { + return getStdDevImplementation( + 0, + STRUCT_INT, + (agg, newValue) -> newValue + agg.getInt32(SUM), + (agg, newValue) -> + Double.valueOf(newValue * (agg.getInt64(COUNT) + 1) - (agg.getInt32(SUM) + newValue)), + (agg1, agg2) -> + Double.valueOf( + agg1.getInt32(SUM) / agg1.getInt64(COUNT) + - agg2.getInt32(SUM) / agg2.getInt64(COUNT)), + (agg1, agg2) -> agg1.getInt32(SUM) + agg2.getInt32(SUM), + (agg, valueToRemove) -> agg.getInt32(SUM) - valueToRemove); + } + + @UdafFactory(description = "Compute sample standard deviation of column with type Double.", + aggregateSchema = "STRUCT") + public static TableUdaf stdDevDouble() { + return getStdDevImplementation( + 0.0, + STRUCT_DOUBLE, + (agg, newValue) -> newValue + agg.getFloat64(SUM), + (agg, newValue) -> newValue * (agg.getInt64(COUNT) + 1) - (agg.getFloat64(SUM) + newValue), + (agg1, agg2) -> + agg1.getFloat64(SUM) / agg1.getInt64(COUNT) + - agg2.getFloat64(SUM) / agg2.getInt64(COUNT), + (agg1, agg2) -> agg1.getFloat64(SUM) + agg2.getFloat64(SUM), + (agg, valueToRemove) -> agg.getFloat64(SUM) - valueToRemove); + } + + private static TableUdaf getStdDevImplementation( + final I initialValue, + final Schema structSchema, + final BiFunction add, + final BiFunction createDelta, + final BiFunction mergeInner, + final BiFunction mergeSum, + final BiFunction undoSum) { + return new TableUdaf() { + + @Override + public Struct initialize() { + return new Struct(structSchema).put(SUM, initialValue).put(COUNT, 0L).put(M2, 0.0); + } + + @Override + public Struct aggregate(final I newValue, final Struct aggregate) { + // Uses the Youngs-Cramer algorithm to calculate standard deviation + if (newValue == null) { + return aggregate; + } + final long newCount = aggregate.getInt64(COUNT) + 1; + final double newM2; + + if (newCount - 1 > 0) { + final double delta = createDelta.apply(aggregate, newValue); + newM2 = delta * delta / (newCount * (newCount - 1)); + } else { + // processing the first item + newM2 = 0; + } + + return new Struct(structSchema) + .put(COUNT, newCount) + .put(SUM, add.apply(aggregate, newValue)) + .put(M2, newM2 + aggregate.getFloat64(M2)); + } + + @Override + public Struct merge(final Struct aggOne, final Struct aggTwo) { + final long countOne = aggOne.getInt64(COUNT); + final long countTwo = aggTwo.getInt64(COUNT); + + final double m2One = aggOne.getFloat64(M2); + final double m2Two = aggTwo.getFloat64(M2); + final long newCount = countOne + countTwo; + final double newM2; + + if (countOne == 0 || countTwo == 0) { + newM2 = m2One + m2Two; + } else { + final double innerCalc = mergeInner.apply(aggOne, aggTwo); + newM2 = m2One + m2Two + countOne * countTwo * innerCalc * innerCalc / newCount; + } + + return new Struct(structSchema) + .put(COUNT, newCount) + .put(SUM, mergeSum.apply(aggOne, aggTwo)) + .put(M2, newM2); + } + + @Override + public Double map(final Struct aggregate) { + final long count = aggregate.getInt64(COUNT); + if (count < 2) { + return 0.0; + } + return aggregate.getFloat64(M2) / (count - 1); + } + + @Override + public Struct undo(final I valueToUndo, final Struct aggregate) { + if (valueToUndo == null) { + return aggregate; + } + final long newCount = aggregate.getInt64(COUNT) - 1; + final double newM2; + if (newCount > 0) { + final double delta = createDelta.apply(aggregate, valueToUndo); + newM2 = delta * delta / (newCount * (newCount + 1)); + } else { + newM2 = 0; + } + return new Struct(structSchema) + .put(COUNT, newCount) + .put(SUM, undoSum.apply(aggregate, valueToUndo)) + .put(M2, aggregate.getFloat64(M2) - newM2); + } + }; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdafTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdafTest.java new file mode 100644 index 000000000000..3c146ff0ca9f --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udaf/stddev/StandardDeviationSampUdafTest.java @@ -0,0 +1,220 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udaf.stddev; + +import io.confluent.ksql.function.udaf.TableUdaf; +import static io.confluent.ksql.function.udaf.stddev.StandardDeviationSampUdaf.stdDevLong; +import static io.confluent.ksql.function.udaf.stddev.StandardDeviationSampUdaf.stdDevInt; +import static io.confluent.ksql.function.udaf.stddev.StandardDeviationSampUdaf.stdDevDouble; + +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class StandardDeviationSampUdafTest { + + private static final String COUNT = "COUNT"; + private static final String SUM = "SUM"; + private static final String M2 = "M2"; + + @Test + public void shouldCalculateStdDevLongs() { + final TableUdaf udaf = stdDevLong(); + Struct agg = udaf.initialize(); + final Long[] values = new Long[] {1L, 2L, 3L, 4L, 5L}; + for (final Long thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + assertThat(agg.getInt64(COUNT), equalTo(5L)); + assertThat(agg.getInt64(SUM), equalTo(15L)); + assertThat(agg.getFloat64(M2), equalTo(10.0)); + + final double standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(2.5)); + } + + @Test + public void shouldCalculateStdDevInts() { + final TableUdaf udaf = stdDevInt(); + Struct agg = udaf.initialize(); + final Integer[] values = new Integer[] {3, 5, 6, 7}; + for (final Integer thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + assertThat(agg.getInt64(COUNT), equalTo(4L)); + assertThat(agg.getInt32(SUM), equalTo(21)); + assertThat(agg.getFloat64(M2), equalTo(8.75)); + + final double standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(2.9166666666666665)); + } + + @Test + public void shouldCalculateStdDevDoubles() { + final TableUdaf udaf = stdDevDouble(); + Struct agg = udaf.initialize(); + final Double[] values = new Double[] {10.2, 13.4, 14.5, 17.8}; + for (final Double thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + assertThat(agg.getInt64(COUNT), equalTo(4L)); + assertThat(agg.getFloat64(SUM), equalTo(55.900000000000006)); + assertThat(agg.getFloat64(M2), equalTo(29.48749999999999)); + + final double standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(9.829166666666664)); + } + + @Test + public void shouldAverageZeroes() { + final TableUdaf udaf = stdDevInt(); + Struct agg = udaf.initialize(); + final int[] values = new int[] {0, 0, 0}; + for (final int thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + final double standardDev = udaf.map(agg); + + assertThat(standardDev, equalTo(0.0)); + } + + @Test + public void shouldAverageEmpty() { + final TableUdaf udaf = stdDevInt(); + final Struct agg = udaf.initialize(); + final double standardDev = udaf.map(agg); + + assertThat(standardDev, equalTo(0.0)); + } + + @Test + public void shouldIgnoreNull() { + final TableUdaf udaf = stdDevInt(); + Struct agg = udaf.initialize(); + final Integer[] values = new Integer[] {60, 64, 70}; + for (final int thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + agg = udaf.aggregate(null, agg); + final double standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(25.333333333333332)); + } + + @Test + public void shouldMergeLongs() { + final TableUdaf udaf = stdDevLong(); + + Struct left = udaf.initialize(); + final Long[] leftValues = new Long[] {1L, 2L, 3L, 4L, 5L}; + for (final Long thisValue : leftValues) { + left = udaf.aggregate(thisValue, left); + } + + Struct right = udaf.initialize(); + final Long[] rightValues = new Long[] {2L, 2L, 1L}; + for (final Long thisValue : rightValues) { + right = udaf.aggregate(thisValue, right); + } + + final Struct merged = udaf.merge(left, right); + + assertThat(merged.getInt64(COUNT), equalTo(8L)); + assertThat(merged.getInt64(SUM), equalTo(20L)); + assertThat(merged.getFloat64(M2), equalTo(18.166666666666664)); + + final double standardDev = udaf.map(merged); + assertThat(standardDev, equalTo(2.595238095238095)); + } + + @Test + public void shouldMergeInts() { + final TableUdaf udaf = stdDevInt(); + + Struct left = udaf.initialize(); + final Integer[] leftValues = new Integer[] {5, 8, 10}; + for (final Integer thisValue : leftValues) { + left = udaf.aggregate(thisValue, left); + } + + Struct right = udaf.initialize(); + final Integer[] rightValues = new Integer[] {6, 7, 9}; + for (final Integer thisValue : rightValues) { + right = udaf.aggregate(thisValue, right); + } + + final Struct merged = udaf.merge(left, right); + + assertThat(merged.getInt64(COUNT), equalTo(6L)); + assertThat(merged.getInt32(SUM), equalTo(45)); + assertThat(merged.getFloat64(M2), equalTo(17.333333333333332)); + + final double standardDev = udaf.map(merged); + assertThat(standardDev, equalTo(3.4666666666666663)); + } + + @Test + public void shouldMergeDoubles() { + final TableUdaf udaf = stdDevDouble(); + + Struct left = udaf.initialize(); + final Double[] leftValues = new Double[] {5.5, 8.4, 10.9}; + for (final Double thisValue : leftValues) { + left = udaf.aggregate(thisValue, left); + } + + Struct right = udaf.initialize(); + final Double[] rightValues = new Double[] {6.3, 7.2, 9.7}; + for (final Double thisValue : rightValues) { + right = udaf.aggregate(thisValue, right); + } + + final Struct merged = udaf.merge(left, right); + + assertThat(merged.getInt64(COUNT), equalTo(6L)); + assertThat(merged.getFloat64(SUM), equalTo(48.0)); + assertThat(merged.getFloat64(M2), equalTo(21.240000000000006)); + + final double standardDev = udaf.map(merged); + assertThat(standardDev, equalTo(4.248000000000001)); + } + + @Test + public void shouldUndoSummedLongs() { + final TableUdaf udaf = stdDevLong(); + Struct agg = udaf.initialize(); + final Long[] values = new Long[] {1L, 2L, 3L, 4L, 5L}; + for (final Long thisValue : values) { + agg = udaf.aggregate(thisValue, agg); + } + assertThat(agg.getInt64(COUNT), equalTo(5L)); + assertThat(agg.getInt64(SUM), equalTo(15L)); + assertThat(agg.getFloat64(M2), equalTo(10.0)); + + double standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(2.5)); + + agg = udaf.undo(2L, agg); + + assertThat(agg.getInt64(COUNT), equalTo(4L)); + assertThat(agg.getInt64(SUM), equalTo(13L)); + assertThat(agg.getFloat64(M2), equalTo(8.75)); + + standardDev = udaf.map(agg); + assertThat(standardDev, equalTo(2.9166666666666665)); + } +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/plan.json new file mode 100644 index 000000000000..2bb4f04b7c91 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/plan.json @@ -0,0 +1,182 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE DOUBLE) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` DOUBLE", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.ID ID,\n STDDEV_SAMP(TEST.VALUE) STDDEV\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` DOUBLE" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "STDDEV_SAMP(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS STDDEV" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/spec.json new file mode 100644 index 000000000000..ccf9949ee240 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/spec.json @@ -0,0 +1,273 @@ +{ + "version" : "6.2.0", + "timestamp" : 1610468734732, + "path" : "query-validation-tests/standarddeviation.json", + "schemas" : { + "CTAS_OUTPUT_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` DOUBLE, `KSQL_AGG_VARIABLE_0` STRUCT<`SUM` DOUBLE, `COUNT` BIGINT, `M2` DOUBLE>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "stddev_samp double", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : -1.8 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 2.3 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 9223372036854.775807 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 300.8 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 100.2 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : -200000.6 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 0.0 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 8.405 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 42535295862342920000000000 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 3334.2033333333347 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 28356864524046740000000000 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 21267648239542927000000000 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 3334.2033333333347 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "double" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE double) WITH (kafka_topic='test_topic', value_format='AVRO');", "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) as stddev FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "double" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "record", + "name" : "KsqlDataSourceSchema_KSQL_AGG_VARIABLE_0", + "fields" : [ { + "name" : "SUM", + "type" : [ "null", "double" ], + "default" : null + }, { + "name" : "COUNT", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "M2", + "type" : [ "null", "double" ], + "default" : null + } ] + } ], + "default" : null + } ] + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "STDDEV", + "type" : [ "null", "double" ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double/6.2.0_1610468734732/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/plan.json new file mode 100644 index 000000000000..77dbffb4c318 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/plan.json @@ -0,0 +1,182 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, NAME STRING, VALUE MAP) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `NAME` STRING, `VALUE` MAP", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.ID ID,\n STDDEV_SAMP(TEST.VALUE['key1']) STDDEV\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `NAME` STRING, `VALUE` MAP" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE", "VALUE['key1'] AS KSQL_INTERNAL_COL_2" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "STDDEV_SAMP(KSQL_INTERNAL_COL_2)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS STDDEV" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/spec.json new file mode 100644 index 000000000000..98defa14c886 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/spec.json @@ -0,0 +1,190 @@ +{ + "version" : "6.2.0", + "timestamp" : 1610468734823, + "path" : "query-validation-tests/standarddeviation.json", + "schemas" : { + "CTAS_OUTPUT_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` MAP, `KSQL_INTERNAL_COL_2` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CTAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `NAME` STRING, `VALUE` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` MAP, `KSQL_AGG_VARIABLE_0` STRUCT<`SUM` DOUBLE, `COUNT` BIGINT, `M2` DOUBLE>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CTAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "stddev_samp double map", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "name" : "zero", + "value" : { + "key1" : 10.0, + "key2" : 1.0 + } + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "name" : "zero", + "value" : { + "key1" : 12.0, + "key2" : 1.0 + } + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "name" : "zero", + "value" : { + "key1" : 13.0, + "key2" : 1.0 + } + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "name" : "zero", + "value" : { + "key1" : 14.0, + "key2" : 1.0 + } + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "name" : "zero", + "value" : { + "key1" : 16.0, + "key2" : 1.0 + } + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 2.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 2.333333333333333 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 2.9166666666666665 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 5.0 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value['key1']) AS STDDEV FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `NAME` STRING, `VALUE` MAP", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_double_map/6.2.0_1610468734823/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/plan.json new file mode 100644 index 000000000000..7e61e63a6e1d --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/plan.json @@ -0,0 +1,182 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` INTEGER", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.ID ID,\n STDDEV_SAMP(TEST.VALUE) STDDEV\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` INTEGER" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "STDDEV_SAMP(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS STDDEV" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/spec.json new file mode 100644 index 000000000000..09e41da4113c --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/spec.json @@ -0,0 +1,273 @@ +{ + "version" : "6.2.0", + "timestamp" : 1610468734542, + "path" : "query-validation-tests/standarddeviation.json", + "schemas" : { + "CTAS_OUTPUT_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` INTEGER, `KSQL_AGG_VARIABLE_0` STRUCT<`SUM` INTEGER, `COUNT` BIGINT, `M2` DOUBLE>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "stddev_samp int", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 0 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 10 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 50 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 10 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 7 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 3 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : null + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 6 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 50.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 800.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 26.333333333333332 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 643.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 643.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 17.583333333333332 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE integer) WITH (kafka_topic='test_topic',value_format='AVRO');", "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) AS stddev FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "record", + "name" : "KsqlDataSourceSchema_KSQL_AGG_VARIABLE_0", + "fields" : [ { + "name" : "SUM", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "COUNT", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "M2", + "type" : [ "null", "double" ], + "default" : null + } ] + } ], + "default" : null + } ] + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "STDDEV", + "type" : [ "null", "double" ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_int/6.2.0_1610468734542/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/plan.json new file mode 100644 index 000000000000..beaa3dde9199 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/plan.json @@ -0,0 +1,182 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ID BIGINT KEY, VALUE BIGINT) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ID` BIGINT KEY, `VALUE` BIGINT", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.ID ID,\n STDDEV_SAMP(TEST.VALUE) STDDEV\nFROM TEST TEST\nGROUP BY TEST.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByKeyV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `VALUE` BIGINT" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "ID AS ID", "VALUE AS VALUE" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "nonAggregateColumns" : [ "ID", "VALUE" ], + "aggregationFunctions" : [ "STDDEV_SAMP(VALUE)" ] + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS STDDEV" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/spec.json new file mode 100644 index 000000000000..6f349ca60fc9 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/spec.json @@ -0,0 +1,273 @@ +{ + "version" : "6.2.0", + "timestamp" : 1610468734648, + "path" : "query-validation-tests/standarddeviation.json", + "schemas" : { + "CTAS_OUTPUT_0.Aggregate.GroupBy" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : { + "schema" : "`ID` BIGINT KEY, `ID` BIGINT, `VALUE` BIGINT, `KSQL_AGG_VARIABLE_0` STRUCT<`SUM` BIGINT, `COUNT` BIGINT, `M2` DOUBLE>", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CTAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "stddev_samp long", + "inputs" : [ { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 2147483648 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : 100 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 500 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 100 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : -2 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : 0 + } + }, { + "topic" : "test_topic", + "key" : 100, + "value" : { + "value" : -6 + } + }, { + "topic" : "test_topic", + "key" : 0, + "value" : { + "value" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 2305842794465334300 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 0.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 80000.0 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 1537228602658000400 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 70000.0 + } + }, { + "topic" : "OUTPUT", + "key" : 100, + "value" : { + "STDDEV" : 57275.666666666664 + } + }, { + "topic" : "OUTPUT", + "key" : 0, + "value" : { + "STDDEV" : 1537228602658000400 + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "long" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + }, + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (ID BIGINT KEY, VALUE bigint) WITH (kafka_topic='test_topic', value_format='AVRO');", "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) as stddev FROM test group by id;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "TABLE", + "schema" : "`ID` BIGINT KEY, `STDDEV` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `VALUE` BIGINT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "VALUE", + "type" : [ "null", "long" ], + "default" : null + } ], + "connect.name" : "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema" + } + }, { + "name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_OUTPUT_0-Aggregate-Aggregate-Materialize-changelog", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "ID", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "KSQL_AGG_VARIABLE_0", + "type" : [ "null", { + "type" : "record", + "name" : "KsqlDataSourceSchema_KSQL_AGG_VARIABLE_0", + "fields" : [ { + "name" : "SUM", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "COUNT", + "type" : [ "null", "long" ], + "default" : null + }, { + "name" : "M2", + "type" : [ "null", "double" ], + "default" : null + } ] + } ], + "default" : null + } ] + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "STDDEV", + "type" : [ "null", "double" ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/topology new file mode 100644 index 000000000000..e000b160c4db --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/standarddeviation_-_stddev_samp_long/6.2.0_1610468734648/topology @@ -0,0 +1,25 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-AGGREGATE-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-Prepare + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000003 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000006 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000006 (stores: []) + --> KSTREAM-SINK-0000000007 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000007 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000006 + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/standarddeviation.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/standarddeviation.json new file mode 100644 index 000000000000..26cfdbedb68a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/standarddeviation.json @@ -0,0 +1,121 @@ +{ + "comments": [ + "Test cases covering the use of the aggregate STDDEV_SAMP function" + ], + "tests": [ + { + "name": "stddev_samp int", + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE integer) WITH (kafka_topic='test_topic',value_format='AVRO');", + "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) AS stddev FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0,"value": {"value": 0}}, + {"topic": "test_topic", "key": 0,"value": {"value": 10}}, + {"topic": "test_topic", "key": 100,"value": {"value": 50}}, + {"topic": "test_topic", "key": 100,"value": {"value": 10}}, + {"topic": "test_topic", "key": 0,"value": {"value": 7}}, + {"topic": "test_topic", "key": 100,"value": {"value": 3}}, + {"topic": "test_topic", "key": 100,"value": {"value": null}}, + {"topic": "test_topic", "key": 0,"value": {"value": 6}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 50.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 800.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 26.333333333333332}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 643.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 643.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 17.583333333333332}} + ] + }, + { + "name": "stddev_samp long", + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE bigint) WITH (kafka_topic='test_topic', value_format='AVRO');", + "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) as stddev FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0,"value": {"value": 2147483648}}, + {"topic": "test_topic", "key": 0,"value": {"value": 100}}, + {"topic": "test_topic", "key": 100,"value": {"value": 500}}, + {"topic": "test_topic", "key": 100,"value": {"value": 100}}, + {"topic": "test_topic", "key": 0,"value": {"value": -2}}, + {"topic": "test_topic", "key": 100,"value": {"value": 0}}, + {"topic": "test_topic", "key": 100,"value": {"value": -6}}, + {"topic": "test_topic", "key": 0,"value": {"value": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 2.3058427944653343e+18}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 80000.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 1.5372286026580004e+18}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 70000.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 57275.666666666664}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 1.5372286026580004e+18}} + ] + }, + { + "name": "stddev_samp double", + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, VALUE double) WITH (kafka_topic='test_topic', value_format='AVRO');", + "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value) as stddev FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0,"value": {"value": -1.8}}, + {"topic": "test_topic", "key": 0,"value": {"value": 2.3}}, + {"topic": "test_topic", "key": 100,"value": {"value": 9223372036854.775807}}, + {"topic": "test_topic", "key": 100,"value": {"value": 300.8}}, + {"topic": "test_topic", "key": 0,"value": {"value": 100.2}}, + {"topic": "test_topic", "key": 100,"value": {"value": -200000.6}}, + {"topic": "test_topic", "key": 100,"value": {"value": 0.0}}, + {"topic": "test_topic", "key": 0,"value": {"value": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 8.405}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 0.0}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 4.253529586234292e+25}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 3334.2033333333347}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 2.835686452404674e+25}}, + {"topic": "OUTPUT", "key": 100,"value": {"STDDEV": 2.1267648239542927e+25}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV": 3334.2033333333347}} + ] + }, + { + "name": "stddev_samp double map", + "statements": [ + "CREATE STREAM TEST (ID BIGINT KEY, NAME varchar, VALUE map) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE OUTPUT as SELECT ID, stddev_samp(value['key1']) AS STDDEV FROM test group by id;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0,"value": {"name": "zero","value":{"key1":10.0, "key2":1.0}}}, + {"topic": "test_topic", "key": 0,"value": {"name": "zero", "value": {"key1":12.0, "key2":1.0}}}, + {"topic": "test_topic", "key": 0,"value": {"name": "zero", "value": {"key1":13.0, "key2":1.0}}}, + {"topic": "test_topic", "key": 0,"value": {"name": "zero", "value": {"key1":14.0, "key2":1.0}}}, + {"topic": "test_topic", "key": 0,"value": {"name": "zero", "value": {"key1":16.0, "key2":1.0}}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV":0.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV":2.0}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV":2.333333333333333}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV":2.9166666666666665}}, + {"topic": "OUTPUT", "key": 0,"value": {"STDDEV":5.0}} + ] + }, + { + "name": "stddev_samp - DELIMITED", + "comment": "DELIMITED does not support STRUCT, so can't support STDDEV_SAMP until we use a different internal format", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE TABLE OUTPUT AS SELECT ID, stddev_samp(value) AS stddev FROM INPUT group by ID;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlException", + "message": "One of the functions used in the statement has an intermediate type that the value format can not handle. Please remove the function or change the format." + } + } + ] +} \ No newline at end of file