Skip to content

Commit

Permalink
Make json_records return map<string,string>
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Jan 25, 2022
1 parent e9305d6 commit eeb1066
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 92 deletions.
12 changes: 6 additions & 6 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -809,18 +809,18 @@ json_keys("") // => NULL
Since: 0.25.0

```sql
json_records(json_string) -> Array<Struct<json_key:String, json_value:String>>
json_records(json_string) -> Map<String, String>
```

Given a string, parses it as a JSON object and returns a ksqlDB array of structs containing 2
strings - json_key and json_value representing the top-level keys and values. Returns `NULL` if the
string can't be interpreted as a JSON object, for example, when the string is `NULL` or it does not
contain valid JSON, or the JSON value is not an object.
Given a string, parses it as a JSON object and returns a map representing the top-level keys and
values. Returns `NULL` if the string can't be interpreted as a JSON object, that is, when the string
is `NULL`, does not contain valid JSON, or contains a JSON value that is not an object.


Examples:

```sql
json_records("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}") // => [Struct{json_key="a", json_value="\"abc\""}, Struct{json_key="b", json_value="{ \"c\": \"a\" }}, Struct{json_key="d", json_value="1"}]
json_records("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}") // => {"a": "\"abc\"", "b": "{\"c\":\"a\"}", "d": "1"}
json_records("{}") // => {}
json_records("[]") // => NULL
json_records("123") // => NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,27 @@
package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import java.util.HashMap;
import java.util.Map;

@UdfDescription(
name = "JSON_RECORDS",
category = FunctionCategory.JSON,
description = "Given a string, parses it as a JSON object and returns a ksqlDB array of "
+ "structs containing 2 strings - json_key and json_value representing the top-level "
+ "keys and values. Returns NULL if the string can't be interpreted as a JSON object, "
+ "for example, when the string is NULL or it does not contain valid JSON, or the JSON "
+ "value is not an object.",
description = "Given a string, parses it as a JSON object and returns a map representing "
+ "the top-level keys and values. Returns `NULL` if the string can't be interpreted as a "
+ "JSON object, i.e. it is `NULL` or it does not contain valid JSON, or the JSON value is "
+ "not an object.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class JsonRecords {
static final String KEY_FIELD_NAME = "JSON_KEY";
private static final String VALUE_FIELD_NAME = "JSON_VALUE";

@VisibleForTesting
static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
.field(KEY_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(VALUE_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.optional()
.build();

@Udf(schema = "ARRAY<STRUCT<JSON_KEY STRING, JSON_VALUE STRING>>")
public List<Struct> records(@UdfParameter final String jsonObj) {
@Udf
public Map<String, String> records(@UdfParameter final String jsonObj) {
if (jsonObj == null) {
return null;
}
Expand All @@ -60,13 +46,8 @@ public List<Struct> records(@UdfParameter final String jsonObj) {
return null;
}

final List<Struct> ret = new ArrayList<>(node.size());
node.fieldNames().forEachRemaining(k -> {
final Struct struct = new Struct(STRUCT_SCHEMA);
struct.put(KEY_FIELD_NAME, k);
struct.put(VALUE_FIELD_NAME, node.get(k).toString());
ret.add(struct);
});
final Map<String, String> ret = new HashMap<>(node.size());
node.fieldNames().forEachRemaining(k -> ret.put(k, node.get(k).toString()));
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
import static org.junit.Assert.assertNull;

import io.confluent.ksql.function.KsqlFunctionException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

public class JsonRecordsTest {
Expand All @@ -31,25 +30,21 @@ public class JsonRecordsTest {
@Test
public void shouldExtractRecords() {
// When
final List<Struct> result = udf.records("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}");
final Map<String, String> result = udf.records("{\"a\": \"abc\", \"b\": { \"c\": \"a\" }, \"d\": 1}");

// Then:
final Struct s1 = new Struct(JsonRecords.STRUCT_SCHEMA);
s1.put("JSON_KEY", "a");
s1.put("JSON_VALUE", "\"abc\"");
final Struct s2 = new Struct(JsonRecords.STRUCT_SCHEMA);
s2.put("JSON_KEY", "b");
s2.put("JSON_VALUE", "{\"c\":\"a\"}");
final Struct s3 = new Struct(JsonRecords.STRUCT_SCHEMA);
s3.put("JSON_KEY", "d");
s3.put("JSON_VALUE", "1");
final Map<String, String> expected = new HashMap<String, String>() {{
put("a", "\"abc\"");
put("b", "{\"c\":\"a\"}");
put("d", "1");
}};

assertEquals(Arrays.asList(s1, s2, s3), result);
assertEquals(expected, result);
}

@Test
public void shouldReturnEmptyListForEmptyObject() {
assertEquals(Collections.emptyList(), udf.records("{}"));
public void shouldReturnEmptyMapForEmptyObject() {
assertEquals(Collections.emptyMap(), udf.records("{}"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`K` STRING KEY, `COLORS` ARRAY<STRUCT<`JSON_KEY` STRING, `JSON_VALUE` STRING>>",
"schema" : "`K` STRING KEY, `COLORS` MAP<STRING, STRING>",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"version" : "7.2.0",
"timestamp" : 1642675042992,
"timestamp" : 1643134010105,
"path" : "query-validation-tests/json_records.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
Expand All @@ -13,7 +13,7 @@
}
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`K` STRING KEY, `COLORS` ARRAY<STRUCT<`JSON_KEY` STRING, `JSON_VALUE` STRING>>",
"schema" : "`K` STRING KEY, `COLORS` MAP<STRING, STRING>",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down Expand Up @@ -78,56 +78,46 @@
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"COLORS" : [ {
"JSON_KEY" : "Red",
"JSON_VALUE" : "3"
}, {
"JSON_KEY" : "Green",
"JSON_VALUE" : "5"
} ]
"COLORS" : {
"Red" : "3",
"Green" : "5"
}
},
"timestamp" : 0
}, {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"COLORS" : [ {
"JSON_KEY" : "Black",
"JSON_VALUE" : "2"
} ]
"COLORS" : {
"Black" : "2"
}
},
"timestamp" : 0
}, {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"COLORS" : [ {
"JSON_KEY" : "Pink",
"JSON_VALUE" : "1"
}, {
"JSON_KEY" : "Yellow",
"JSON_VALUE" : "3"
} ]
"COLORS" : {
"Pink" : "1",
"Yellow" : "3"
}
},
"timestamp" : 0
}, {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"COLORS" : [ {
"JSON_KEY" : "White",
"JSON_VALUE" : "7"
}, {
"JSON_KEY" : "Pink",
"JSON_VALUE" : "8"
} ]
"COLORS" : {
"White" : "7",
"Pink" : "8"
}
},
"timestamp" : 0
}, {
"topic" : "OUTPUT",
"key" : "1",
"value" : {
"COLORS" : [ ]
"COLORS" : { }
},
"timestamp" : 0
} ],
Expand All @@ -145,7 +135,7 @@
"sources" : [ {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`K` STRING KEY, `COLORS` ARRAY<STRUCT<`JSON_KEY` STRING, `JSON_VALUE` STRING>>",
"schema" : "`K` STRING KEY, `COLORS` MAP<STRING, STRING>",
"keyFormat" : {
"format" : "KAFKA"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@
{"topic": "test_topic", "key": "1", "value": {"colors_obj": "", "timestamp": 0}}
],
"outputs": [
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Red", "JSON_VALUE": "3"}, {"JSON_KEY": "Green", "JSON_VALUE": "5"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Black", "JSON_VALUE": "2"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "Pink", "JSON_VALUE": "1"}, {"JSON_KEY": "Yellow", "JSON_VALUE": "3"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": [{"JSON_KEY": "White", "JSON_VALUE": "7"}, {"JSON_KEY": "Pink", "JSON_VALUE": "8"}]}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": []}, "timestamp": 0}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "K STRING KEY, COLORS ARRAY<STRUCT<JSON_KEY STRING, JSON_VALUE STRING>>"}
]
}
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": {"Red": "3", "Green": "5"}}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": {"Black": "2"}}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": {"Pink": "1", "Yellow": "3"}}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": {"White": "7", "Pink": "8"}}, "timestamp": 0},
{"topic": "OUTPUT", "key": "1", "value": {"COLORS": {}}, "timestamp": 0}
]
}
]
}

0 comments on commit eeb1066

Please sign in to comment.