Skip to content

Commit

Permalink
chore: add AS_VALUE Udf (#5194)
Browse files Browse the repository at this point in the history
* chore: add AS_VALUE Udf

Part of [klip-24](#5115).

A Udf that indicates that a key column in a projection should be copied into a value column, for example:

```sql
-- Given:
CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');

-- When:
CREATE STREAM OUTPUT AS SELECT ID, AS_VALUE(ID) AS ID_COPY, V1 FROM INPUT;

-- Then:
-- resulting schema: ID INT KEY, ID_COPY INT, V1 INT
```

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Apr 28, 2020
1 parent 3eb9cd6 commit c030382
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf;

@SuppressWarnings("MethodMayBeStatic")
@UdfDescription(name = "AS_VALUE", description = AsValue.DESCRIPTION)
public class AsValue {

static final String DESCRIPTION = "Used exclusively in the projection of a query to allow a key"
+ "column to be copied into the value schema. Has no affect if used anywhere else.";

@Udf
public <T> T asValue(final T keyColumn) {
return keyColumn;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import org.junit.Before;
import org.junit.Test;

public class AsValueTest {

private AsValue udf;

@Before
public void setUp() {
udf = new AsValue();
}

@Test
public void shouldHandlePrimitiveTypes() {
assertThat(udf.asValue(Boolean.TRUE), is(Boolean.TRUE));
assertThat(udf.asValue(Integer.MIN_VALUE), is(Integer.MIN_VALUE));
assertThat(udf.asValue(Long.MAX_VALUE), is(Long.MAX_VALUE));
assertThat(udf.asValue(Double.MAX_VALUE), is(Double.MAX_VALUE));
assertThat(udf.asValue("string"), is("string"));
}

@Test
public void shouldHandleNullPrimitiveTypes() {
assertThat(udf.asValue((Boolean)null), is(nullValue()));
assertThat(udf.asValue((Integer) null), is(nullValue()));
assertThat(udf.asValue((Long) null), is(nullValue()));
assertThat(udf.asValue((Double) null), is(nullValue()));
assertThat(udf.asValue((String) null), is(nullValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"comments": [
"AS_VALUE udf is a special UDF introduced as part of KLIP-24: https://github.com/confluentinc/ksql/pull/5115",
"Its purpose is to indicate that a key column in a projection should result in a value column, not a key column.",
"When used elsewhere the udf has little affect, though it may affect column namings"
],
"tests": [
{
"name": "key column",
"comment": "awaiting other changes for KLIP-24 to enable expected behaviour",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT AS_VALUE(ID) AS ID_COPY, V1 FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "input", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"ID_COPY": 1, "V1": 3}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, ID_COPY INT, V1 INT"}
]
}
},
{
"name": "value column",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT AS_VALUE(V0) FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "input", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": 2}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, KSQL_COL_0 INT"}
]
}
},
{
"name": "expression",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT AS_VALUE(V0 + V1) FROM INPUT;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "input", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": 5}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, KSQL_COL_0 INT"}
]
}
},
{
"name": "partition by",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT * FROM INPUT PARTITION BY AS_VALUE(ID);"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "input", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"V0": 2, "V1": 3, "ID": 1}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "KSQL_COL_0 INT KEY, V0 INT, V1 INT, ID INT"}
]
}
},
{
"name": "group by",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='input', value_format='JSON');",
"CREATE TABLE OUTPUT AS SELECT AS_VALUE(ID) AS K, COUNT(1) FROM INPUT GROUP BY AS_VALUE(ID);"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "input", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"K": 1, "KSQL_COL_0": 1}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "table", "schema": "KSQL_COL_1 INT KEY, K INT, KSQL_COL_0 BIGINT"}
]
}
},
{
"name": "join",
"statements": [
"CREATE STREAM I1 (ID INT KEY, V0 INT, V1 INT) WITH (kafka_topic='i1', value_format='JSON');",
"CREATE TABLE I2 (ID INT PRIMARY KEY, V0 INT, V1 INT) WITH (kafka_topic='i2', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT * FROM I1 JOIN I2 ON AS_VALUE(I1.ID) = I2.ID;"
],
"properties": {
"ksql.any.key.name.enabled": true
},
"inputs": [
{"topic": "i2", "key": 1, "value": {"V0": 2, "V1": 3}},
{"topic": "i1", "key": 1, "value": {"V0": 2, "V1": 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"I1_ID": 1, "I1_V0": 2, "I1_V1": 3, "I2_ID": 1, "I2_V0": 2, "I2_V1": 3}}
],
"post": {
"sources": [
{"name": "OUTPUT", "type": "stream", "schema": "KSQL_COL_0 INT KEY, I1_ID INT, I1_V0 INT, I1_V1 INT, I2_ID INT, I2_V0 INT, I2_V1 INT"}
]
}
}
]
}

0 comments on commit c030382

Please sign in to comment.