Skip to content

Commit

Permalink
feat: Add json_records UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Jan 20, 2022
1 parent 6c3320c commit b5243c2
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 0 deletions.
24 changes: 24 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,30 @@ multiple elements, like those containing wildcards, aren't supported.

`CREATE STREAM LOGS (LOG STRUCT<CLOUD STRING, APP STRING, INSTANCE INT>, ...) WITH (VALUE_FORMAT='JSON', ...)`

### `JSON_RECORDS`

Since: 0.25.0

```sql
json_records(json_string) -> Array<Struct<json_key:String, json_value:String>>
```

Given a string, parses it as a JSON object and returns a ksqlDB array of structs containing 2
strings - json_key and json_value representing the top-level keys and values. Returns `NULL` if the
string can't be interpreted as a JSON object, for example, when the string is `NULL` or it does not
contain valid JSON, or the JSON value is not an object.

Examples:

```sql
json_records("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}") // => [Struct{json_key="a", json_value="\"abc\""}, Struct{json_key="b", json_value="{\"c\":\"a\"}"}, Struct{json_key="d", json_value="1"}]
json_records("{}") // => []
json_records("[]") // => NULL
json_records("123") // => NULL
json_records(NULL) // => NULL
json_records("abc") // => NULL
```
### `INITCAP`
Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

/**
 * Implementation of the {@code JSON_RECORDS} scalar function: takes a JSON object supplied as a
 * string and produces one {@code (JSON_KEY, JSON_VALUE)} struct per top-level field.
 */
@UdfDescription(
    name = "JSON_RECORDS",
    category = FunctionCategory.JSON,
    description = "Given a string, parses it as a JSON object and returns a ksqlDB array of "
        + "structs containing 2 strings - json_key and json_value representing the top-level "
        + "keys and values. Returns NULL if the string can't be interpreted as a JSON object, "
        + "for example, when the string is NULL or it does not contain valid JSON, or the JSON "
        + "value is not an object.",
    author = KsqlConstants.CONFLUENT_AUTHOR
)
public class JsonRecords {
  private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader();
  static final String KEY_FIELD_NAME = "JSON_KEY";
  private static final String VALUE_FIELD_NAME = "JSON_VALUE";

  /** Schema of each returned element: an optional struct holding two optional strings. */
  @VisibleForTesting
  static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
      .field(KEY_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
      .field(VALUE_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
      .optional()
      .build();

  /**
   * Splits the top-level fields of a JSON object into key/value structs.
   *
   * <p>Each value is rendered with {@link JsonNode#toString()}, so nested objects come back as
   * JSON text and string values keep their surrounding quotes.
   *
   * <p>NOTE(review): the function description states that invalid JSON yields NULL, whereas this
   * method throws. Presumably the ksqlDB runtime converts UDF exceptions into NULL results -
   * confirm against the engine's error handling.
   *
   * @param jsonObj the JSON text to split; may be {@code null}
   * @return one struct per top-level field (empty for an empty object), or {@code null} when
   *     {@code jsonObj} is {@code null} or does not parse to a JSON object
   * @throws KsqlFunctionException if {@code jsonObj} cannot be parsed as JSON
   */
  @Udf(schema = "ARRAY<STRUCT<JSON_KEY STRING, JSON_VALUE STRING>>")
  public List<Struct> records(@UdfParameter final String jsonObj) {
    if (jsonObj == null) {
      return null;
    }

    final JsonNode root = readTree(jsonObj);
    final boolean isJsonObject = !root.isMissingNode() && root.isObject();
    if (!isJsonObject) {
      return null;
    }

    final List<Struct> records = new ArrayList<>();
    root.fieldNames().forEachRemaining(
        name -> records.add(toRecord(name, root.get(name))));
    return records;
  }

  /** Builds the struct for a single top-level field, serializing the value back to text. */
  private static Struct toRecord(final String key, final JsonNode value) {
    return new Struct(STRUCT_SCHEMA)
        .put(KEY_FIELD_NAME, key)
        .put(VALUE_FIELD_NAME, value.toString());
  }

  /** Parses the text into a JSON tree, wrapping Jackson failures in a ksqlDB exception. */
  private static JsonNode readTree(final String text) {
    try {
      return OBJECT_READER.readTree(text);
    } catch (final JacksonException cause) {
      throw new KsqlFunctionException("Invalid JSON format:" + text, cause);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import io.confluent.ksql.function.KsqlFunctionException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

/** Unit tests for {@link JsonRecords}. */
public class JsonRecordsTest {
  private final JsonRecords udf = new JsonRecords();

  @Test
  public void shouldExtractRecords() {
    // Given:
    final String json = "{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}";

    // When:
    final List<Struct> actual = udf.records(json);

    // Then:
    final List<Struct> expected = Arrays.asList(
        expectedRecord("a", "\"abc\""),
        expectedRecord("b", "{\"c\":\"a\"}"),
        expectedRecord("d", "1")
    );
    assertEquals(expected, actual);
  }

  @Test
  public void shouldReturnEmptyListForEmptyObject() {
    final List<Struct> actual = udf.records("{}");

    assertEquals(Collections.emptyList(), actual);
  }

  @Test
  public void shouldReturnNullForJsonNull() {
    final List<Struct> actual = udf.records("null");

    assertNull(actual);
  }

  @Test
  public void shouldReturnNullForJsonArray() {
    final List<Struct> actual = udf.records("[1,2,3]");

    assertNull(actual);
  }

  @Test
  public void shouldReturnNullForJsonNumber() {
    final List<Struct> actual = udf.records("123");

    assertNull(actual);
  }

  @Test
  public void shouldReturnNullForNull() {
    final List<Struct> actual = udf.records(null);

    assertNull(actual);
  }

  @Test(expected = KsqlFunctionException.class)
  public void shouldThrowForInvalidJson() {
    udf.records("abc");
  }

  /** Creates the struct the UDF is expected to emit for one top-level key/value pair. */
  private static Struct expectedRecord(final String key, final String value) {
    return new Struct(JsonRecords.STRUCT_SCHEMA)
        .put("JSON_KEY", key)
        .put("JSON_VALUE", value);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"comments": [
"Tests covering the use of the JSON_RECORDS function."
],
"tests": [
{
"name": "extract records from a json object",
"statements": [
"CREATE STREAM test (K STRING KEY, colors_obj STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT K, JSON_RECORDS(colors_obj) AS COLORS FROM test WHERE JSON_RECORDS(colors_obj) IS NOT NULL;"
],
"inputs": [
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Red\": 3, \"Green\": 5}"}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Black\": 2}"}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"Pink\": 1, \"Yellow\": 3}"}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "{\"White\": 7, \"Pink\": 8}"}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "{}"}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": null}, "timestamp": 0},
{"topic": "test_topic", "key": "1", "value": {"colors_obj": ""}, "timestamp": 0}
],
"outputs": [
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Red", "JSON_VALUE": "3"}, {"JSON_KEY": "Green", "JSON_VALUE": "5"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Black", "JSON_VALUE": "2"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Pink", "JSON_VALUE": "1"}, {"JSON_KEY": "Yellow", "JSON_VALUE": "3"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "White", "JSON_VALUE": "7"}, {"JSON_KEY": "Pink", "JSON_VALUE": "8"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": []}, "timestamp": 0}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "K STRING KEY, COLORS ARRAY<STRUCT<JSON_KEY STRING, JSON_VALUE STRING>>"}
]
}
}
]
}

0 comments on commit b5243c2

Please sign in to comment.