Skip to content

Commit

Permalink
feat: Implement describe and list functions for UDTFs (#3716)
Browse files Browse the repository at this point in the history
This PR implements describe and list functions for table functions. It also does some refactoring of the existing describe and list functions code to make it simpler.
  • Loading branch information
purplefox authored and Tim Fox committed Nov 5, 2019
1 parent 22469bc commit b0bbea4
Show file tree
Hide file tree
Showing 21 changed files with 293 additions and 160 deletions.
35 changes: 31 additions & 4 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ public void shouldDescribeScalarFunction() {
+ "Overview : Converts a BIGINT millisecond timestamp value into the string"
+ " representation of the \n"
+ " timestamp in the given format.\n"
+ "Type : scalar\n"
+ "Type : SCALAR\n"
+ "Jar : internal\n"
+ "Variations :";

Expand Down Expand Up @@ -832,7 +832,7 @@ public void shouldDescribeOverloadedScalarFunction() {
+ "Overview : Returns a substring of the passed in value.\n"
));
assertThat(output, containsString(
"Type : scalar\n"
"Type : SCALAR\n"
+ "Jar : internal\n"
+ "Variations :"
));
Expand All @@ -853,8 +853,8 @@ public void shouldDescribeOverloadedScalarFunction() {
public void shouldDescribeAggregateFunction() {
final String expectedSummary =
"Name : TOPK\n" +
"Author : Confluent\n" +
"Type : aggregate\n" +
"Author : Confluent\n" +
"Type : AGGREGATE\n" +
"Jar : internal\n" +
"Variations : \n";

Expand All @@ -870,6 +870,33 @@ public void shouldDescribeAggregateFunction() {
assertThat(output, containsString(expectedVariant));
}

@Test
public void shouldDescribeTableFunction() {
final String expectedOutput =
"Name : EXPLODE\n"
+ "Author : Confluent\n"
+ "Overview : Explodes an array. This function outputs one value for each element of the array.\n"
+ "Type : TABLE\n"
+ "Jar : internal\n"
+ "Variations : ";

localCli.handleLine("describe function explode;");
final String outputString = terminal.getOutputString();
assertThat(outputString, containsString(expectedOutput));

// variations for Udfs are loaded non-deterministically. Don't assume which variation is first
String expectedVariation =
"\tVariation : EXPLODE(list ARRAY<BYTES>)\n"
+ "\tReturns : BYTES\n"
+ "\tDescription : Explodes an array. This function outputs one value for each element of the array.";
assertThat(outputString, containsString(expectedVariation));

expectedVariation = "\tVariation : EXPLODE(input ARRAY<DECIMAL(1, 0)>)\n"
+ "\tReturns : DECIMAL(1, 0)\n"
+ "\tDescription : Explodes an array. This function outputs one value for each element of the array.";
assertThat(outputString, containsString(expectedVariation));
}

@Test
public void shouldExplainQueryId() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,8 @@ public void shouldPrintFunctionDescription() throws IOException {
+ "really, really, really, really, really, really, really, really, really, "
+ "really, really, really, really, really, really, really, really, long\n"
+ "and contains\n\ttabs and stuff"
)), FunctionType.scalar)));
)), FunctionType.SCALAR
)));

console.printKsqlEntityList(entityList);

