diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index bac03ab93b09..ce6288c684c8 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -794,10 +794,11 @@ Convert a string to lowercase. Since: - ```sql -LEN(col1) +LEN(string) +LEN(bytes) ``` -The length of a string. +The length of a string or the number of bytes in a BYTES value. ### `LPAD` diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Len.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Len.java index c611ec2a15d3..3ed9dd59cb08 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Len.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/string/Len.java @@ -18,11 +18,12 @@ import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; +import java.nio.ByteBuffer; @UdfDescription( name = "len", category = FunctionCategory.STRING, - description = "Returns the length of the input string.") + description = "Returns the length of the input string or byte array.") public class Len { @Udf @@ -33,4 +34,13 @@ public Integer len( } return input.length(); } + + @Udf + public Integer len( + @UdfParameter(description = "The input byte array") final ByteBuffer input) { + if (input == null) { + return null; + } + return input.capacity(); + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/LenTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/LenTest.java index 343b6664ee95..e8df09f6d203 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/LenTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/string/LenTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import java.nio.ByteBuffer; import org.junit.Before; import org.junit.Test; @@ -44,8 +45,14 @@ public void shouldReturnZeroForEmptyInput() { @Test public void shouldReturnNullForNullInput() { - final Integer result = udf.len(null); - assertThat(result, is(nullValue())); + assertThat(udf.len((String) null), is(nullValue())); + assertThat(udf.len((ByteBuffer) null), is(nullValue())); + } + + @Test + public void shouldReturnLengthOfByteArray() { + final Integer result = udf.len(ByteBuffer.wrap(new byte[] {123, 89})); + assertThat(result, is(2)); } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/plan.json new file mode 100644 index 000000000000..6d341dd2e885 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/plan.json @@ -0,0 +1,154 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (KAFKA_TOPIC='input_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES", + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n LEN(INPUT.S) S,\n LEN(INPUT.B) B\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES" + }, + "keyColumnNames" : [ "ID" ], + "selectExpressions" : [ "LEN(S) AS S", "LEN(B) AS B" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.query.push.scalable.enabled" : "false", + "ksql.query.push.scalable.interpreter.enabled" : "true", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/spec.json new file mode 100644 index 000000000000..5f8cf54ab181 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/spec.json @@ -0,0 +1,126 @@ +{ + "version" : "7.1.0", + "timestamp" : 1627010796566, + "path" : "query-validation-tests/len.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "length of a string - JSON", + "inputs" : [ { + "topic" : "input_topic", + "key" : 1, + "value" : { + "S" : "ABC", + "B" : "YWJj" + } + }, { + "topic" : "input_topic", + "key" : 2, + "value" : { + "S" : "", + "B" : "" + } + }, { + "topic" : "input_topic", + "key" : 3, + "value" : { + "S" : null, + "B" : null + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 1, + "value" : { + "S" : 3, + "B" : 3 + } + }, { + "topic" : "OUTPUT", + "key" : 2, + "value" : { + "S" : 0, + "B" : 0 + } + }, { + "topic" : "OUTPUT", + "key" : 3, + "value" : { + "S" : null, + "B" : null + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "input_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (kafka_topic='input_topic',value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, LEN(S) AS S, LEN(B) AS B FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/topology new file mode 100644 index 000000000000..12f8f6574002 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/len_-_length_of_a_string_-_JSON/7.1.0_1627010796566/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/len.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/len.json new file mode 100644 index 000000000000..07788ebc2f99 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/len.json @@ -0,0 +1,25 @@ +{ + "comments": [ + "Tests covering the use of the LEN function" + ], + "tests": [ + { + "name": "length of a string", + "format": ["JSON"], + "statements": [ + "CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (kafka_topic='input_topic',value_format='{FORMAT}');", + "CREATE STREAM OUTPUT AS SELECT ID, LEN(S) AS S, LEN(B) AS B FROM INPUT;" + ], + "inputs": [ + {"topic": "input_topic", "key": 1, "value": {"S": "ABC", "B": "YWJj"}}, + {"topic": "input_topic", "key": 2, "value": {"S": "", "B": ""}}, + {"topic": "input_topic", "key": 3, "value": {"S": null, "B": null}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": 1, "value": {"S": 3, "B": 3}}, + {"topic": "OUTPUT", "key": 2, "value": {"S": 0, "B": 0}}, + {"topic": "OUTPUT", "key": 3, "value": {"S": null, "B": null}} + ] + } + ] +} \ No newline at end of file