Skip to content

Commit

Permalink
feat(expressions): add built-in function 'if' to ScEL
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Sep 15, 2021
1 parent e2f74b2 commit 28a6126
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Exists;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.ExtractArray;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Hash;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.If;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.IsNull;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Lowercase;
import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Md5;
Expand Down Expand Up @@ -87,6 +88,7 @@ private ExpressionFunctionExecutors() {
register(new Md5());
register(new Split());
register(new UnixTimestamp());
register(new If());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.expression.function.impl;

import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments;
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionArgument;
import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction;
import io.streamthoughts.kafka.connect.filepulse.expression.function.GenericArgument;
import io.streamthoughts.kafka.connect.filepulse.expression.function.MissingArgumentValue;

public class If implements ExpressionFunction {

private static final String BOOLEAN_EXPRESSION_ARG = "booleanExpression";
private static final String VALUE_IF_TRUE_ARG = "valueIfTrue";
private static final String VALUE_IF_FALSE_ARG = "valueIfFalse";

/**
* {@inheritDoc}
*/
@Override
public Arguments<?> prepare(final Expression[] args) {
if (args.length < 3) {
return Arguments.of(
new MissingArgumentValue(BOOLEAN_EXPRESSION_ARG),
new MissingArgumentValue(VALUE_IF_TRUE_ARG),
new MissingArgumentValue(VALUE_IF_FALSE_ARG)
);
}

return Arguments.of(
new ExpressionArgument(BOOLEAN_EXPRESSION_ARG, args[0]),
new ExpressionArgument(VALUE_IF_TRUE_ARG, args[1]),
new ExpressionArgument(VALUE_IF_FALSE_ARG, args[2])
);
}

/**
* {@inheritDoc}
*/
@Override
public TypedValue apply(Arguments<GenericArgument> args) {
final TypedValue condition = args.valueOf(BOOLEAN_EXPRESSION_ARG);
return condition.getBool() ? args.valueOf(VALUE_IF_TRUE_ARG) : args.valueOf(VALUE_IF_FALSE_ARG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.streamthoughts.kafka.connect.filepulse.expression.parser.antlr4;

import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.data.internal.TypeConverter;
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
import io.streamthoughts.kafka.connect.filepulse.expression.ExpressionException;
import io.streamthoughts.kafka.connect.filepulse.expression.FunctionExpression;
Expand Down Expand Up @@ -130,7 +132,7 @@ public Expression expression() {
}

@Override
public void exitSubstitutionExpression(ScELParser.SubstitutionExpressionContext ctx) {
public void exitSubstitutionExpression(final ScELParser.SubstitutionExpressionContext ctx) {

final SubstitutionExpression substitution = new SubstitutionExpression(originalExpression);
while (!current.expressions.isEmpty()) {
Expand All @@ -141,13 +143,13 @@ public void exitSubstitutionExpression(ScELParser.SubstitutionExpressionContext
}

@Override
public void enterSubstitutionStrExpression(ScELParser.SubstitutionStrExpressionContext ctx) {
public void enterSubstitutionStrExpression(final ScELParser.SubstitutionStrExpressionContext ctx) {
current = new ContextExpressions(current);
contexts.put(ctx, current);
}

@Override
public void exitSubstitutionStrExpression(ScELParser.SubstitutionStrExpressionContext ctx) {
public void exitSubstitutionStrExpression(final ScELParser.SubstitutionStrExpressionContext ctx) {
final ContextExpressions contextExpression = contexts.remove(ctx);

final int start = ctx.LineSubstExprStart().getSymbol().getStartIndex();
Expand All @@ -173,7 +175,7 @@ public void enterFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx)
}

@Override
public void exitFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx) {
public void exitFunctionDeclaration(final ScELParser.FunctionDeclarationContext ctx) {
final ContextExpressions contextExpression = contexts.remove(ctx);

final ArrayDeque<Expression> argsExpressions = contextExpression.expressions;
Expand All @@ -198,7 +200,7 @@ public void exitFunctionDeclaration(ScELParser.FunctionDeclarationContext ctx) {
}

@Override
public void exitPropertyDeclaration(ScELParser.PropertyDeclarationContext ctx) {
public void exitPropertyDeclaration(final ScELParser.PropertyDeclarationContext ctx) {
final PropertyExpression expression = new PropertyExpression(
ctx.getText(),
ctx.scope() != null ? ctx.scope().getText() : defaultScope,
Expand All @@ -208,17 +210,18 @@ public void exitPropertyDeclaration(ScELParser.PropertyDeclarationContext ctx) {
}

@Override
public void exitValue(ScELParser.ValueContext ctx) {
public void exitValue(final ScELParser.ValueContext ctx) {
final String originalExpression = ctx.getText();

String value = originalExpression;
if (value.equalsIgnoreCase(NULL_STRING)) {
Object value;
if (originalExpression.equalsIgnoreCase(NULL_STRING)) {
value = null;
} else if (value.startsWith("'") && value.endsWith("'")) {
value = value.substring(1);
value = value.substring(0, value.length() - 1);
} else if (originalExpression.startsWith("'") && originalExpression.endsWith("'")) {
final String s = originalExpression.substring(1);
value = s.substring(0, s.length() - 1);
} else {
value = TypedValue.parse(originalExpression).value();
}

current.expressions.add(new ValueExpression(originalExpression, value));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,25 @@ public void should_execute_concat_ws_function_given_no_empty_prefix_suffix() {
}

@Test
public void should_execute_hash_functions() {
public void should_execute_hash_function() {
Expression expression = parseExpression("{{ hash('hello') }}");
Assert.assertEquals("2132663229", expression.readValue(EMPTY_CONTEXT, TypedValue.class).getString());
}

@Test
public void should_execute_md5_functions() {
public void should_execute_md5_function() {
Expression expression = parseExpression("{{ md5('hello') }}");
Assert.assertEquals("5d41402abc4b2a76b9719d911017c592", expression.readValue(EMPTY_CONTEXT, TypedValue.class).getString());
}

@Test
public void should_execute_if_function() {
Expression expressionTrue = parseExpression("{{ if(exists($value, 'field'), true, false) }}");
Expression expressionFalse = parseExpression("{{ if(exists($value, 'dummy'), true, false) }}");
StandardEvaluationContext context = new StandardEvaluationContext(
TypedStruct.create().put("value", TypedStruct.create().put("field", ""))
);
Assert.assertTrue(expressionTrue.readValue(context, TypedValue.class).value());
Assert.assertFalse(expressionFalse.readValue(context, TypedValue.class).value());
}
}

0 comments on commit 28a6126

Please sign in to comment.