diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/IsJsonString.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/IsJsonString.java index 56731c61295f..5587a4ae1e72 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/IsJsonString.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/IsJsonString.java @@ -15,10 +15,8 @@ package io.confluent.ksql.function.udf.json; -import com.fasterxml.jackson.core.JacksonException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectReader; import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.KsqlFunctionException; import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; @@ -31,7 +29,6 @@ + " otherwise.", author = KsqlConstants.CONFLUENT_AUTHOR) public class IsJsonString { - private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader(); @Udf public Boolean check(@UdfParameter(description = "The input JSON string") final String input) { @@ -40,9 +37,8 @@ public Boolean check(@UdfParameter(description = "The input JSON string") final } try { - final JsonNode node = OBJECT_READER.readTree(input); - return !node.isMissingNode(); - } catch (final JacksonException e) { + return !UdfJsonMapper.parseJson(input).isMissingNode(); + } catch (KsqlFunctionException e) { return false; } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonArrayContains.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonArrayContains.java index adc6207e04bf..0af2c9418971 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonArrayContains.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonArrayContains.java @@ -53,7 +53,7 @@ public class JsonArrayContains { private static final JsonFactory PARSER_FACTORY = new JsonFactoryBuilder() .disable(CANONICALIZE_FIELD_NAMES) .build() - .setCodec(UdfJsonMapper.INSTANCE.get()); + .setCodec(UdfJsonMapper.INSTANCE); private static final EnumMap> TOKEN_COMPAT; diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractString.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractString.java index 3947a58a2481..d6c5bdf8c28d 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractString.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractString.java @@ -20,12 +20,10 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.ImmutableList; import io.confluent.ksql.function.FunctionCategory; -import io.confluent.ksql.function.KsqlFunctionException; import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; import io.confluent.ksql.util.json.JsonPathTokenizer; -import java.io.IOException; import java.util.List; @UdfDescription( @@ -35,8 +33,6 @@ + " JSONPath or NULL if the specified path does not exist.") public class JsonExtractString { - private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader(); - private String latestPath = null; private List latestTokens = null; @@ -56,7 +52,7 @@ public String extract( latestPath = path; } - JsonNode currentNode = parseJsonDoc(input); + JsonNode currentNode = UdfJsonMapper.parseJson(input); for (final String token : latestTokens) { if (currentNode instanceof ArrayNode) { try { @@ -80,12 +76,4 @@ public String extract( return currentNode.toString(); } } - - private static JsonNode parseJsonDoc(final String jsonString) { - try { - return OBJECT_READER.readTree(jsonString); - } catch (final IOException e) { - throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e); - } - } } diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/UdfJsonMapper.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/UdfJsonMapper.java index 7ab2182ec126..7db3f3953d8e 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/UdfJsonMapper.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/json/UdfJsonMapper.java @@ -15,26 +15,57 @@ package io.confluent.ksql.function.udf.json; +import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import io.confluent.ksql.function.KsqlFunctionException; /** * Shared Object mapper used by JSON processing UDFs */ -public enum UdfJsonMapper { +final class UdfJsonMapper { - INSTANCE; + private UdfJsonMapper() {} - private final ObjectMapper mapper = new ObjectMapper() - .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS) - .enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN) - .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true)); + /** + * It is thread-safe to share an instance of the configured ObjectMapper + * (see https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html for more details). + * The object is configured as part of static initialization, so it is published safely + * as well. + */ + public static final ObjectMapper INSTANCE; + /** + * Akin to the {@link UdfJsonMapper#INSTANCE}, the reader is fully thread-safe, so there is + * no need to construct more than one instance. See https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html + * for more details. + */ + private static final ObjectReader OBJECT_READER; - public ObjectMapper get() { - return mapper.copy(); + static { + INSTANCE = new ObjectMapper() + .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS) + .enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN) + .setNodeFactory(JsonNodeFactory.withExactBigDecimals(true)); + OBJECT_READER = INSTANCE.reader(); + } + + /** + * Parses string into a {@link JsonNode}; throws {@link KsqlFunctionException} on invalid JSON. + * + * @param jsonString the string to parse + * @return a JSON node + */ + public static JsonNode parseJson(final String jsonString) { + try { + return OBJECT_READER.readTree(jsonString); + } catch (final JacksonException e) { + throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e); + } } } \ No newline at end of file