diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index fbae991c8a0d..0dc6129aafb6 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -781,6 +781,29 @@ json_array_length(NULL) => NULL json_array_length("abc") => throws "Invalid JSON format" ``` +### `JSON_KEYS` + +Since: 0.25.0 + +```sql +json_keys(json_string) -> Array +``` + +Given a string, parses it as a JSON object and returns a ksqlDB array of strings representing the +top-level keys. Returns `NULL` if the string can't be interpreted as a JSON object, for example, +when the string is `NULL` or it does not contain valid JSON, or the JSON value is not an object. + +Examples: + +```sql +json_keys("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}") // => ["a", "b", "d"] +json_keys("{}") // => [] +json_keys("[]") // => NULL +json_keys("123") // => NULL +json_keys(NULL) // => NULL +json_keys("") // throws "Invalid JSON format" +``` + ### `INITCAP` Since: 0.6.0 diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonKeys.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonKeys.java new file mode 100644 index 000000000000..bd48e55a8fc5 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonKeys.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.json; + +import com.fasterxml.jackson.databind.JsonNode; +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.KsqlConstants; +import java.util.ArrayList; +import java.util.List; + +@UdfDescription( + name = "JSON_KEYS", + category = FunctionCategory.JSON, + description = "Given a string, parses it as a JSON object and returns a ksqlDB array of " + + "strings representing the top-level keys. Returns NULL if the string can't be " + + "interpreted as a JSON object, for example, when the string is NULL or it does not " + + "contain valid JSON, or the JSON value is not an object.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public class JsonKeys { + + @Udf + public List keys(@UdfParameter final String jsonObj) { + if (jsonObj == null) { + return null; + } + + final JsonNode node = UdfJsonMapper.parseJson(jsonObj); + if (node.isMissingNode() || !node.isObject()) { + return null; + } + + final List ret = new ArrayList<>(); + node.fieldNames().forEachRemaining(ret::add); + return ret; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/json/JsonKeysTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/json/JsonKeysTest.java new file mode 100644 index 000000000000..9b4488a663c8 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/json/JsonKeysTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2022 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.json; + + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNull; + +import io.confluent.ksql.function.KsqlFunctionException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Test; + +public class JsonKeysTest { + + private static final JsonKeys udf = new JsonKeys(); + + @Test + public void shouldReturnObjectKeys() { + // When: + final List result = udf.keys("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}"); + + // Then: + assertEquals(Arrays.asList("a", "b", "d"), result); + } + + @Test + public void shouldReturnKeysForEmptyObject() { + // When: + final List result = udf.keys("{}"); + + // Then: + assertEquals(Collections.emptyList(), result); + } + + @Test + public void shouldReturnNullForArray() { + assertNull(udf.keys("[]")); + } + + @Test + public void shouldReturnNullForNumber() { + assertNull(udf.keys("123")); + } + + @Test + public void shouldReturnNullForNull() { + assertNull(udf.keys("null")); + } + + @Test + public void shouldReturnNullForString() { + assertNull(udf.keys("\"abc\"")); + } + + @Test(expected = KsqlFunctionException.class) + public void shouldReturnNullForInvalidJson() { + udf.keys("abc"); + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/plan.json new file mode 100644 index 000000000000..56303216d2f0 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/plan.json @@ -0,0 +1,189 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (K STRING KEY, COLORS_OBJ STRING) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`K` STRING KEY, `COLORS_OBJ` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n JSON_KEYS(TEST.COLORS_OBJ) COLORS\nFROM TEST TEST\nWHERE (JSON_KEYS(TEST.COLORS_OBJ) IS NOT NULL)\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`K` STRING KEY, `COLORS` ARRAY", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamFilterV1", + "properties" : { + "queryContext" : "WhereFilter" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`K` STRING KEY, `COLORS_OBJ` STRING", + "pseudoColumnVersion" : 1 + }, + "filterExpression" : "(JSON_KEYS(COLORS_OBJ) IS NOT NULL)" + }, + "keyColumnNames" : [ "K" ], + "selectExpressions" : [ "JSON_KEYS(COLORS_OBJ) AS COLORS" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.connect.request.headers.plugin" : null, + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.headers.columns.enabled" : "true", + "ksql.connect.basic.auth.credentials.reload" : "false", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.shared.runtimes.count" : "8", + "ksql.query.push.v2.alos.enabled" : "true", + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "false", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.connect.basic.auth.credentials.file" : "", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.connect.basic.auth.credentials.source" : "NONE", + "ksql.schema.registry.url" : "schema_registry.url:0", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.max.catchup.consumers" : "5", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.enabled" : "false", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.query.pull.limit.clause.enabled" : "true", + "ksql.connect.error.handler" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.continuation.tokens.enabled" : "false", + "ksql.endpoint.migrate.query" : "true", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.query.push.v2.catchup.consumer.msg.window" : "50", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/spec.json new file mode 100644 index 000000000000..b4170c666bbe --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/spec.json @@ -0,0 +1,170 @@ +{ + "version" : "7.2.0", + "timestamp" : 1642513802022, + "path" : "query-validation-tests/json_keys.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`K` STRING KEY, `COLORS_OBJ` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`K` STRING KEY, `COLORS` ARRAY", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "extract keys from a json object", + "inputs" : [ { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "{\"Red\": 3, \"Green\": 5}" + }, + "timestamp" : 0 + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "{\"Black\": 2}" + }, + "timestamp" : 0 + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "{\"Pink\": 1, \"Yellow\": 3}" + }, + "timestamp" : 0 + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "{\"White\": 7, \"Pink\": 8}" + }, + "timestamp" : 0 + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "{}" + }, + "timestamp" : 0 + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : null, + "timestamp" : 0 + } + }, { + "topic" : "test_topic", + "key" : "1", + "value" : { + "colors_obj" : "", + "timestamp" : 0 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "COLORS" : [ "Red", "Green" ] + }, + "timestamp" : 0 + }, { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "COLORS" : [ "Black" ] + }, + "timestamp" : 0 + }, { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "COLORS" : [ "Pink", "Yellow" ] + }, + "timestamp" : 0 + }, { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "COLORS" : [ "White", "Pink" ] + }, + "timestamp" : 0 + }, { + "topic" : "OUTPUT", + "key" : "1", + "value" : { + "COLORS" : [ ] + }, + "timestamp" : 0 + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM test (K STRING KEY, colors_obj STRING) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, JSON_KEYS(colors_obj) AS COLORS FROM test WHERE JSON_KEYS(colors_obj) IS NOT NULL;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `COLORS` ARRAY", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`K` STRING KEY, `COLORS_OBJ` STRING", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/topology new file mode 100644 index 000000000000..7b6119f7a4ae --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/json_keys_-_extract_keys_from_a_json_object/7.2.0_1642513802022/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> WhereFilter + <-- KSTREAM-SOURCE-0000000000 + Processor: WhereFilter (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- WhereFilter + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_keys.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_keys.json new file mode 100644 index 000000000000..52a937f6edd5 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/json_keys.json @@ -0,0 +1,30 @@ +{ + "comments": [ + "Tests covering the use of the JSON_KEYS function." + ], + "tests": [ + { + "name": "extract keys from a json object", + "statements": [ + "CREATE STREAM test (K STRING KEY, colors_obj STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT K, JSON_KEYS(colors_obj) AS COLORS FROM test WHERE JSON_KEYS(colors_obj) IS NOT NULL;" + ], + "inputs": [ + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Red\": 3, \"Green\": 5}"}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Black\": 2}"}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Pink\": 1, \"Yellow\": 3}"}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"White\": 7, \"Pink\": 8}"}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "{}"}, "timestamp": 0}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": null, "timestamp": 0}}, + {"topic": "test_topic", "key": "1", "value": {"colors_obj": "", "timestamp": 0}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"COLORS": ["Red", "Green"]}, "timestamp": 0}, + {"topic": "OUTPUT", "key": "1", "value": {"COLORS": ["Black"]}, "timestamp": 0}, + {"topic": "OUTPUT", "key": "1", "value": {"COLORS": ["Pink", "Yellow"]}, "timestamp": 0}, + {"topic": "OUTPUT", "key": "1", "value": {"COLORS": ["White", "Pink"]}, "timestamp": 0}, + {"topic": "OUTPUT", "key": "1", "value": {"COLORS": []}, "timestamp": 0} + ] + } + ] +} \ No newline at end of file