Expand All @@ -1243,7 +1244,7 @@ public void shouldPrintFunctionDescription() throws IOException {
+ " and containing new lines\n"
+ " \tAND TABS\n"
+ " too!\n"
+ "Type : scalar\n"
+ "Type : SCALAR\n"
+ "Jar : some.jar\n"
+ "Variations : \n"
+ "\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,19 @@ public AggregateFunctionFactory(final UdfMetadata metadata) {

protected abstract List<List<Schema>> supportedArgs();

public String getName() {
return metadata.getName();
}

public String getDescription() {
return metadata.getDescription();
}

public String getPath() {
return metadata.getPath();
public UdfMetadata getMetadata() {
return metadata;
}

public String getAuthor() {
return metadata.getAuthor();
}

public String getVersion() {
return metadata.getVersion();
public String getName() {
return metadata.getName();
}

public void eachFunction(final Consumer<KsqlAggregateFunction<?, ?, ?>> consumer) {
supportedArgs().forEach(args ->
consumer.accept(createAggregateFunction(args, getDefaultArguments())));
}

public boolean isInternal() {
return metadata.isInternal();
}

public AggregateFunctionInitArguments getDefaultArguments() {
return AggregateFunctionInitArguments.EMPTY_ARGS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public interface FunctionRegistry {
*/
UdfFactory getUdfFactory(String functionName);

/**
* Get the factory for a table function.
*
* @param functionName the name of the function.
* @return the factory.
* @throws KsqlException on unknown table function.
*/
TableFunctionFactory getTableFunctionFactory(String functionName);

/**
* Get the factory for a UDAF.
*
Expand Down Expand Up @@ -100,6 +109,11 @@ public interface FunctionRegistry {
*/
List<UdfFactory> listFunctions();

/**
* @return all table function factories.
*/
List<TableFunctionFactory> listTableFunctions();

/**
* @return all UDAF factories.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,46 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;

public abstract class TableFunctionFactory {
public class TableFunctionFactory {

private final UdfIndex<KsqlTableFunction> udtfIndex;

private final UdfMetadata metadata;

public TableFunctionFactory(final UdfMetadata metadata) {
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
this.metadata = Objects.requireNonNull(metadata, "metadata");
this.udtfIndex = new UdfIndex<>(metadata.getName());
}

public abstract KsqlTableFunction createTableFunction(List<Schema> argTypeList);

protected abstract List<List<Schema>> supportedArgs();
public UdfMetadata getMetadata() {
return metadata;
}

public String getName() {
return metadata.getName();
}

public String getDescription() {
return metadata.getDescription();
public synchronized void eachFunction(final Consumer<KsqlTableFunction> consumer) {
udtfIndex.values().forEach(consumer);
}

public String getPath() {
return metadata.getPath();
public synchronized KsqlTableFunction createTableFunction(final List<Schema> argTypeList) {
return udtfIndex.getFunction(argTypeList);
}

public String getAuthor() {
return metadata.getAuthor();
protected synchronized List<List<Schema>> supportedArgs() {
return udtfIndex.values()
.stream()
.map(KsqlTableFunction::getArguments)
.collect(Collectors.toList());
}

public String getVersion() {
return metadata.getVersion();
synchronized void addFunction(final KsqlTableFunction tableFunction) {
udtfIndex.addFunction(tableFunction);
}

public boolean isInternal() {
return metadata.isInternal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,34 +53,18 @@ private void checkCompatible(final KsqlFunction ksqlFunction) {
}
}

public String getName() {
return metadata.getName();
}

public String getAuthor() {
return metadata.getAuthor();
}

public String getVersion() {
return metadata.getVersion();
public UdfMetadata getMetadata() {
return metadata;
}

public String getDescription() {
return metadata.getDescription();
public String getName() {
return metadata.getName();
}

public synchronized void eachFunction(final Consumer<KsqlFunction> consumer) {
udfIndex.values().forEach(consumer);
}

public boolean isInternal() {
return metadata.isInternal();
}

public String getPath() {
return metadata.getPath();
}

public boolean matches(final UdfFactory that) {
return this == that
|| (this.udfClass.equals(that.udfClass) && this.metadata.equals(that.metadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,26 @@ public synchronized AggregateFunctionFactory getAggregateFactory(final String fu
return udafFactory;
}

@Override
public synchronized TableFunctionFactory getTableFunctionFactory(final String functionName) {
final TableFunctionFactory tableFunctionFactory = udtfs.get(functionName.toUpperCase());
if (tableFunctionFactory == null) {
throw new KsqlException(
"Can not find any table functions with the name '" + functionName + "'");
}
return tableFunctionFactory;
}

@Override
public synchronized List<AggregateFunctionFactory> listAggregateFunctions() {
return new ArrayList<>(udafs.values());
}

@Override
public synchronized List<TableFunctionFactory> listTableFunctions() {
return new ArrayList<>(udtfs.values());
}

private void validateFunctionName(final String functionName) {
if (!functionNameValidator.test(functionName)) {
throw new KsqlException(functionName + " is not a valid function name."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* Loads user defined table functions (UDTFs)
*/
class UdtfLoader {
public class UdtfLoader {

private static final Logger LOGGER = LoggerFactory.getLogger(UdtfLoader.class);

Expand All @@ -45,7 +45,7 @@ class UdtfLoader {
private final SqlTypeParser typeParser;
private final boolean throwExceptionOnLoadFailure;

UdtfLoader(
public UdtfLoader(
final MutableFunctionRegistry functionRegistry,
final Optional<Metrics> metrics,
final SqlTypeParser typeParser,
Expand All @@ -57,7 +57,7 @@ class UdtfLoader {
this.throwExceptionOnLoadFailure = throwExceptionOnLoadFailure;
}

void loadUdtfFromClass(
public void loadUdtfFromClass(
final Class<?> theClass,
final String path
) {
Expand All @@ -79,7 +79,7 @@ void loadUdtfFromClass(
false
);

final UdtfTableFunctionFactory udtfFactory = new UdtfTableFunctionFactory(metadata);
final TableFunctionFactory udtfFactory = new TableFunctionFactory(metadata);

Arrays.stream(theClass.getMethods())
.filter(method -> method.getAnnotation(Udtf.class) != null)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,13 @@ public void shouldAllowClassesWithSameFQCNInDifferentUDFJars() throws Exception
@Test
public void shouldCreateUdfFactoryWithJarPathWhenExternal() {
final UdfFactory tostring = FUNC_REG.getUdfFactory("tostring");
assertThat(tostring.getPath(), equalTo("src/test/resources/udf-example.jar"));
assertThat(tostring.getMetadata().getPath(), equalTo("src/test/resources/udf-example.jar"));
}

@Test
public void shouldCreateUdfFactoryWithInternalPathWhenInternal() {
final UdfFactory substring = FUNC_REG.getUdfFactory("substring");
assertThat(substring.getPath(), equalTo(KsqlFunction.INTERNAL_PATH));
assertThat(substring.getMetadata().getPath(), equalTo(KsqlFunction.INTERNAL_PATH));
}

@Test
Expand Down
Loading

0 comments on commit b0bbea4

Please sign in to comment.