Skip to content

Commit

Permalink
feat: update len function to accept BYTES (#7865)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Jul 26, 2021
1 parent 2a8fa78 commit eaaa0db
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 5 deletions.
5 changes: 3 additions & 2 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,11 @@ Convert a string to lowercase.
Since: -

```sql
LEN(col1)
LEN(string)
LEN(bytes)
```

The length of a string.
The length of a string or the number of bytes in a BYTES value.

### `LPAD`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.nio.ByteBuffer;

@UdfDescription(
name = "len",
category = FunctionCategory.STRING,
description = "Returns the length of the input string.")
description = "Returns the length of the input string or byte array.")
public class Len {

@Udf
Expand All @@ -33,4 +34,13 @@ public Integer len(
}
return input.length();
}

@Udf
public Integer len(
@UdfParameter(description = "The input byte array") final ByteBuffer input) {
if (input == null) {
return null;
}
return input.capacity();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

import java.nio.ByteBuffer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -44,8 +45,14 @@ public void shouldReturnZeroForEmptyInput() {

@Test
public void shouldReturnNullForNullInput() {
final Integer result = udf.len(null);
assertThat(result, is(nullValue()));
assertThat(udf.len((String) null), is(nullValue()));
assertThat(udf.len((ByteBuffer) null), is(nullValue()));
}

@Test
public void shouldReturnLengthOfByteArray() {
final Integer result = udf.len(ByteBuffer.wrap(new byte[] {123, 89}));
assertThat(result, is(2));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (KAFKA_TOPIC='input_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES",
"topicName" : "input_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n LEN(INPUT.S) S,\n LEN(INPUT.B) B\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "LEN(S) AS S", "LEN(B) AS B" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"version" : "7.1.0",
"timestamp" : 1627010796566,
"path" : "query-validation-tests/len.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"testCase" : {
"name" : "length of a string - JSON",
"inputs" : [ {
"topic" : "input_topic",
"key" : 1,
"value" : {
"S" : "ABC",
"B" : "YWJj"
}
}, {
"topic" : "input_topic",
"key" : 2,
"value" : {
"S" : "",
"B" : ""
}
}, {
"topic" : "input_topic",
"key" : 3,
"value" : {
"S" : null,
"B" : null
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : 1,
"value" : {
"S" : 3,
"B" : 3
}
}, {
"topic" : "OUTPUT",
"key" : 2,
"value" : {
"S" : 0,
"B" : 0
}
}, {
"topic" : "OUTPUT",
"key" : 3,
"value" : {
"S" : null,
"B" : null
}
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "input_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (kafka_topic='input_topic',value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT ID, LEN(S) AS S, LEN(B) AS B FROM INPUT;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`ID` BIGINT KEY, `S` STRING, `B` BYTES",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "JSON",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`ID` BIGINT KEY, `S` INTEGER, `B` INTEGER",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "JSON",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "input_topic",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "OUTPUT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"comments": [
"Tests covering the use of the LEN function"
],
"tests": [
{
"name": "length of a string",
"format": ["JSON"],
"statements": [
"CREATE STREAM INPUT (ID BIGINT KEY, S STRING, B BYTES) WITH (kafka_topic='input_topic',value_format='{FORMAT}');",
"CREATE STREAM OUTPUT AS SELECT ID, LEN(S) AS S, LEN(B) AS B FROM INPUT;"
],
"inputs": [
{"topic": "input_topic", "key": 1, "value": {"S": "ABC", "B": "YWJj"}},
{"topic": "input_topic", "key": 2, "value": {"S": "", "B": ""}},
{"topic": "input_topic", "key": 3, "value": {"S": null, "B": null}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"S": 3, "B": 3}},
{"topic": "OUTPUT", "key": 2, "value": {"S": 0, "B": 0}},
{"topic": "OUTPUT", "key": 3, "value": {"S": null, "B": null}}
]
}
]
}

0 comments on commit eaaa0db

Please sign in to comment.