Skip to content

Commit

Permalink
feat: add to_json_string UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Jan 21, 2022
1 parent 82d9fc4 commit 1e50b6a
Show file tree
Hide file tree
Showing 49 changed files with 4,846 additions and 2 deletions.
33 changes: 33 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,39 @@ json_records(NULL) // => NULL
json_records("abc") // => NULL
```
### `TO_JSON_STRING`
Since: 0.25.0
```sql
to_json_string(val) -> String
```
Given any ksqlDB type returns the equivalent JSON string.
Examples:
**Primitives types**
```sql
to_json_string(1) // => "1"
to_json_string(15.3) // => "15.3"
to_json_string("abc") // => "\"abc\""
to_json_string(true) // => "true"
to_json_string(2021-10-11) // DATE type, => "\"2021-10-11\""
to_json_string(13:25) // TIME type, => "\"13:25:10\""
to_json_string(2021-06-31T12:18:39.446) // TIMESTAMP type, => "\"2021-06-31T12:18:39.446\""
to_json_string(NULL) // => "null"
```
**Compound types**
```sql
to_json_string(Array[1, 2, 3]) // => "[1, 2, 3]"
to_json_string(Struct{id=1,name=A}) // => "{\"id\": 1, \"name\": \"a\"}"
to_json_string(Map('c' := 2, 'd' := 4)) // => "{\"c\": 2, \"d\": 4}"
to_json_string(Array[Struct{json_key=1 json_value=Map('c' := 2, 'd' := true)}]) // => "[{\"json_key\": 1, \"json_value\": {\"c\": 2, \"d\": true}}]"
### `INITCAP`
Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Confluent Inc.
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.BytesUtils.Encoding;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(
name = "TO_JSON_STRING",
category = FunctionCategory.JSON,
description = "Given any ksqlDB type returns the equivalent JSON string.",
author = KsqlConstants.CONFLUENT_AUTHOR)
public class ToJsonString {

@Udf
public <T> String toJsonString(@UdfParameter final T input) {
return toJson(input);
}

private String toJson(final Object input) {
return UdfJsonMapper.writeValueAsJson(prepare(input));
}

private Object prepare(final Object input) {
if (input instanceof Time) {
return prepareTime((Time) input);
} else if (input instanceof Date) {
return prepareDate((Date) input);
} else if (input instanceof Timestamp) {
return prepareTimestamp((Timestamp) input);
} else if (input instanceof ByteBuffer) {
return prepareByteBuffer((ByteBuffer) input);
} else if (input instanceof Struct) {
return prepareStruct((Struct) input);
} else if (input instanceof Map) {
return prepareMap((Map<?, ?>) input);
} else if (input instanceof List) {
return prepareList((List<?>) input);
}
return input;
}

private String prepareTime(final Time time) {
return SqlTimeTypes.formatTime(time);
}

private String prepareDate(final Date date) {
return SqlTimeTypes.formatDate(date);
}

private String prepareTimestamp(final Timestamp timestamp) {
return SqlTimeTypes.formatTimestamp(timestamp);
}

private String prepareByteBuffer(final ByteBuffer bb) {
return BytesUtils.encode(bb.array(), Encoding.BASE64);
}

private Map<String, Object> prepareStruct(final Struct input) {
// avoids flaky tests by guaranteeing predictable order
final SortedMap<String, Object> map = new TreeMap<>();
for (final Field f : input.schema().fields()) {
map.put(f.name(), prepare(input.get(f)));
}
return map;
}

private Map<String, Object> prepareMap(final Map<?, ?> input) {
// avoids flaky tests by guaranteeing predictable order
final SortedMap<String, Object> map = new TreeMap<>();
for (final Map.Entry<?, ?> entry : input.entrySet()) {
map.put(entry.getKey().toString(), prepare(entry.getValue()));
}
return map;
}

private List<Object> prepareList(final List<?> input) {
final List<Object> lst = new ArrayList<>(input.size());
for (Object o : input) {
lst.add(prepare(o));
}
return lst;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
Expand All @@ -37,4 +38,12 @@ public enum UdfJsonMapper {
public ObjectMapper get() {
return mapper.copy();
}

public static String writeValueAsJson(final Object obj) {
try {
return UdfJsonMapper.INSTANCE.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new KsqlFunctionException("JSON serialization error: " + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Confluent Inc.
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import static org.junit.Assert.assertEquals;

import io.confluent.ksql.schema.ksql.SqlTimeTypes;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

public class ToJsonStringTest {
private final ToJsonString udf = new ToJsonString();

@Test
public void shouldSerializeInt() {
// When:
final String result = udf.toJsonString(123);

// Then:
assertEquals("123", result);
}

@Test
public void shouldSerializeBoolean() {
// When:
final String result = udf.toJsonString(true);

// Then:
assertEquals("true", result);
}

@Test
public void shouldSerializeLong() {
// When:
final String result = udf.toJsonString(123L);

// Then:
assertEquals("123", result);
}

@Test
public void shouldSerializeDouble() {
// When:
final String result = udf.toJsonString(123.456d);

// Then:
assertEquals("123.456", result);
}

@Test
public void shouldSerializeDecimal() {
// When:
final String result = udf.toJsonString(new BigDecimal("0.33333"));

// Then:
assertEquals("0.33333", result);
}

@Test
public void shouldSerializeString() {
// When:
final String result = udf.toJsonString("abc");

// Then:
assertEquals("\"abc\"", result);
}

@Test
public void shouldSerializeBytes() {
// When:
final String result = udf.toJsonString(ByteBuffer.allocate(4).putInt(1097151));

// Then:
assertEquals("\"ABC9vw==\"", result);
}

@Test
public void shouldSerializeDate() {
// When:
final String result = udf.toJsonString(SqlTimeTypes.parseDate("2022-01-20"));

// Then:
assertEquals("\"2022-01-20\"", result);
}

@Test
public void shouldSerializeTime() {
// When:
final String result = udf.toJsonString(SqlTimeTypes.parseTime("06:02:20.588"));

// Then:
assertEquals("\"06:02:20\"", result);
}

@Test
public void shouldSerializeTimestamp() {
// When:
final String result = udf.toJsonString(SqlTimeTypes.parseTimestamp("2022-01-20T17:06:02.588"));

// Then:
assertEquals("\"2022-01-20T17:06:02.588\"", result);
}

@Test
public void shouldSerializeNull() {
// When:
final String result = udf.toJsonString((Integer)null);

// Then:
assertEquals("null", result);
}

@Test
public void shouldSerializeArray() {
// When:
final String result = udf.toJsonString(Arrays.asList(1, 2, 3));

// Then:
assertEquals("[1,2,3]", result);
}

@Test
public void shouldSerializeStruct() {
// When:
final Schema schema = SchemaBuilder.struct()
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
final Struct struct = new Struct(schema);
struct.put("id", 1);
struct.put("name", "Alice");
final String result = udf.toJsonString(struct);

// Then:
assertEquals("{\"id\":1,\"name\":\"Alice\"}", result);
}

@Test
public void shouldSerializeMap() {
// When:
final Map<String, Integer> map = new HashMap<String, Integer>() {{
put("c", 2);
put("d", 4);
}};
final String result = udf.toJsonString(map);

// Then:
assertEquals("{\"c\":2,\"d\":4}", result);
}

@Test
public void shouldSerializeNestedArrayOfStructs() {
// When:
final Schema mapSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA);
final String structkey = "json_key";
final String structValue = "json_value";
final Schema schema = SchemaBuilder.struct()
.field(structkey, Schema.INT32_SCHEMA)
.field(structValue, mapSchema)
.build();
final Struct struct = new Struct(schema);
struct.put(structkey, 1);
struct.put(structValue, new HashMap<Integer, String>() {{
put(2, "c");
put(4, "d");
}}
);
final String result = udf.toJsonString(Collections.singletonList(struct));

// Then:
assertEquals("[{\"json_key\":1,\"json_value\":{\"2\":\"c\",\"4\":\"d\"}}]", result);
}
}
Loading

0 comments on commit 1e50b6a

Please sign in to comment.