Skip to content

Commit

Permalink
feat(expression): add new SCEL functions
Browse files Browse the repository at this point in the history
Add new expressions: concat, concat_ws, hash, md5
  • Loading branch information
fhussonnois committed Sep 18, 2020
1 parent 3809af0 commit 1c75a3e
Show file tree
Hide file tree
Showing 41 changed files with 1,094 additions and 761 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ public boolean isNull() {
return value == null;
}

public boolean isNotNull() {
return value != null;
}

public boolean isEmpty() {
final Type type = schema.get().type();
if (Type.STRING == type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ functionDeclaration
;

functionParameters
: LPAREN functionObjectParameter (COMMA value)* RPAREN
: LPAREN (functionObjectParameter (COMMA functionObjectParameter)*?)* RPAREN
;

functionObjectParameter
: expression
: expression | value
;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,17 @@

public class FunctionExpression extends AbstractExpression {

private final Expression valueExpression;
private final ExpressionFunctionExecutor functionExecutor;

/**
* Creates a new {@link FunctionExpression} instance.
*
* @param originalExpression the original string expression.
* @param valueExpression the expression value
* @param functionExecutor the function to be apply on the acceded value.
*/
public FunctionExpression(final String originalExpression,
final Expression valueExpression,
final ExpressionFunctionExecutor functionExecutor) {
super(originalExpression);
this.valueExpression = valueExpression;
this.functionExecutor = functionExecutor;
}

Expand All @@ -59,8 +55,7 @@ public TypedValue readValue(final EvaluationContext context) {
@SuppressWarnings("unchecked")
@Override
public <T> T readValue(final EvaluationContext context, final Class<T> expectedType) {
TypedValue returned = valueExpression.readValue(context, TypedValue.class);
final Object evaluated = functionExecutor.execute(returned);
final Object evaluated = functionExecutor.execute(context);

if (evaluated != null && expectedType.isAssignableFrom(evaluated.getClass())) {
return (T)evaluated;
Expand All @@ -86,10 +81,6 @@ public boolean canWrite() {
return false;
}

public Expression getValueExpression() {
return valueExpression;
}

public ExpressionFunctionExecutor getFunctionExecutor() {
return functionExecutor;
}
Expand All @@ -98,7 +89,6 @@ public ExpressionFunctionExecutor getFunctionExecutor() {
public String toString() {
return "[" +
"originalExpression=" + originalExpression() +
", valueExpression=" + valueExpression +
", function=" + functionExecutor +
']';
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019-2020 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.kafka.connect.filepulse.expression.function;

import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;

import java.util.List;

public interface Argument {

String name();

Object value();

List<String> errorMessages();

boolean isValid();

Object evaluate(final EvaluationContext context) ;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@
*/
package io.streamthoughts.kafka.connect.filepulse.expression.function;

import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.StreamSupport;

public interface Arguments extends Iterable<ArgumentValue> {
public class Arguments<T extends Argument> implements Iterable<T> {

@SafeVarargs
public static <T extends Argument> Arguments<T> of(final T... arguments) {
return new Arguments<>(Arrays.asList(arguments));
}

static Arguments empty() {
return new Arguments() {
public static <T extends Argument> Arguments<T> empty() {
return new Arguments<T>() {
@Override
public Iterator<ArgumentValue> iterator() {
return Collections.<ArgumentValue>emptyList().iterator();
public Iterator<T> iterator() {
return Collections.emptyIterator();
}

@Override
Expand All @@ -38,29 +50,120 @@ public String toString() {
};
}

default boolean valid() {
private final List<T> arguments;

/**
* Creates a new {@link Arguments} instance.
*/
public Arguments() {
this(new LinkedList<>());
}

/**
* Creates a new {@link Arguments} instance.
*
* @param argument the single argument.
*/
public Arguments(final T argument) {
this(Collections.singletonList(argument));
}

/**
* Creates a new {@link Arguments} instance.
* @param arguments the list of arguments.
*
*/
public Arguments(final List<T> arguments) {
this.arguments = arguments;
}

private Arguments<T> add(final T argument) {
arguments.add(argument);
return this;
}

/**
* Returns the argument at the specified position in this list.
*
* @param index index of the argument to return
* @return the argument at the specified position in this list
* @throws IndexOutOfBoundsException if the index is out of range
* ({@code index < 0 || index >= size()})
*/
public T get(final int index) {
return arguments.get(index);
}

public List<T> get(final int index, final int to) {
return arguments.subList(index, to);
}

public int size() {
return arguments.size();
}

@SuppressWarnings("unchecked")
public <V> V valueOf(final String name) {
Objects.requireNonNull(name, "name cannot be null");
Optional<Object> value = arguments
.stream()
.filter(a -> a.name().equals(name))
.findFirst()
.map(Argument::value);
if (value.isPresent()) return (V) value.get();

throw new IllegalArgumentException("No argument with name '" + name + "'");
}

Arguments<GenericArgument> evaluate(final EvaluationContext context) {
Arguments<GenericArgument> evaluated = new Arguments<>();
for (T arg : arguments) {
Object value = arg.evaluate(context);
evaluated.add(new GenericArgument<>(arg.name(), value));
}
return evaluated;
}

public boolean valid() {
return StreamSupport
.stream(this.spliterator(), true)
.allMatch(ArgumentValue::isValid);
.allMatch(Argument::isValid);
}

default String buildErrorMessage() {
String buildErrorMessage() {
final StringBuilder errors = new StringBuilder();
for (ArgumentValue value : this) {
for (T value : arguments) {
if (!value.errorMessages().isEmpty()) {
for (String error : value.errorMessages()) {
List<String> errorMessages = value.errorMessages();
for (String error : errorMessages) {
errors
.append("\n\t")
.append("Invalid argument with name='")
.append(value.name()).append("'")
.append(", value=")
.append("'").append(value.value()).append("'")
.append(" - ")
.append(error)
.append("\n\t");
.append("\n\t")
.append("Invalid argument with name='")
.append(value.name()).append("'")
.append(", value=")
.append("'").append(value.value()).append("'")
.append(" - ")
.append(error)
.append("\n\t");
}
}
}
return errors.toString();
}

/**
* {@inheritDoc}
*/
@Override
public Iterator<T> iterator() {
return arguments.iterator();
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return arguments.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.streamthoughts.kafka.connect.filepulse.expression.function;

import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.expression.EvaluationContext;
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;

public abstract class TypedExpressionFunction<E, T extends Arguments> implements ExpressionFunction<T> {

private final Type accept;
public class ExpressionArgument extends GenericArgument<Expression> {

/**
* Creates a new {@link TypedExpressionFunction} instance.
*
* @param accept the {@link Type} which is accepted.
* Creates a new {@link ExpressionArgument} instance.
* @param name the argument name.
* @param expression the argument expression.
*/
protected TypedExpressionFunction(final Type accept) {
this.accept = accept;
public ExpressionArgument(final String name, final Expression expression) {
super(name, expression);
}

/**
* {@inheritDoc}
*/
@Override
public boolean accept(final TypedValue value) {
return accept.equals(value.type());
public TypedValue evaluate(EvaluationContext context) {
return value().readValue(context, TypedValue.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
package io.streamthoughts.kafka.connect.filepulse.expression.function;

import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
import io.streamthoughts.kafka.connect.filepulse.expression.Expression;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.util.ArrayList;
import java.util.List;

/**
* Default interface to define a function that can be used into an expression.
*
* @param <T> the type of {@link Arguments}.
*/
public interface ExpressionFunction<T extends Arguments> {
public interface ExpressionFunction {

/**
* Returns the case-insensitive function name.
Expand All @@ -38,32 +40,38 @@ default String name() {
}

/**
* Prepares the arguments that will be passed to {@link #apply(TypedValue, Arguments)}.
* Prepares the arguments that will be passed to {@link #validate(Arguments)}.
*
* @param args list of {@link TypedValue} arguments.
* @return an instance of {@link Arguments}.
*/
T prepare(final TypedValue[] args);
default Arguments<?> prepare(final Expression[] args) {
if (args.length == 0) return Arguments.empty();
List<Argument> arguments = new ArrayList<>();
for (int i = 0; i < args.length; i++) {
arguments.add(new ExpressionArgument(String.valueOf(i), args[i]));
}
return new Arguments<>(arguments);
}

/**
* Checks whether this function accepts the specified value.
* Checks whether this function accepts the given arguments.
*
* @param value the value to be checked.
* @return {@code true} if this function can be executed on the value.
* @param arguments the arguments value to be checked.
* @return {@code true} if this function can be executed with the given arguments.
*/
default boolean accept(final TypedValue value) {
return true;
default Arguments<GenericArgument> validate(final Arguments<GenericArgument> arguments) {
return arguments;
}

/**
* Executes the function on the specified value for the specified arguments.
*
* @param field the field on which to apply the function.
* @param args the function arguments.
*
* @return a new {@link SchemaAndValue}.
*/
TypedValue apply(final TypedValue field, final T args);
TypedValue apply(final Arguments<GenericArgument> args);

static String functionNameFor(final ExpressionFunction function) {
// simple class name conversion to camelCase
Expand Down
Loading

0 comments on commit 1c75a3e

Please sign in to comment.