Skip to content

Commit

Permalink
feat: add Logarithm, Exponential and Sqrt functions (#3091)
Browse files Browse the repository at this point in the history
* feat: add Logarithm, Exponential and Sqrt functions

Co-authored-by: chappers <[email protected]>
  • Loading branch information
big-andy-coates and 8bit-pixies authored Jul 19, 2019
1 parent 71ec1c0 commit a4ca934
Show file tree
Hide file tree
Showing 21 changed files with 456 additions and 17 deletions.
14 changes: 10 additions & 4 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ supported KSQL types, including the complex types ``MAP``, ``ARRAY``, and
``STRUCT``.

.. note::

``Properties`` is not a valid field name.

Here's an example CREATE STREAM statement that uses a ``STRUCT`` to
Expand Down Expand Up @@ -90,7 +90,7 @@ KSQL Time Units
The following list shows valid time units for the SIZE, ADVANCE BY, SESSION, and
WITHIN clauses.

* DAY, DAYS
* DAY, DAYS
* HOUR, HOURS
* MINUTE, MINUTES
* SECOND, SECONDS
Expand All @@ -108,13 +108,13 @@ timestamp in ``ROWTIME``. By default, the implicit ``ROWTIME`` column is the
timestamp of a message in a Kafka topic. Timestamps have an accuracy of
one millisecond.

Use the TIMESTAMP property to override ``ROWTIME`` with the contents of the
Use the TIMESTAMP property to override ``ROWTIME`` with the contents of the
specified column. Define the format of a record's timestamp by using the
TIMESTAMP_FORMAT property.

If you use the TIMESTAMP property but don't set TIMESTAMP_FORMAT, KSQL assumes
that the timestamp field is a ``bigint``. If you set TIMESTAMP_FORMAT, the
TIMESTAMP field must be of type ``varchar`` and have a format that the
TIMESTAMP field must be of type ``varchar`` and have a format that the
``DateTimeFormatter`` Java class can parse.

If your timestamp format has embedded single quotes, you can escape them by
Expand Down Expand Up @@ -1559,6 +1559,8 @@ Scalar functions
| | | |
| | | ``{"foo": {"bar": "quux"}}`` |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| EXP | ``EXP(col1)`` | The exponential of a value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| FIELD | ``FIELD(str VARCHAR, args VARCHAR[])`` | Returns the 1-indexed position of ``str`` in |
| | | ``args``, or 0 if not found. If ``str`` is NULL, |
| | | the return value is 0, because NULL is not |
Expand All @@ -1581,6 +1583,8 @@ Scalar functions
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| LEN | ``LEN(col1)`` | The length of a string. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| LN | ``LN(col1)`` | The natural logarithm of a value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| MASK | ``MASK(col1, 'X', 'x', 'n', '-')`` | Convert a string to a masked or obfuscated |
| | | version of itself. The optional arguments |
| | | following the input string to be masked are the |
Expand Down Expand Up @@ -1624,6 +1628,8 @@ Scalar functions
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| ROUND | ``ROUND(col1)`` | Round a value to the nearest BIGINT value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SQRT | ``SQRT(col1)`` | The square root of a value. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SPLIT | ``SPLIT(col1, delimiter)`` | Splits a string into an array of substrings based |
| | | on a delimiter. If the delimiter is not found, |
| | | then the original string is returned as the only |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public final class KsqlConstants {
private KsqlConstants() {
}

public static final String CONFLUENT_AUTHOR = "Confluent";

public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
public static final String CONFLUENT_INTERNAL_TOPIC_PREFIX = "__confluent";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;

@UdfDescription(name = "datetostring", author = "Confluent",
@UdfDescription(name = "datetostring", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts an integer representing days since epoch to a date string"
+ " using the given format pattern. Note this is the format Kafka Connect uses"
+ " to represent dates with no time component. The format pattern should be"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;

@UdfDescription(name = "stringtodate", author = "Confluent",
@UdfDescription(name = "stringtodate", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a string representation of a date into an integer representing"
+ " days since epoch using the given format pattern."
+ " Note this is the format Kafka Connect uses to represent dates with no time component."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;
import java.time.ZoneId;
import java.util.concurrent.ExecutionException;

@UdfDescription(name = "stringtotimestamp", author = "Confluent",
@UdfDescription(name = "stringtotimestamp", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a string representation of a date in the given format"
+ " into the BIGINT value that represents the millisecond timestamp.")
public class StringToTimestamp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;

@UdfDescription(name = "timestamptostring", author = "Confluent",
@UdfDescription(name = "timestamptostring", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Converts a BIGINT millisecond timestamp value into"
+ " the string representation of the timestamp in the given format.")
public class TimestampToString {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

import io.confluent.ksql.util.KsqlConstants;
import java.util.List;

/**
Expand All @@ -31,7 +31,7 @@
* <p>An optional fifth parameter allows to specify either "MI" (miles) or "KM" (kilometers) as the
* desired unit for the output measurement. Default is KM.
*/
@UdfDescription(name = "geo_distance", author = "Confluent",
@UdfDescription(name = "geo_distance", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Compute the distance between two points on the surface of the earth,"
+ " according to the Haversine formula for \"great circle distance\".")
public class GeoDistance {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;

@SuppressWarnings("WeakerAccess") // Invoked via reflection
@UdfDescription(
name = "exp",
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "The exponential of a value."
)
public class Exp {

@Udf(description = "Returns Euler's number e raised to the power of an INT value.")
public Double exp(
@UdfParameter(
value = "exponent",
description = "the exponent to raise e to."
) final Integer exponent
) {
return exp(exponent == null ? null : exponent.doubleValue());
}

@Udf(description = "Returns Euler's number e raised to the power of a BIGINT value.")
public Double exp(
@UdfParameter(
value = "exponent",
description = "the exponent to raise e to."
) final Long exponent
) {
return exp(exponent == null ? null : exponent.doubleValue());
}

@Udf(description = "Returns Euler's number e raised to the power of a DOUBLE value.")
public Double exp(
@UdfParameter(
value = "exponent",
description = "the exponent to raise e to."
) final Double exponent
) {
return exponent == null
? null
: Math.exp(exponent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(
name = "ln",
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "The natural logarithm of a value."
)
public class Ln {

@Udf(description = "Returns the natural logarithm (base e) of an INT value.")
public Double ln(
@UdfParameter(
value = "value",
description = "the value get the natual logarithm of."
) final Integer value
) {
return ln(value == null ? null : value.doubleValue());
}

@Udf(description = "Returns the natural logarithm (base e) of a BIGINT value.")
public Double ln(
@UdfParameter(
value = "value",
description = "the value get the natual logarithm of."
) final Long value
) {
return ln(value == null ? null : value.doubleValue());
}

@Udf(description = "Returns the natural logarithm (base e) of a DOUBLE value.")
public Double ln(
@UdfParameter(
value = "value",
description = "the value get the natual logarithm of."
) final Double value
) {
return value == null
? null
: Math.log(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;

@SuppressWarnings("WeakerAccess") // Invoked via reflection
@UdfDescription(
name = "sqrt",
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "The square root of a value."
)
public class Sqrt {

@Udf(description = "Returns the correctly rounded positive square root of a DOUBLE value")
public Double sqrt(
@UdfParameter(
value = "value",
description = "The value to get the square root of."
) final Integer value
) {
return sqrt(value == null ? null : value.doubleValue());
}

@Udf(description = "Returns the correctly rounded positive square root of a DOUBLE value")
public Double sqrt(
@UdfParameter(
value = "value",
description = "The value to get the square root of."
) final Long value
) {
return sqrt(value == null ? null : value.doubleValue());
}

@Udf(description = "Returns the correctly rounded positive square root of a DOUBLE value")
public Double sqrt(
@UdfParameter(
value = "value",
description = "The value to get the square root of."
) final Double value
) {
return value == null
? null
: Math.sqrt(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(name = MaskKeepLeftKudf.NAME, author = "Confluent",
@UdfDescription(name = MaskKeepLeftKudf.NAME, author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a version of the input string with all but the"
+ " specified number of left-most characters masked out."
+ " Default masking rules will replace all upper-case characters with 'X', all lower-case"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(name = MaskKeepRightKudf.NAME, author = "Confluent",
@UdfDescription(name = MaskKeepRightKudf.NAME, author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a version of the input string with all but the"
+ " specified number of right-most characters masked out."
+ " Default masking rules will replace all upper-case characters with 'X', all lower-case"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(name = "mask", author = "Confluent",
@UdfDescription(name = "mask", author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a version of the input string with every character replaced by a mask."
+ " Default masking rules will replace all upper-case characters with 'X', all lower-case"
+ " characters with 'x', all digits with 'n', and any other character with '-'.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(name = MaskLeftKudf.NAME, author = "Confluent",
@UdfDescription(name = MaskLeftKudf.NAME, author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a version of the input string with the"
+ " specified number of characters, starting from the beginning of the string, masked out."
+ " Default masking rules will replace all upper-case characters with 'X', all lower-case"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(name = MaskRightKudf.NAME, author = "Confluent",
@UdfDescription(name = MaskRightKudf.NAME, author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a version of the input string with the"
+ " specified number of characters, counting back from the end of the string, masked out."
+ " Default masking rules will replace all upper-case characters with 'X', all lower-case"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

@UdfDescription(name = SplitKudf.NAME, author = "Confluent",
@UdfDescription(name = SplitKudf.NAME, author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Splits a string into an array of substrings based on a delimiter. "
+ "If the delimiter is found at the beginning of the string, end of the string, or there "
+ "are contiguous delimiters in the string, then empty strings are added to the array. "
Expand Down
Loading

0 comments on commit a4ca934

Please sign in to comment.