Skip to content

Commit

Permalink
feat: Added UDFs ENTRIES and GENERATE_SERIES (#3724)
Browse files Browse the repository at this point in the history
This PR adds two new UDFs which should be useful when used together with table functions: ENTRIES and GENERATE_SERIES.
  • Loading branch information
purplefox authored and Tim Fox committed Nov 5, 2019
1 parent f4bd083 commit 0a4558b
Show file tree
Hide file tree
Showing 5 changed files with 670 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

/**
* This UDF constructs an array of structs from the entries in a map. Each struct has a field with
* name "K" containing the key (this is always a String) and a field with name "V" holding the
* value;
*/
@UdfDescription(name = "ENTRIES",
description =
"Construct an array from the entries in a map."
+ "The array can be optionally sorted on the keys."
)
public class Entries {

private static final Schema INT_STRUCT_SCHEMA = buildStructSchema(Schema.OPTIONAL_INT32_SCHEMA);
private static final Schema BIGINT_STRUCT_SCHEMA = buildStructSchema(
Schema.OPTIONAL_INT64_SCHEMA);
private static final Schema DOUBLE_STRUCT_SCHEMA = buildStructSchema(
Schema.OPTIONAL_FLOAT64_SCHEMA);
private static final Schema BOOLEAN_STRUCT_SCHEMA = buildStructSchema(
Schema.OPTIONAL_BOOLEAN_SCHEMA);
private static final Schema STRING_STRUCT_SCHEMA = buildStructSchema(
Schema.OPTIONAL_STRING_SCHEMA);
private static final String KEY_FIELD_NAME = "K";
private static final String VALUE_FIELD_NAME = "V";

private static Schema buildStructSchema(final Schema valueSchema) {
return SchemaBuilder.struct().field(KEY_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(VALUE_FIELD_NAME, valueSchema).optional().build();
}

@Udf(schema = "ARRAY<STRUCT<K STRING, V INT>>")
public List<Struct> entriesInt(
@UdfParameter(description = "The map to create entries from") final Map<String, Integer> map,
@UdfParameter(description = "If true then the resulting entries are sorted by key")
final boolean sorted
) {
return entries(map, INT_STRUCT_SCHEMA, sorted);
}

@Udf(schema = "ARRAY<STRUCT<K STRING, V BIGINT>>")
public List<Struct> entriesBigInt(
@UdfParameter(description = "The map to create entries from") final Map<String, Long> map,
@UdfParameter(description = "If true then the resulting entries are sorted by key")
final boolean sorted
) {
return entries(map, BIGINT_STRUCT_SCHEMA, sorted);
}

@Udf(schema = "ARRAY<STRUCT<K STRING, V DOUBLE>>")
public List<Struct> entriesDouble(
@UdfParameter(description = "The map to create entries from") final Map<String, Double> map,
@UdfParameter(description = "If true then the resulting entries are sorted by key")
final boolean sorted
) {
return entries(map, DOUBLE_STRUCT_SCHEMA, sorted);
}

@Udf(schema = "ARRAY<STRUCT<K STRING, V BOOLEAN>>")
public List<Struct> entriesBoolean(
@UdfParameter(description = "The map to create entries from") final Map<String, Boolean> map,
@UdfParameter(description = "If true then the resulting entries are sorted by key")
final boolean sorted
) {
return entries(map, BOOLEAN_STRUCT_SCHEMA, sorted);
}

@Udf(schema = "ARRAY<STRUCT<K STRING, V STRING>>")
public List<Struct> entriesString(
@UdfParameter(description = "The map to create entries from") final Map<String, String> map,
@UdfParameter(description = "If true then the resulting entries are sorted by key")
final boolean sorted
) {
return entries(map, STRING_STRUCT_SCHEMA, sorted);
}

private <T> List<Struct> entries(
final Map<String, T> map, final Schema structSchema, final boolean sorted
) {
if (map == null) {
return null;
}
final List<Struct> structs = new ArrayList<>(map.size());
Collection<Entry<String, T>> entries = map.entrySet();
if (sorted) {
final List<Entry<String, T>> list = new ArrayList<>(entries);
list.sort(Comparator.comparing(Entry::getKey));
entries = list;
}
for (Map.Entry<String, T> entry : entries) {
final Struct struct = new Struct(structSchema);
struct.put(KEY_FIELD_NAME, entry.getKey()).put(VALUE_FIELD_NAME, entry.getValue());
structs.add(struct);
}
return structs;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.ArrayList;
import java.util.List;

/**
* This UDF constructs an array containing an array of INTs or BIGINTs in the specified range
*/
@UdfDescription(name = "GENERATE_SERIES", description = "Construct an array of a range of values")
public class GenerateSeries {

@Udf
public List<Integer> generateSeriesInt(
@UdfParameter(description = "The beginning of the series") final int start,
@UdfParameter(description = "Marks the end of the series (inclusive)") final int end
) {
return generateSeriesInt(start, end, end - start > 0 ? 1 : -1);
}

@Udf
public List<Integer> generateSeriesInt(
@UdfParameter(description = "The beginning of the series") final int start,
@UdfParameter(description = "Marks the end of the series (inclusive)") final int end,
@UdfParameter(description = "Difference between each value in the series") final int step
) {
checkStep(step);
final int diff = end - start;
if (diff > 0 && step < 0 || diff < 0 && step > 0) {
throw new KsqlFunctionException("GENERATE_SERIES step has wrong sign");
}
final int size = 1 + diff / step;
final List<Integer> result = new ArrayList<>(size);
int pos = 0;
int val = start;
while (pos++ < size) {
result.add(val);
val += step;
}
return result;
}

@Udf
public List<Long> generateSeriesLong(
@UdfParameter(description = "The beginning of the series") final long start,
@UdfParameter(description = "Marks the end of the series (inclusive)") final long end
) {
return generateSeriesLong(start, end, end - start > 0 ? 1 : -1);
}

@Udf
public List<Long> generateSeriesLong(
@UdfParameter(description = "The beginning of the series") final long start,
@UdfParameter
(description = "Marks the end of the series (inclusive)") final long end,
@UdfParameter(description = "Difference between each value in the series") final int step
) {
checkStep(step);
final long diff = end - start;
if (diff > 0 && step < 0 || diff < 0 && step > 0) {
throw new KsqlFunctionException("GENERATE_SERIES step has wrong sign");
}
final int size = 1 + (int) (diff / step);
final List<Long> result = new ArrayList<>(size);
int pos = 0;
long val = start;
while (pos++ < size) {
result.add(val);
val += step;
}
return result;
}

private void checkStep(final int step) {
if (step == 0) {
throw new KsqlFunctionException("GENERATE_SERIES step cannot be zero");
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.array;

import static junit.framework.TestCase.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Struct;
import org.junit.Test;

public class EntriesTest {

private static final int ENTRIES = 20;

private Entries entriesUdf = new Entries();

@Test
public void shouldComputeIntEntries() {
Map<String, Integer> map = createMap(i -> i);
shouldComputeEntries(map, () -> entriesUdf.entriesInt(map, false));
}

@Test
public void shouldComputeBigIntEntries() {
Map<String, Long> map = createMap(Long::valueOf);
shouldComputeEntries(map, () -> entriesUdf.entriesBigInt(map, false));
}

@Test
public void shouldComputeDoubleEntries() {
Map<String, Double> map = createMap(Double::valueOf);
shouldComputeEntries(map, () -> entriesUdf.entriesDouble(map, false));
}

@Test
public void shouldComputeBooleanEntries() {
Map<String, Boolean> map = createMap(i -> i % 2 == 0);
shouldComputeEntries(map, () -> entriesUdf.entriesBoolean(map, false));
}

@Test
public void shouldComputeStringEntries() {
Map<String, String> map = createMap(String::valueOf);
shouldComputeEntries(map, () -> entriesUdf.entriesString(map, false));
}

@Test
public void shouldComputeIntEntriesSorted() {
Map<String, Integer> map = createMap(i -> i);
shouldComputeEntriesSorted(map, () -> entriesUdf.entriesInt(map, true));
}

@Test
public void shouldComputeBigIntEntriesSorted() {
Map<String, Long> map = createMap(Long::valueOf);
shouldComputeEntriesSorted(map, () -> entriesUdf.entriesBigInt(map, true));
}

@Test
public void shouldComputeDoubleEntriesSorted() {
Map<String, Double> map = createMap(Double::valueOf);
shouldComputeEntriesSorted(map, () -> entriesUdf.entriesDouble(map, true));
}

@Test
public void shouldComputeBooleanEntriesSorted() {
Map<String, Boolean> map = createMap(i -> i % 2 == 0);
shouldComputeEntriesSorted(map, () -> entriesUdf.entriesBoolean(map, true));
}

@Test
public void shouldComputeStringEntriesSorted() {
Map<String, String> map = createMap(String::valueOf);
shouldComputeEntriesSorted(map, () -> entriesUdf.entriesString(map, true));
}

@Test
public void shouldReturnNullListForNullMapInt() {
assertNull(entriesUdf.entriesInt(null, false));
}

@Test
public void shouldReturnNullListForNullMapBigInt() {
assertNull(entriesUdf.entriesBigInt(null, false));
}

@Test
public void shouldReturnNullListForNullMapDouble() {
assertNull(entriesUdf.entriesDouble(null, false));
}

@Test
public void shouldReturnNullListForNullMapBoolean() {
assertNull(entriesUdf.entriesBoolean(null, false));
}

@Test
public void shouldReturnNullListForNullMapString() {
assertNull(entriesUdf.entriesString(null, false));
}

private <T> void shouldComputeEntries(
Map<String, T> map, Supplier<List<Struct>> supplier
) {
List<Struct> out = supplier.get();
assertThat(out, hasSize(map.size()));
for (int i = 0; i < out.size(); i++) {
Struct struct = out.get(i);
T val = map.get(struct.getString("K"));
assertThat(val == null, is(false));
assertThat(val, is(struct.get("V")));
}
}

private <T> void shouldComputeEntriesSorted(Map<String, T> map, Supplier<List<Struct>> supplier) {
List<Struct> out = supplier.get();
List<Map.Entry<String, T>> entries = new ArrayList<>(map.entrySet());
entries.sort(Comparator.comparing(Entry::getKey));
assertThat(out.size(), is(entries.size()));
for (int i = 0; i < entries.size(); i++) {
Struct struct = out.get(i);
Map.Entry<String, T> entry = entries.get(i);
assertThat(struct.get("K"), is(entry.getKey()));
assertThat(struct.get("V"), is(entry.getValue()));
}
}

private <T> Map<String, T> createMap(Function<Integer, T> valueSupplier) {
Map<String, T> map = new HashMap<>();
for (int i = 0; i < ENTRIES; i++) {
map.put(UUID.randomUUID().toString(), valueSupplier.apply(i));
}
return map;
}

}
Loading

0 comments on commit 0a4558b

Please sign in to comment.