Skip to content

Commit

Permalink
Move JSON parsing to UdfJsonMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Jan 19, 2022
1 parent 0ff82ef commit a37c305
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
Expand All @@ -31,7 +29,6 @@
+ " otherwise.",
author = KsqlConstants.CONFLUENT_AUTHOR)
public class IsJsonString {
private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader();

@Udf
public Boolean check(@UdfParameter(description = "The input JSON string") final String input) {
Expand All @@ -40,9 +37,8 @@ public Boolean check(@UdfParameter(description = "The input JSON string") final
}

try {
final JsonNode node = OBJECT_READER.readTree(input);
return !node.isMissingNode();
} catch (final JacksonException e) {
return !UdfJsonMapper.parseJson(input).isMissingNode();
} catch (KsqlFunctionException e) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class JsonArrayContains {
private static final JsonFactory PARSER_FACTORY = new JsonFactoryBuilder()
.disable(CANONICALIZE_FIELD_NAMES)
.build()
.setCodec(UdfJsonMapper.INSTANCE.get());
.setCodec(UdfJsonMapper.INSTANCE);

private static final EnumMap<JsonToken, Predicate<Object>> TOKEN_COMPAT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.json.JsonPathTokenizer;
import java.io.IOException;
import java.util.List;

@UdfDescription(
Expand All @@ -35,8 +33,6 @@
+ " JSONPath or NULL if the specified path does not exist.")
public class JsonExtractString {

private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader();

private String latestPath = null;
private List<String> latestTokens = null;

Expand All @@ -56,7 +52,7 @@ public String extract(
latestPath = path;
}

JsonNode currentNode = parseJsonDoc(input);
JsonNode currentNode = UdfJsonMapper.parseJson(input);
for (final String token : latestTokens) {
if (currentNode instanceof ArrayNode) {
try {
Expand All @@ -80,12 +76,4 @@ public String extract(
return currentNode.toString();
}
}

private static JsonNode parseJsonDoc(final String jsonString) {
try {
return OBJECT_READER.readTree(jsonString);
} catch (final IOException e) {
throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,57 @@

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.confluent.ksql.function.KsqlFunctionException;

/**
* Shared Object mapper used by JSON processing UDFs
*/
public enum UdfJsonMapper {
final class UdfJsonMapper {

INSTANCE;
private UdfJsonMapper() {}

private final ObjectMapper mapper = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
/**
* It is thread-safe to share an instance of the configured ObjectMapper
* (see https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html for more details).
* The object is configured as part of static initialization, so it is published safely
* as well.
*/
public static final ObjectMapper INSTANCE;
/**
* Akin to the {@link UdfJsonMapper#INSTANCE}, the reader is fully thread-safe, so there is
* no need to construct more than one instance. See https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html
* for more details.
*/
private static final ObjectReader OBJECT_READER;

public ObjectMapper get() {
return mapper.copy();
static {
INSTANCE = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
OBJECT_READER = INSTANCE.reader();
}

/**
* Parses string into a {@link JsonNode}; throws {@link KsqlFunctionException} on invalid JSON.
*
* @param jsonString the string to parse
* @return a JSON node
*/
public static JsonNode parseJson(final String jsonString) {
try {
return OBJECT_READER.readTree(jsonString);
} catch (final JacksonException e) {
throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e);
}
}
}

0 comments on commit a37c305

Please sign in to comment.