From 355b6e4a81035d7a5985110ff76043ba0af99a3c Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Mon, 13 Sep 2021 23:07:35 +0200 Subject: [PATCH] feat(expression): add UnixTimestamp expression function --- .../function/ExpressionFunctionExecutors.java | 9 +++-- .../function/impl/UnixTimestamp.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/UnixTimestamp.java diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java index 1f9dac087..9065ce9c7 100644 --- a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/ExpressionFunctionExecutors.java @@ -38,6 +38,7 @@ import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Split; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.StartsWith; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Trim; +import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.UnixTimestamp; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Uppercase; import io.streamthoughts.kafka.connect.filepulse.expression.function.impl.Uuid; import org.slf4j.Logger; @@ -64,7 +65,7 @@ public static ExpressionFunctionExecutor resolve(final String functionName, fina * Creates a new {@link ExpressionFunctionExecutors} instance. */ private ExpressionFunctionExecutors() { - // TODO function registration is hard-coded + // List of built-in expression functions to register. register(new Lowercase()); register(new Uppercase()); register(new Converts()); @@ -85,6 +86,7 @@ private ExpressionFunctionExecutors() { register(new Hash()); register(new Md5()); register(new Split()); + register(new UnixTimestamp()); } @SuppressWarnings("unchecked") @@ -106,8 +108,9 @@ private ExpressionFunctionExecutor make(final String functionName, final Express return new ExpressionFunctionExecutor(functionName, function, prepared); } - private void register(final ExpressionFunction function) { - LOG.info("Registered expression function '" + function.name() + "'"); + public void register(final ExpressionFunction function) { + Objects.requireNonNull(function, "'function' should not be null"); + LOG.info("Registered built-in expression function '{}'", function.name() ); functions.put(function.name(), function); } } diff --git a/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/UnixTimestamp.java b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/UnixTimestamp.java new file mode 100644 index 000000000..6b5e5f5f7 --- /dev/null +++ b/connect-file-pulse-expression/src/main/java/io/streamthoughts/kafka/connect/filepulse/expression/function/impl/UnixTimestamp.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 StreamThoughts. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.streamthoughts.kafka.connect.filepulse.expression.function.impl; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedValue; +import io.streamthoughts.kafka.connect.filepulse.expression.function.Arguments; +import io.streamthoughts.kafka.connect.filepulse.expression.function.ExpressionFunction; +import io.streamthoughts.kafka.connect.filepulse.expression.function.GenericArgument; + +/** + * Function to return the current Unix timestamp in seconds. + */ +public class UnixTimestamp implements ExpressionFunction { + + /** + * {@inheritDoc} + */ + @Override + public TypedValue apply(final Arguments args) { + return TypedValue.int64(System.currentTimeMillis()); + } +}