Skip to content

Commit

Permalink
feat: Add json_array_length UDF (#8602)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr authored Jan 24, 2022
1 parent aebec54 commit f7c1334
Show file tree
Hide file tree
Showing 8 changed files with 575 additions and 1 deletion.
24 changes: 24 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,30 @@ is_json_string("abc") => false
is_json_string(NULL) => false
```

### `JSON_ARRAY_LENGTH`

Since: 0.25.0

```sql
JSON_ARRAY_LENGTH(json_string) -> Integer
```

Given a string, parses it as a JSON value and returns the length of the top-level array. Returns
`NULL` if the string can't be interpreted as a JSON array, for example, when the string is `NULL`
or it does not contain valid JSON, or the JSON value is not an array.

Examples:

```sql
json_array_length("[1, 2, 3]") => 3
json_array_length("[1, [1, [2]], 3]") => 3
json_array_length("[]") => 0
json_array_length("{}") => NULL
json_array_length("123") => NULL
json_array_length(NULL) => NULL
json_array_length("abc") => throws "Invalid JSON format"
```

### `INITCAP`

Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(
name = "JSON_ARRAY_LENGTH",
category = FunctionCategory.JSON,
description = "Given a string, parses it as a JSON value and returns the length of the "
+ "top-level array. Returns NULL if the string can't be interpreted as a JSON array, "
+ "for example, when the string is `NULL` or it does not contain valid JSON, or the JSON "
+ "value is not an array.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class JsonArrayLength {

@Udf
public Integer length(@UdfParameter final String jsonArray) {
if (jsonArray == null) {
return null;
}

final JsonNode node = UdfJsonMapper.parseJson(jsonArray);
if (node.isMissingNode() || !node.isArray()) {
return null;
}

return node.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import io.confluent.ksql.function.KsqlFunctionException;
import org.junit.Test;

public class JsonArrayLengthTest {

private static final JsonArrayLength udf = new JsonArrayLength();

@Test
public void shouldReturnFlatArrayLength() {
// When:
final Integer result = udf.length("[1, 2, 3]");

// Then:
assertEquals(Integer.valueOf(3), result);
}

@Test
public void shouldReturnNestedArrayLength() {
// When:
final Integer result = udf.length("[1, [1, [2]], 3]");

// Then:
assertEquals(Integer.valueOf(3), result);
}

@Test
public void shouldReturnEmptyArrayLength() {
// When:
final Integer result = udf.length("[]");

// Then:
assertEquals(Integer.valueOf(0), result);
}

@Test
public void shouldReturnNullForObjects() {
// When:
final Integer result = udf.length("{}");

// Then:
assertNull(result);
}

@Test
public void shouldReturnNullForNumber() {
// When:
final Integer result = udf.length("123");

// Then:
assertNull(result);
}

@Test
public void shouldReturnNullForString() {
// When:
final Integer result = udf.length("\"abc\"");

// Then:
assertNull(result);
}

@Test
public void shouldReturnNullForNull() {
// When:
final Integer result = udf.length(null);

// Then:
assertNull(result);
}

@Test
public void shouldReturnNullForStrNull() {
// When:
final Integer result = udf.length("null");

// Then:
assertNull(result);
}

@Test(expected = KsqlFunctionException.class)
public void shouldThrowForInvalidJson() {
udf.length("abc");
}
}
2 changes: 1 addition & 1 deletion ksqldb-functional-tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ generator to do so.

To generate new plans, just run `PlannedTestGeneratorTest.manuallyGeneratePlans`

## Topology comparision
## Topology comparison
These tests also validate the generated topology matches the expected topology,
i.e. a test will fail if the topology has changed from previous runs.
This is needed to detect potentially non-backwards compatible changes to the generated topology.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, COLORS STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `COLORS` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false,
"isSource" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n TEST.COLORS COLORS\nFROM TEST TEST\nWHERE (JSON_ARRAY_LENGTH(TEST.COLORS) > 0)\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`K` STRING KEY, `COLORS` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"orReplace" : false,
"isSource" : false
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamFilterV1",
"properties" : {
"queryContext" : "WhereFilter"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`K` STRING KEY, `COLORS` STRING",
"pseudoColumnVersion" : 1
},
"filterExpression" : "(JSON_ARRAY_LENGTH(COLORS) > 0)"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "COLORS AS COLORS" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.connect.request.headers.plugin" : null,
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.headers.columns.enabled" : "true",
"ksql.connect.basic.auth.credentials.reload" : "false",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.stream.enabled" : "true",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.push.v2.interpreter.enabled" : "true",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.query.cleanup.shutdown.timeout.ms" : "30000",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.shared.runtimes.count" : "8",
"ksql.query.push.v2.alos.enabled" : "true",
"ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.query.pull.range.scan.enabled" : "true",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.query.pull.consistency.token.enabled" : "false",
"ksql.lambdas.enabled" : "true",
"ksql.source.table.materialization.enabled" : "true",
"ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.connect.basic.auth.credentials.file" : "",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.connect.basic.auth.credentials.source" : "NONE",
"ksql.schema.registry.url" : "schema_registry.url:0",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.query.push.v2.max.catchup.consumers" : "5",
"ksql.query.push.v2.new.latest.delay.ms" : "5000",
"ksql.query.push.v2.enabled" : "false",
"ksql.query.push.v2.latest.reset.age.ms" : "30000",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.rowpartition.rowoffset.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.connect.error.handler" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.continuation.tokens.enabled" : "false",
"ksql.endpoint.migrate.query" : "true",
"ksql.query.push.v2.registry.installed" : "false",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.query.push.v2.catchup.consumer.msg.window" : "50",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.runtime.feature.shared.enabled" : "false",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "50",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading

0 comments on commit f7c1334

Please sign in to comment.