diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java index 9065ce9c7..d1a9490f0 100644 --- a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java @@ -28,6 +28,7 @@ import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Exists; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.ExtractArray; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Hash; +import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.If; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.IsNull; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Lowercase; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Md5; @@ -87,6 +88,7 @@ private ExpressionFunctionExecutors() { register(new Md5()); register(new Split()); register(new UnixTimestamp()); + register(new If()); } @SuppressWarnings("unchecked") diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/If.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/If.java new file mode 100644 index 000000000..8db663574 --- /dev/null +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/If.java @@ -0,0 +1,63 @@ +/* + * Copyright 2021 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamthoughts.kafka.connect.filepulse.expression.function.impl; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedValue; +import io.streamthoughts.kafka.connect.filepulse.expression.Expression; +import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments; +import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionArgument; +import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction; +import io.streamthoughts.kafka.connect.filepulse.expression.function.GenericArgument; +import io.streamthoughts.kafka.connect.filepulse.expression.function.MissingArgumentValue; + +public class If implements ExpressionFunction { + + private static final String BOOLEAN_EXPRESSION_ARG = "booleanExpression"; + private static final String VALUE_IF_TRUE_ARG = "valueIfTrue"; + private static final String VALUE_IF_FALSE_ARG = "valueIfFalse"; + + /** + * {@inheritDoc} + */ + @Override + public Arguments prepare(final Expression[] args) { + if (args.length < 3) { + return Arguments.of( + new MissingArgumentValue(BOOLEAN_EXPRESSION_ARG), + new MissingArgumentValue(VALUE_IF_TRUE_ARG), + new MissingArgumentValue(VALUE_IF_FALSE_ARG) + ); + } + + return Arguments.of( + new ExpressionArgument(BOOLEAN_EXPRESSION_ARG, args[0]), + new ExpressionArgument(VALUE_IF_TRUE_ARG, args[1]), + new ExpressionArgument(VALUE_IF_FALSE_ARG, args[2]) + ); + } + + /** + * {@inheritDoc} + */ + @Override + public TypedValue apply(Arguments args) { + final TypedValue condition = args.valueOf(BOOLEAN_EXPRESSION_ARG); + return condition.getBool() ? args.valueOf(VALUE_IF_TRUE_ARG) : args.valueOf(VALUE_IF_FALSE_ARG); + } +} diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/Antlr4ExpressionParser.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/Antlr4ExpressionParser.java index e0302162d..1fd6e4f25 100644 --- a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/Antlr4ExpressionParser.java +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/parser/antlr4/Antlr4ExpressionParser.java @@ -19,6 +19,8 @@ package io.streamthoughts.kafka.connect.filepulse.expression.parser.antlr4; +import io.streamthoughts.kafka.connect.filepulse.data.TypedValue; +import io.streamthoughts.kafka.connect.filepulse.data.internal.TypeConverter; import io.streamthoughts.kafka.connect.filepulse.expression.Expression; import io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException; import io.streamthoughts.kafka.connect.filepulse.expression.FunctionExpression; @@ -130,7 +132,7 @@ public Expression expression() { } @Override - public void exitSubstitutionExpression(ScELParser.SubstitutionExpressionContext ctx) { + public void exitSubstitutionExpression(final ScELParser.SubstitutionExpressionContext ctx) { final SubstitutionExpression substitution = new SubstitutionExpression(originalExpression); while (!current.expressions.isEmpty()) { @@ -141,13 +143,13 @@ public void exitSubstitutionExpression(ScELParser.SubstitutionExpressionContext } @Override - public void enterSubstitutionStrExpression(ScELParser.SubstitutionStrExpressionContext ctx) { + public void enterSubstitutionStrExpression(final ScELParser.SubstitutionStrExpressionContext ctx) { current = new ContextExpressions(current); contexts.put(ctx, current); } @Override - public void exitSubstitutionStrExpression(ScELParser.SubstitutionStrExpressionContext ctx) { + public void exitSubstitutionStrExpression(final ScELParser.SubstitutionStrExpressionContext ctx) { final ContextExpressions contextExpression = contexts.remove(ctx); final int start = ctx.LineSubstExprStart().getSymbol().getStartIndex(); @@ -173,7 +175,7 @@ public void enterFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx) } @Override - public void exitFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx) { + public void exitFunctionDeclaration(final ScELParser.FunctionDeclarationContext ctx) { final ContextExpressions contextExpression = contexts.remove(ctx); final ArrayDeque argsExpressions = contextExpression.expressions; @@ -198,7 +200,7 @@ public void exitFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx) { } @Override - public void exitPropertyDeclaration(ScELParser.PropertyDeclarationContext ctx) { + public void exitPropertyDeclaration(final ScELParser.PropertyDeclarationContext ctx) { final PropertyExpression expression = new PropertyExpression( ctx.getText(), ctx.scope() != null ? ctx.scope().getText() : defaultScope, @@ -208,17 +210,18 @@ public void exitPropertyDeclaration(ScELParser.PropertyDeclarationContext ctx) { } @Override - public void exitValue(ScELParser.ValueContext ctx) { + public void exitValue(final ScELParser.ValueContext ctx) { final String originalExpression = ctx.getText(); - String value = originalExpression; - if (value.equalsIgnoreCase(NULL_STRING)) { + Object value; + if (originalExpression.equalsIgnoreCase(NULL_STRING)) { value = null; - } else if (value.startsWith("'") && value.endsWith("'")) { - value = value.substring(1); - value = value.substring(0, value.length() - 1); + } else if (originalExpression.startsWith("'") && originalExpression.endsWith("'")) { + final String s = originalExpression.substring(1); + value = s.substring(0, s.length() - 1); + } else { + value = TypedValue.parse(originalExpression).value(); } - current.expressions.add(new ValueExpression(originalExpression, value)); } } diff --git a/connect-file-pulse-expression/src/test/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/FunctionsTest.java b/connect-file-pulse-expression/src/test/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/FunctionsTest.java index 1048eba1e..32f59d1d1 100644 --- a/connect-file-pulse-expression/src/test/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/FunctionsTest.java +++ b/connect-file-pulse-expression/src/test/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/FunctionsTest.java @@ -197,14 +197,25 @@ public void should_execute_concat_ws_function_given_no_empty_prefix_suffix() { } @Test - public void should_execute_hash_functions() { + public void should_execute_hash_function() { Expression expression = parseExpression("{{ hash('hello') }}"); Assert.assertEquals("2132663229", expression.readValue(EMPTY_CONTEXT, TypedValue.class).getString()); } @Test - public void should_execute_md5_functions() { + public void should_execute_md5_function() { Expression expression = parseExpression("{{ md5('hello') }}"); Assert.assertEquals("5d41402abc4b2a76b9719d911017c592", expression.readValue(EMPTY_CONTEXT, TypedValue.class).getString()); } + + @Test + public void should_execute_if_function() { + Expression expressionTrue = parseExpression("{{ if(exists($value, 'field'), true, false) }}"); + Expression expressionFalse = parseExpression("{{ if(exists($value, 'dummy'), true, false) }}"); + StandardEvaluationContext context = new StandardEvaluationContext( + TypedStruct.create().put("value", TypedStruct.create().put("field", "")) + ); + Assert.assertTrue(expressionTrue.readValue(context, TypedValue.class).value()); + Assert.assertFalse(expressionFalse.readValue(context, TypedValue.class).value()); + } } \ No newline at end of file