Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement user defined delimiter for value format #3393

Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
51f416f
Merge pull request #1 from confluentinc/master
hasnat Oct 31, 2018
7945d62
Added optional for FORMAT_VALUE DELIMITED
hasnat Oct 15, 2018
6710357
Tests + Code review changes
hasnat Oct 18, 2018
e3481be
Fix formatting in sytax reference docs
hasnat Oct 19, 2018
cc38ab1
Merge remote-tracking branch 'fork/master'
hasnat Dec 23, 2018
eb230a4
Adds option to use custom delimiters when VALUE_FORMAT='DELIMITED'
hasnat Dec 27, 2018
4148ba9
Merge branch 'master' into add_optional_delimiter_for_delimited_format
hasnat Dec 28, 2018
007ad4a
Merge branch 'master' into add_optional_delimiter_for_delimited_format
hasnat Jan 12, 2019
c6d0092
Allow only single character delimiters
hasnat Feb 18, 2019
426b8f6
Merge branch 'master' into add_optional_delimiter_for_delimited_format
hasnat Feb 18, 2019
c774b09
Updates to master merge
hasnat Feb 18, 2019
aaf6fa4
Adds seralizer with custom delimiter
hasnat Mar 3, 2019
5dbcfaa
Merge branch 'master' into add_optional_delimiter_for_delimited_format
hasnat Mar 4, 2019
077699d
Fix max line issue
hasnat Mar 4, 2019
ab21e53
Fix logger classes
hasnat Mar 4, 2019
7794be0
Try default delimiter type with custom character
hasnat Mar 5, 2019
ef4ca56
Use utf 8
hasnat Mar 5, 2019
2b2491a
fix query
hasnat Mar 5, 2019
d1a64e3
Fix value delimiter WITH argument
hasnat Mar 5, 2019
5c58d25
use default with delimiter
hasnat Mar 5, 2019
8eecc8d
remove old test
hasnat Mar 17, 2019
a4bed15
Merge pull request #2 from confluentinc/master
hasnat May 6, 2019
c29017d
Merge origin/master
hasnat May 6, 2019
66ab934
style fix
hasnat May 6, 2019
3dce9e9
Missing default
hasnat May 6, 2019
9676843
bug fixes
hasnat May 6, 2019
a5b496a
more style fixes
hasnat May 6, 2019
5f27f02
fix: Merged PR into master
Sep 19, 2019
b1075d4
fix: Get the PR to work as the original author intended against curre…
Sep 20, 2019
924aa36
Merge branch 'master' into hasnat-add_optional_delimiter_for_delimite…
Sep 20, 2019
3af05b5
fix: mainly nits
Sep 20, 2019
0038c3e
fix: checkstyle issue
Sep 20, 2019
c469f85
Removed 2 param of() method for FormatInfo that was only used in tests
Sep 20, 2019
b1344ed
Add special handling for TAB and SPACE
Sep 20, 2019
4992a91
fix: nit and add more tests
Sep 21, 2019
ad6a8d2
fix: update docs, nits and make sure datagen supports TAB and SPACE
Sep 21, 2019
8167721
Merge branch 'master' into hasnat-add_optional_delimiter_for_delimite…
Sep 21, 2019
28b433f
fix:checkstyle nit
Sep 21, 2019
e2a690a
fix: Removed some debug code
Sep 21, 2019
4a215d5
fix: nit
Sep 21, 2019
543fdee
fix:nit
Sep 24, 2019
d2699ef
fix:nits
Sep 24, 2019
49426b8
fix:nits
Sep 24, 2019
53cdf5f
fix:nits
Sep 24, 2019
1e421ec
Improved CommonCreateConfigs description for value_delimiter
Sep 24, 2019
2f05ad3
Use validator to check value_delimiter is not empty
Sep 24, 2019
8cf99d3
Use concatenated delimiter named values in error message
Sep 24, 2019
1d7a223
Use value delimiter from source if sink doesn't specify one
Sep 24, 2019
33e7358
Remove file that shouldn't be there
Sep 24, 2019
0bbbb37
Created delimiter class
Sep 25, 2019
854b6ec
Review comments
Sep 25, 2019
4e7b0a7
Added getter for delimiter
Sep 25, 2019
ea33711
more nits
Sep 25, 2019
fa12f03
Add invalid value to exception message for Delimiter
Sep 25, 2019
a261c35
review
Sep 25, 2019
6f322e5
Merge branch 'master' into hasnat-add_optional_delimiter_for_delimite…
Sep 25, 2019
0552373
fixed test
Sep 25, 2019
6c27d26
fixed get of delimiter
Sep 26, 2019
87f8342
Fixed merge conflict
Sep 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ The WITH clause supports the following properties:
| | set, then the default Kafka cluster configuration for replicas will be used for creating a |
| | new topic. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+--------------------------------------------------------------------------------------------+
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka |
| | message value, you may set this property to associate the corresponding field/column with |
| | the implicit ``ROWKEY`` column (message key). |
Expand Down Expand Up @@ -470,6 +475,11 @@ The WITH clause supports the following properties:
| | set, then the default Kafka cluster configuration for replicas will be used for creating a |
| | new topic. |
+-------------------------+--------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+--------------------------------------------------------------------------------------------+
| KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka |
| | message value, you may set this property to associate the corresponding field/column with |
| | the implicit ``ROWKEY`` column (message key). |
Expand Down Expand Up @@ -587,6 +597,11 @@ The WITH clause for the result supports the following properties:
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
| | from the left-side stream or table. |
Expand Down Expand Up @@ -694,6 +709,11 @@ The WITH clause supports the following properties:
| | If this property is not set, then the format of the input stream/table is used. |
| | For more information, see :ref:`ksql_formats`. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, |
| | defaults to ','. |
| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not |
| | an actual space or tab character. |
+-------------------------+------------------------------------------------------------------------------------------------------+
| PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number |
| | of partitions of the input stream/table will be used. In join queries, the property values are taken |
| | from the left-side stream or table. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema
) {
return getGenericRowSerde(
FormatInfo.of(Format.JSON, Optional.empty()),
FormatInfo.of(Format.JSON),
schema,
() -> null
);
Expand All @@ -175,7 +175,7 @@ private static Serde<GenericRow> getAvroSerde(
final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();

return getGenericRowSerde(
FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema")),
FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema"), Optional.empty()),
schema,
() -> schemaRegistryClient
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
Expand All @@ -32,6 +33,42 @@ public final class ConfigValidators {
private ConfigValidators() {
}

/**
* Validator that tests the STRING property can be parsed by the supplied {@code parser}.
* @param parser the parser.
* @return the validator
*/
public static Validator parses(final Function<String, ?> parser) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would benefit from a unit test - there's some in the patch I sent you.

return (name, val) -> {
try {
if (val == null) {
return;
}
if (!(val instanceof String)) {
throw new ConfigException(name, val, "Must be String");
}
parser.apply((String)val);
} catch (Exception e) {
throw new ConfigException(name, val, e.getMessage());
}
};
}

/**
* Validator that allows null values and calls the {@code delegate} for any non-null values.
* @param delegate the delegate to call for non-null values.
* @return the validator.
*/
public static Validator nullsAllowed(final Validator delegate) {
return (name, value) -> {
if (value == null) {
return;
}

delegate.ensureValid(name, value);
};
}

public static <T extends Enum<T>> Validator enumValues(final Class<T> enumClass) {
final String[] enumValues = EnumSet.allOf(enumClass)
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.properties.with;

import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.serde.Delimiter;
import io.confluent.ksql.serde.Format;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
Expand All @@ -40,6 +41,8 @@ public final class CommonCreateConfigs {
public static final String VALUE_FORMAT_PROPERTY = "VALUE_FORMAT";
public static final String WRAP_SINGLE_VALUE = "WRAP_SINGLE_VALUE";

public static final String VALUE_DELIMITER_PROPERTY = "VALUE_DELIMITER";

static void addToConfigDef(
final ConfigDef configDef,
final boolean topicNameRequired,
Expand Down Expand Up @@ -117,7 +120,18 @@ static void addToConfigDef(
ConfigDef.Type.STRING,
null,
Importance.LOW,
"The fully qualified name of the Avro schema to use");
"The fully qualified name of the Avro schema to use"
)
.define(
VALUE_DELIMITER_PROPERTY,
ConfigDef.Type.STRING,
null,
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
ConfigValidators.nullsAllowed(ConfigValidators.parses(Delimiter::parse)),
Importance.LOW,
"The delimiter to use when VALUE_FORMAT='DELIMITED'. Supports single "
+ "character to be a delimiter, defaults to ','. For space and tab delimited values "
+ "you must use the special values 'SPACE' or 'TAB', not an actual space or tab "
+ "character.");
}

private CommonCreateConfigs() {
Expand Down
109 changes: 109 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/serde/Delimiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

@Immutable
public final class Delimiter {

private final char delimiter;

public static Delimiter parse(final char ch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol - I only mean change the of(String) to parse. I think this one with a single char should of remained of. sorry. Should have been more explicit!

return new Delimiter(ch);
}

private Delimiter(final char delimiter) {
this.delimiter = delimiter;
}

public static Delimiter parse(final String str) {
if (str == null) {
throw new NullPointerException();
}
if (str.trim().isEmpty()) {
throw new IllegalArgumentException(
purplefox marked this conversation as resolved.
Show resolved Hide resolved
"Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please "
+ "use the special values 'TAB' or 'SPACE'"
+ System.lineSeparator()
+ "Example valid value: ';'"
);
}
if (str.length() == 1) {
return new Delimiter(str.charAt(0));
}
final Character delim = NAMED_DELIMITERS.get(str);
if (delim != null) {
return new Delimiter(delim);
}
throw new IllegalArgumentException(
"Delimiter must be a single character or "
+ NAMED_DELIMITERS_STRING
+ System.lineSeparator()
+ "Example valid value: ';'"
);
}

private static final Map<String, Character> NAMED_DELIMITERS = ImmutableMap
purplefox marked this conversation as resolved.
Show resolved Hide resolved
.<String, Character>builder()
.put("TAB", '\t')
.put("SPACE", ' ')
.build();

private static final String NAMED_DELIMITERS_STRING = getNamedDelimitersString();
purplefox marked this conversation as resolved.
Show resolved Hide resolved

private static String getNamedDelimitersString() {
final StringBuilder sb = new StringBuilder();
final Iterator<String> iter = NAMED_DELIMITERS.keySet().iterator();
while (iter.hasNext()) {
sb.append(iter.next());
if (iter.hasNext()) {
sb.append(", ");
}
}
return sb.toString();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Delimiter delimiter1 = (Delimiter) o;
return delimiter == delimiter1.delimiter;
}

@Override
public int hashCode() {
return Objects.hash(delimiter);
}

@Override
public String toString() {
return String.valueOf(delimiter);
}

public char getDelimiter() {
return delimiter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ public final class FormatInfo {

private final Format format;
private final Optional<String> avroFullSchemaName;
private final Optional<Delimiter> delimiter;

public static FormatInfo of(final Format format) {
return FormatInfo.of(format, Optional.empty());
return FormatInfo.of(format, Optional.empty(), Optional.empty());
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
}

public static FormatInfo of(
final Format format,
final Optional<String> avroFullSchemaName
) {
return new FormatInfo(format, avroFullSchemaName);
final Optional<String> avroFullSchemaName,
final Optional<Delimiter> valueDelimiter) {
return new FormatInfo(format, avroFullSchemaName, valueDelimiter);
}

private FormatInfo(
final Format format,
final Optional<String> avroFullSchemaName
final Optional<String> avroFullSchemaName,
final Optional<Delimiter> delimiter
) {
this.format = Objects.requireNonNull(format, "format");
this.avroFullSchemaName = Objects.requireNonNull(avroFullSchemaName, "avroFullSchemaName");
Expand All @@ -51,9 +53,17 @@ private FormatInfo(
throw new KsqlException("Full schema name only supported with AVRO format");
}

if (avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) {
throw new KsqlException("Schema name can not be empty");
if (format == Format.AVRO
&& avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) {
throw new KsqlException("Schema name cannot be empty");
}

this.delimiter = Objects.requireNonNull(delimiter, "delimiter");

if (format != Format.DELIMITED && delimiter.isPresent()) {
throw new KsqlException("Delimeter only supported with DELIMITED format");
}

}

public Format getFormat() {
Expand All @@ -64,6 +74,10 @@ public Optional<String> getAvroFullSchemaName() {
return avroFullSchemaName;
}

public Optional<Delimiter> getDelimiter() {
return delimiter;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -74,19 +88,21 @@ public boolean equals(final Object o) {
}
final FormatInfo that = (FormatInfo) o;
return format == that.format
&& Objects.equals(avroFullSchemaName, that.avroFullSchemaName);
&& Objects.equals(avroFullSchemaName, that.avroFullSchemaName)
&& Objects.equals(delimiter, that.delimiter);
}

@Override
public int hashCode() {
return Objects.hash(format, avroFullSchemaName);
return Objects.hash(format, avroFullSchemaName, delimiter);
}

@Override
public String toString() {
return "FormatInfo{"
+ "format=" + format
+ ", avroFullSchemaName=" + avroFullSchemaName
+ ", delimiter=" + delimiter
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,6 @@ public static KeyFormat nonWindowed(final FormatInfo format) {
return new KeyFormat(format, Optional.empty());
}

public static KeyFormat windowed(
final Format format,
final Optional<String> avroSchemaName,
final WindowInfo windowInfo
) {
return new KeyFormat(
FormatInfo.of(format, avroSchemaName),
Optional.of(windowInfo)
);
}

public static KeyFormat windowed(
final FormatInfo format,
final WindowInfo windowInfo
Expand Down
Loading