diff --git a/docs/developer-guide/syntax-reference.rst b/docs/developer-guide/syntax-reference.rst index 56d3f31b9001..84b236b98ff8 100644 --- a/docs/developer-guide/syntax-reference.rst +++ b/docs/developer-guide/syntax-reference.rst @@ -360,6 +360,11 @@ The WITH clause supports the following properties: | | set, then the default Kafka cluster configuration for replicas will be used for creating a | | | new topic. | +-------------------------+--------------------------------------------------------------------------------------------+ +| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, | +| | defaults to ','. | +| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not | +| | an actual space or tab character. | ++-------------------------+--------------------------------------------------------------------------------------------+ | KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka | | | message value, you may set this property to associate the corresponding field/column with | | | the implicit ``ROWKEY`` column (message key). | @@ -470,6 +475,11 @@ The WITH clause supports the following properties: | | set, then the default Kafka cluster configuration for replicas will be used for creating a | | | new topic. | +-------------------------+--------------------------------------------------------------------------------------------+ +| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, | +| | defaults to ','. | +| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not | +| | an actual space or tab character. 
| ++-------------------------+--------------------------------------------------------------------------------------------+ | KEY | Optimization hint: If the Kafka message key is also present as a field/column in the Kafka | | | message value, you may set this property to associate the corresponding field/column with | | | the implicit ``ROWKEY`` column (message key). | @@ -587,6 +597,11 @@ The WITH clause for the result supports the following properties: | | If this property is not set, then the format of the input stream/table is used. | | | For more information, see :ref:`ksql_formats`. | +-------------------------+------------------------------------------------------------------------------------------------------+ +| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, | +| | defaults to ','. | +| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not | +| | an actual space or tab character. | ++-------------------------+------------------------------------------------------------------------------------------------------+ | PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number | | | of partitions of the input stream/table will be used. In join queries, the property values are taken | | | from the left-side stream or table. | @@ -694,6 +709,11 @@ The WITH clause supports the following properties: | | If this property is not set, then the format of the input stream/table is used. | | | For more information, see :ref:`ksql_formats`. | +-------------------------+------------------------------------------------------------------------------------------------------+ +| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, | +| | defaults to ','. | +| | For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not | +| | an actual space or tab character. 
| ++-------------------------+------------------------------------------------------------------------------------------------------+ | PARTITIONS | The number of partitions in the backing topic. If this property is not set, then the number | | | of partitions of the input stream/table will be used. In join queries, the property values are taken | | | from the left-side stream or table. | diff --git a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java index 1b543a2e845a..7dabb9ba9a21 100644 --- a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java +++ b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java @@ -158,7 +158,7 @@ private static Serde getJsonSerdeHelper( final org.apache.kafka.connect.data.Schema schema ) { return getGenericRowSerde( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON), schema, () -> null ); @@ -170,7 +170,7 @@ private static Serde getAvroSerde( final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); return getGenericRowSerde( - FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema")), + FormatInfo.of(Format.AVRO, Optional.of("benchmarkSchema"), Optional.empty()), schema, () -> schemaRegistryClient ); diff --git a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java index 5aecc1c53b16..db8075ef6f1a 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java +++ b/ksql-common/src/main/java/io/confluent/ksql/configdef/ConfigValidators.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef.Validator; import 
org.apache.kafka.common.config.ConfigException; @@ -32,6 +33,39 @@ public final class ConfigValidators { private ConfigValidators() { } + /** + * Validator that tests the STRING property can be parsed by the supplied {@code parser}. + * @param parser the parser. + * @return the validator + */ + public static Validator parses(final Function parser) { + return (name, val) -> { + if (val != null && !(val instanceof String)) { + throw new IllegalArgumentException("validator should only be used with STRING defs"); + } + try { + parser.apply((String)val); + } catch (Exception e) { + throw new ConfigException("Configuration " + name + " is invalid: " + e.getMessage()); + } + }; + } + + /** + * Validator that allows null values and calls the {@code delegate} for any non-null values. + * @param delegate the delegate to call for non-null values. + * @return the validator. + */ + public static Validator nullsAllowed(final Validator delegate) { + return (name, value) -> { + if (value == null) { + return; + } + + delegate.ensureValid(name, value); + }; + } + public static > Validator enumValues(final Class enumClass) { final String[] enumValues = EnumSet.allOf(enumClass) .stream() diff --git a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java index 5e78070e654f..da4c161a465a 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java +++ b/ksql-common/src/main/java/io/confluent/ksql/properties/with/CommonCreateConfigs.java @@ -16,6 +16,7 @@ package io.confluent.ksql.properties.with; import io.confluent.ksql.configdef.ConfigValidators; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.Format; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -40,6 +41,8 @@ public final class CommonCreateConfigs { public static final String 
VALUE_FORMAT_PROPERTY = "VALUE_FORMAT"; public static final String WRAP_SINGLE_VALUE = "WRAP_SINGLE_VALUE"; + public static final String VALUE_DELIMITER_PROPERTY = "VALUE_DELIMITER"; + static void addToConfigDef( final ConfigDef configDef, final boolean topicNameRequired, @@ -117,7 +120,18 @@ static void addToConfigDef( ConfigDef.Type.STRING, null, Importance.LOW, - "The fully qualified name of the Avro schema to use"); + "The fully qualified name of the Avro schema to use" + ) + .define( + VALUE_DELIMITER_PROPERTY, + ConfigDef.Type.STRING, + null, + ConfigValidators.nullsAllowed(ConfigValidators.parses(Delimiter::of)), + Importance.LOW, + "The delimiter to use when VALUE_FORMAT='DELIMITED'. Supports single " + + "character to be a delimiter, defaults to ','. For space and tab delimited values " + + "you must use the special values 'SPACE' or 'TAB', not an actual space or tab " + + "character."); } private CommonCreateConfigs() { diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/Delimiter.java b/ksql-common/src/main/java/io/confluent/ksql/serde/Delimiter.java new file mode 100644 index 000000000000..f5f7190c863d --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/Delimiter.java @@ -0,0 +1,99 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.serde; + +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.Immutable; +import java.util.Map; +import java.util.Objects; +import org.apache.commons.lang3.StringUtils; + +@Immutable +public final class Delimiter { + + private static final Map NAMED_DELIMITERS = ImmutableMap + .builder() + .put("TAB", '\t') + .put("SPACE", ' ') + .build(); + + private static final String NAMED_DELIMITERS_STRING = + StringUtils.join(NAMED_DELIMITERS.keySet(), ","); + + private final char delimiter; + + public static Delimiter of(final char ch) { + return new Delimiter(ch); + } + + private Delimiter(final char delimiter) { + this.delimiter = delimiter; + } + + public static Delimiter of(final String str) { + if (str == null) { + throw new NullPointerException(); + } + if (str.trim().isEmpty()) { + throw new IllegalArgumentException( + "Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please " + + "use the special values 'TAB' or 'SPACE'" + + System.lineSeparator() + + "Example valid value: ';'" + ); + } + if (str.length() == 1) { + return new Delimiter(str.charAt(0)); + } + final Character delim = NAMED_DELIMITERS.get(str); + if (delim != null) { + return new Delimiter(delim); + } + throw new IllegalArgumentException( + "Invalid delimiter value: '" + str + + "'. 
Delimiter must be a single character or " + + NAMED_DELIMITERS_STRING + + System.lineSeparator() + + "Example valid value: ';'" + ); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Delimiter delimiter1 = (Delimiter) o; + return delimiter == delimiter1.delimiter; + } + + @Override + public int hashCode() { + return Objects.hash(delimiter); + } + + @Override + public String toString() { + return String.valueOf(delimiter); + } + + public char getDelimiter() { + return delimiter; + } +} diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java index f5c4a17ee576..8127b05e8772 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/FormatInfo.java @@ -28,21 +28,23 @@ public final class FormatInfo { private final Format format; private final Optional avroFullSchemaName; + private final Optional delimiter; public static FormatInfo of(final Format format) { - return FormatInfo.of(format, Optional.empty()); + return FormatInfo.of(format, Optional.empty(), Optional.empty()); } public static FormatInfo of( final Format format, - final Optional avroFullSchemaName - ) { - return new FormatInfo(format, avroFullSchemaName); + final Optional avroFullSchemaName, + final Optional valueDelimiter) { + return new FormatInfo(format, avroFullSchemaName, valueDelimiter); } private FormatInfo( final Format format, - final Optional avroFullSchemaName + final Optional avroFullSchemaName, + final Optional delimiter ) { this.format = Objects.requireNonNull(format, "format"); this.avroFullSchemaName = Objects.requireNonNull(avroFullSchemaName, "avroFullSchemaName"); @@ -51,9 +53,17 @@ private FormatInfo( throw new KsqlException("Full schema name only supported with AVRO format"); } - if 
(avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) { - throw new KsqlException("Schema name can not be empty"); + if (format == Format.AVRO + && avroFullSchemaName.map(name -> name.trim().isEmpty()).orElse(false)) { + throw new KsqlException("Schema name cannot be empty"); + } + + this.delimiter = Objects.requireNonNull(delimiter, "delimiter"); + + if (format != Format.DELIMITED && delimiter.isPresent()) { + throw new KsqlException("Delimiter only supported with DELIMITED format"); + } + } public Format getFormat() { @@ -64,6 +74,10 @@ public Optional getAvroFullSchemaName() { return avroFullSchemaName; } + public Optional getDelimiter() { + return delimiter; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -74,12 +88,13 @@ } final FormatInfo that = (FormatInfo) o; return format == that.format - && Objects.equals(avroFullSchemaName, that.avroFullSchemaName); + && Objects.equals(avroFullSchemaName, that.avroFullSchemaName) + && Objects.equals(delimiter, that.delimiter); } @Override public int hashCode() { - return Objects.hash(format, avroFullSchemaName); + return Objects.hash(format, avroFullSchemaName, delimiter); } @Override @@ -87,6 +102,7 @@ public String toString() { return "FormatInfo{" + "format=" + format + ", avroFullSchemaName=" + avroFullSchemaName + + ", delimiter=" + delimiter + '}'; } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java b/ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java index da2636d7767a..ad9cc5de821f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java +++ b/ksql-common/src/main/java/io/confluent/ksql/serde/KeyFormat.java @@ -34,17 +34,6 @@ public static KeyFormat nonWindowed(final FormatInfo format) { return new KeyFormat(format, Optional.empty()); } - public static KeyFormat windowed( - final Format format, - final Optional avroSchemaName, - final WindowInfo windowInfo - ) { - 
return new KeyFormat( - FormatInfo.of(format, avroSchemaName), - Optional.of(windowInfo) - ); - } - public static KeyFormat windowed( final FormatInfo format, final WindowInfo windowInfo diff --git a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java index 9d94fa29d9e6..2cf0d7d7cfc0 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java @@ -15,14 +15,27 @@ package io.confluent.ksql.configdef; +import static org.mockito.ArgumentMatchers.any; + +import java.util.function.Function; import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@RunWith(MockitoJUnitRunner.class) public class ConfigValidatorsTest { + @Mock + private Function parser; + @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -122,6 +135,54 @@ public void shouldThrowIfNoMatchForCaseInsensitiveStringNull() { validator.ensureValid("propName", null); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnNonStringFromParse() { + // Given: + final Validator validator = ConfigValidators.parses(parser); + + // When: + validator.ensureValid("propName", 10); + } + + @Test + public void shouldPassNullsToParser() { + // Given: + final Validator validator = ConfigValidators.parses(parser); + + // When: + validator.ensureValid("propName", null); + + // Then: + verify(parser).apply(null); + } + + @Test + public void shouldPassStringsToParser() { + // Given: + final Validator validator = 
ConfigValidators.parses(parser); + + // When: + validator.ensureValid("propName", "value"); + + // Then: + verify(parser).apply("value"); + } + + @Test + public void shouldThrowIfParserThrows() { + // Given: + final Validator validator = ConfigValidators.parses(parser); + when(parser.apply(any())).thenThrow(new IllegalArgumentException("some error")); + + // Then: + expectedException.expect(ConfigException.class); + expectedException + .expectMessage("Configuration propName is invalid: some error"); + + // When: + validator.ensureValid("propName", "value"); + } + private enum TestEnum { FOO, BAR } diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java index 4f7956a635ff..9ac5807bf502 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/FormatInfoTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.serde; import static io.confluent.ksql.serde.Format.AVRO; +import static io.confluent.ksql.serde.Format.DELIMITED; import static io.confluent.ksql.serde.Format.KAFKA; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -44,23 +45,28 @@ public void shouldThrowNPEs() { public void shouldImplementEquals() { new EqualsTester() .addEqualityGroup( - FormatInfo.of(Format.AVRO, Optional.of("something")), - FormatInfo.of(Format.AVRO, Optional.of("something")) + FormatInfo.of(Format.DELIMITED, Optional.empty(), Optional.of(Delimiter.of('x'))), + FormatInfo.of(Format.DELIMITED, Optional.empty(), Optional.of(Delimiter.of('x'))) ) .addEqualityGroup( - FormatInfo.of(Format.AVRO, Optional.empty()), + FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.empty()), + FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.empty()) + ) + .addEqualityGroup( + FormatInfo.of(Format.AVRO, Optional.empty(), Optional.empty()), 
FormatInfo.of(Format.AVRO) ) .addEqualityGroup( - FormatInfo.of(Format.JSON, Optional.empty()) + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), + FormatInfo.of(Format.JSON) ) .testEquals(); } @Test - public void shouldImplementToString() { + public void shouldImplementToStringAvro() { // Given: - final FormatInfo info = FormatInfo.of(AVRO, Optional.of("something")); + final FormatInfo info = FormatInfo.of(AVRO, Optional.of("something"), Optional.empty()); // When: final String result = info.toString(); @@ -70,6 +76,19 @@ public void shouldImplementToString() { assertThat(result, containsString("something")); } + @Test + public void shouldImplementToStringDelimited() { + // Given: + final FormatInfo info = FormatInfo.of(DELIMITED, Optional.empty(), Optional.of(Delimiter.of("~"))); + + // When: + final String result = info.toString(); + + // Then: + assertThat(result, containsString("DELIMITED")); + assertThat(result, containsString("~")); + } + @Test public void shouldThrowOnNonAvroWithAvroSchemName() { // Then: @@ -77,30 +96,50 @@ public void shouldThrowOnNonAvroWithAvroSchemName() { expectedException.expectMessage("Full schema name only supported with AVRO format"); // When: - FormatInfo.of(Format.JSON, Optional.of("thing")); + FormatInfo.of(Format.JSON, Optional.of("thing"), Optional.empty()); } @Test public void shouldThrowOnEmptyAvroSchemaName() { // Then: expectedException.expect(KsqlException.class); - expectedException.expectMessage("Schema name can not be empty"); + expectedException.expectMessage("Schema name cannot be empty"); // When: - FormatInfo.of(Format.AVRO, Optional.of("")); + FormatInfo.of(Format.AVRO, Optional.of(""), Optional.empty()); } @Test public void shouldGetFormat() { - assertThat(FormatInfo.of(KAFKA, Optional.empty()).getFormat(), is(KAFKA)); + assertThat(FormatInfo.of(KAFKA, Optional.empty(), Optional.empty()).getFormat(), is(KAFKA)); } @Test public void shouldGetAvroSchemaName() { - assertThat(FormatInfo.of(AVRO, 
Optional.of("Something")).getAvroFullSchemaName(), + assertThat(FormatInfo.of(AVRO, Optional.of("Something"), Optional.empty()).getAvroFullSchemaName(), is(Optional.of("Something"))); - assertThat(FormatInfo.of(AVRO, Optional.empty()).getAvroFullSchemaName(), + assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getAvroFullSchemaName(), is(Optional.empty())); } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithAvroFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x'))); + } + + @Test + public void shouldThrowWhenAttemptingToUseValueDelimiterWithJsonFormat() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Delimiter only supported with DELIMITED format"); + + // When: + FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x'))); + } } \ No newline at end of file diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/KeyFormatTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/KeyFormatTest.java index e8de9320b3f8..8a798ff6ebc6 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/KeyFormatTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/KeyFormatTest.java @@ -44,8 +44,8 @@ public void shouldThrowNPEs() { @Test public void shouldImplementEquals() { - final FormatInfo format1 = FormatInfo.of(AVRO, Optional.empty()); - final FormatInfo format2 = FormatInfo.of(JSON, Optional.empty()); + final FormatInfo format1 = FormatInfo.of(AVRO, Optional.empty(), Optional.empty()); + final FormatInfo format2 = FormatInfo.of(JSON, Optional.empty(), Optional.empty()); final WindowInfo window1 = WindowInfo.of(SESSION, Optional.empty()); final WindowInfo window2 = WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(1000))); @@ -74,7 +74,7 @@ 
public void shouldImplementEquals() { @Test public void shouldImplementToString() { // Given: - final FormatInfo formatInfo = FormatInfo.of(AVRO, Optional.of("something")); + final FormatInfo formatInfo = FormatInfo.of(AVRO, Optional.of("something"), Optional.empty()); final WindowInfo windowInfo = WindowInfo.of(HOPPING, Optional.of(Duration.ofMillis(10101))); final KeyFormat keyFormat = KeyFormat.windowed(formatInfo, windowInfo); @@ -90,7 +90,7 @@ public void shouldImplementToString() { @Test public void shouldGetFormat() { // Given: - final FormatInfo format = FormatInfo.of(DELIMITED, Optional.empty()); + final FormatInfo format = FormatInfo.of(DELIMITED, Optional.empty(), Optional.empty()); final KeyFormat keyFormat = KeyFormat.nonWindowed(format); // When: @@ -103,7 +103,7 @@ public void shouldGetFormat() { @Test public void shouldGetFormatInfo() { // Given: - final FormatInfo format = FormatInfo.of(AVRO, Optional.of("something")); + final FormatInfo format = FormatInfo.of(AVRO, Optional.of("something"), Optional.empty()); final KeyFormat keyFormat = KeyFormat.nonWindowed(format); // When: @@ -116,7 +116,8 @@ public void shouldGetFormatInfo() { @Test public void shouldHandleNoneWindowedFunctionsForNonWindowed() { // Given: - final KeyFormat keyFormat = KeyFormat.nonWindowed(FormatInfo.of(JSON, Optional.empty())); + final KeyFormat keyFormat = KeyFormat.nonWindowed(FormatInfo.of(JSON, Optional.empty(), + Optional.empty())); // Then: assertThat(keyFormat.isWindowed(), is(false)); @@ -142,12 +143,13 @@ public void shouldHandleWindowedFunctionsForWindowed() { public void shouldHandleWindowedWithAvroSchemaName() { // Given: final KeyFormat keyFormat = KeyFormat.windowed( - FormatInfo.of(AVRO, Optional.of("something")), + FormatInfo.of(AVRO, Optional.of("something"), Optional.empty()), WindowInfo.of(HOPPING, Optional.of(Duration.ofMinutes(4))) ); // Then: - assertThat(keyFormat.getFormatInfo(), is(FormatInfo.of(AVRO, Optional.of("something")))); + 
assertThat(keyFormat.getFormatInfo(), is(FormatInfo.of(AVRO, Optional.of("something"), + Optional.empty()))); } @Test diff --git a/ksql-common/src/test/java/io/confluent/ksql/serde/ValueFormatTest.java b/ksql-common/src/test/java/io/confluent/ksql/serde/ValueFormatTest.java index 2b582b17f6cd..3e78a9f1ff2f 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/serde/ValueFormatTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/serde/ValueFormatTest.java @@ -28,7 +28,12 @@ public class ValueFormatTest { - private static final FormatInfo FORMAT_INFO = FormatInfo.of(AVRO, Optional.of("something")); + private static final FormatInfo FORMAT_INFO = + FormatInfo.of( + AVRO, + Optional.of("something"), + Optional.empty() + ); @Test public void shouldThrowNPEs() { @@ -44,7 +49,7 @@ public void shouldImplementEquals() { ValueFormat.of(FORMAT_INFO) ) .addEqualityGroup( - ValueFormat.of(FormatInfo.of(JSON, Optional.empty())) + ValueFormat.of(FormatInfo.of(JSON, Optional.empty(), Optional.empty())) ) .testEquals(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 1f9f29f84593..11e7ef26fc52 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -53,6 +53,7 @@ import io.confluent.ksql.schema.ksql.ColumnRef; import io.confluent.ksql.schema.ksql.FormatOptions; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; @@ -186,7 +187,8 @@ private void analyzeNonStdOutSink(final Sink sink) { final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of( getValueFormat(sink), - sink.getProperties().getValueAvroSchemaName() + sink.getProperties().getValueAvroSchemaName(), + getValueDelimiter(sink) )); final 
KsqlTopic intoKsqlTopic = new KsqlTopic( @@ -285,6 +287,20 @@ private Format getValueFormat(final Sink sink) { .getFormat()); } + private Optional getValueDelimiter(final Sink sink) { + if (sink.getProperties().getValueDelimiter().isPresent()) { + return sink.getProperties().getValueDelimiter(); + } + return analysis + .getFromDataSources() + .get(0) + .getDataSource() + .getKsqlTopic() + .getValueFormat() + .getFormatInfo() + .getDelimiter(); + } + @Override protected AstNode visitQuery( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicFactory.java index d23f67ab05b4..fd4d81614ee0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicFactory.java @@ -41,11 +41,12 @@ public static KsqlTopic create(final CreateSourceProperties properties) { .map(type -> KeyFormat .windowed(FormatInfo.of(Format.KAFKA), WindowInfo.of(type, windowSize))) .orElseGet(() -> KeyFormat - .nonWindowed(FormatInfo.of(Format.KAFKA, Optional.empty()))); + .nonWindowed(FormatInfo.of(Format.KAFKA))); final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of( properties.getValueFormat(), - properties.getValueAvroSchemaName() + properties.getValueAvroSchemaName(), + properties.getValueDelimiter() )); return new KsqlTopic( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java index 531d8e50a47a..3c1c9cacd0dc 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java @@ -253,7 +253,8 @@ public void shouldUseExplicitNamespaceForAvroSchema() { assertThat(analysis.getInto(), is(not(Optional.empty()))); assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat(), - 
is(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("com.custom.schema"))))); + is(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("com.custom.schema"), + Optional.empty())))); } @Test @@ -284,7 +285,7 @@ public void shouldUseExplicitNamespaceWhenFormatIsInheritedForAvro() { assertThat(analysis.getInto(), is(not(Optional.empty()))); assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat(), - is(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("org.ac.s1"))))); + is(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("org.ac.s1"), Optional.empty())))); } @Test @@ -296,7 +297,7 @@ public void shouldNotInheritNamespaceExplicitlySetUpstreamForAvro() { final KsqlTopic ksqlTopic = new KsqlTopic( "s0", KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), - ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("org.ac.s1"))), + ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("org.ac.s1"), Optional.empty())), false); final LogicalSchema schema = LogicalSchema.builder() @@ -370,7 +371,7 @@ public void shouldFailIfExplicitNamespaceIsProvidedButEmpty() { final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS); expectedException.expect(KsqlException.class); - expectedException.expectMessage("Schema name can not be empty"); + expectedException.expectMessage("Schema name cannot be empty"); analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index 2e953f90fac3..d1829686a8d2 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -669,7 +669,7 @@ public void shouldCreateSerdeToValidateValueFormatCanHandleValueSchemaForStream( // Then: verify(serdeFactory).create( - FormatInfo.of(JSON, 
Optional.empty()), + FormatInfo.of(JSON, Optional.empty(), Optional.empty()), PersistenceSchema.from(schema.valueConnectSchema(), false), ksqlConfig, serviceContext.getSchemaRegistryClientFactory(), @@ -712,7 +712,7 @@ public void shouldHandleValueAvroSchemaNameForStream() { // Then: assertThat(cmd.getTopic().getValueFormat(), - is(ValueFormat.of(FormatInfo.of(AVRO, Optional.of("full.schema.name"))))); + is(ValueFormat.of(FormatInfo.of(AVRO, Optional.of("full.schema.name"), Optional.empty())))); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 6e2e9b1f167b..c8a08fc8a97c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -655,7 +655,7 @@ public void shouldBuildCorrectSerde() { // Then: verify(keySerdeFactory).create( - FormatInfo.of(Format.KAFKA, Optional.empty()), + FormatInfo.of(Format.KAFKA, Optional.empty(), Optional.empty()), PersistenceSchema.from(SCHEMA.keyConnectSchema(), false), new KsqlConfig(ImmutableMap.of()), srClientFactory, @@ -664,7 +664,7 @@ public void shouldBuildCorrectSerde() { ); verify(valueSerdeFactory).create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), PersistenceSchema.from(SCHEMA.valueConnectSchema(), false), new KsqlConfig(ImmutableMap.of()), srClientFactory, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index 58b9f0103ffd..687ff76e0c6a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -528,7 +528,7 @@ private Serializer 
getSerializer( final PhysicalSchema schema ) { return GenericRowSerDe.from( - FormatInfo.of(format, Optional.empty()), + FormatInfo.of(format, Optional.empty(), Optional.empty()), schema.valueSchema(), new KsqlConfig(Collections.emptyMap()), serviceContext.get().getSchemaRegistryClientFactory(), @@ -542,7 +542,7 @@ private Deserializer getDeserializer( final PhysicalSchema schema ) { return GenericRowSerDe.from( - FormatInfo.of(format, Optional.empty()), + FormatInfo.of(format, Optional.empty(), Optional.empty()), schema.valueSchema(), new KsqlConfig(Collections.emptyMap()), serviceContext.get().getSchemaRegistryClientFactory(), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index 3533766ee9bb..095cb1686094 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -306,7 +306,8 @@ public void shouldBuildOutputNodeForInsertIntoAvroFromNonAvro() { // Given: givenInsertIntoNode(); - final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("name"))); + final ValueFormat valueFormat = ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("name"), + Optional.empty())); when(ksqlTopic.getValueFormat()).thenReturn(valueFormat); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java index c06027bac138..8c06d3d81bf4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java @@ -245,7 +245,7 @@ public void shouldThrowIfTopicDoesNotExist() { public void shouldNotThrowIfSchemaIsMissing() throws IOException, 
RestClientException { // Given: when(topic.getValueFormat()) - .thenReturn(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("foo")))); + .thenReturn(ValueFormat.of(FormatInfo.of(Format.AVRO, Optional.of("foo"), Optional.empty()))); doThrow(new RestClientException("Subject not found.", 404, 40401)) .when(registryClient).deleteSubject("something" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/TopicConsumer.java b/ksql-engine/src/test/java/io/confluent/ksql/util/TopicConsumer.java index 7d3fa8d29471..0003b476e001 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/TopicConsumer.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/TopicConsumer.java @@ -97,7 +97,7 @@ public Map readResults( final Deserializer keyDeserializer ) { final Deserializer deserializer = GenericRowSerDe.from( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), schema.valueSchema(), new KsqlConfig(ImmutableMap.of()), () -> null, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/TopicProducer.java b/ksql-engine/src/test/java/io/confluent/ksql/util/TopicProducer.java index 9c5603bdcda6..9ac711029611 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/TopicProducer.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/TopicProducer.java @@ -68,7 +68,7 @@ public Map produceInputData( ) throws InterruptedException, TimeoutException, ExecutionException { final Serializer serializer = GenericRowSerDe.from( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), schema.valueSchema(), new KsqlConfig(ImmutableMap.of()), () -> null, diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index cc822e556342..563b213c8263 100644 --- 
a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -166,6 +166,7 @@ static class Arguments { private final Supplier schemaFile; private final Format keyFormat; private final Format valueFormat; + private final Character valueDelimiter; private final String topicName; private final String keyName; private final int iterations; @@ -183,6 +184,7 @@ static class Arguments { final Supplier schemaFile, final Format keyFormat, final Format valueFormat, + final Character valueDelimiter, final String topicName, final String keyName, final int iterations, @@ -199,6 +201,7 @@ static class Arguments { this.schemaFile = schemaFile; this.keyFormat = keyFormat; this.valueFormat = valueFormat; + this.valueDelimiter = valueDelimiter; this.topicName = topicName; this.keyName = keyName; this.iterations = iterations; @@ -228,6 +231,8 @@ private static final class Builder { .put("value-format", (builder, arg) -> builder.valueFormat = parseFormat(arg)) // "format" is maintained for backwards compatibility, but should be removed later. 
.put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal)) + .put("value_delimiter", + (builder, argVal) -> builder.valueDelimiter = parseValueDelimiter(argVal)) .put("topic", (builder, argVal) -> builder.topicName = argVal) .put("key", (builder, argVal) -> builder.keyName = argVal) .put("iterations", (builder, argVal) -> builder.iterations = parseInt(argVal, 1)) @@ -248,6 +253,7 @@ private static final class Builder { private Supplier schemaFile; private Format keyFormat; private Format valueFormat; + private char valueDelimiter; private String topicName; private String keyName; private int iterations; @@ -265,6 +271,7 @@ private Builder() { schemaFile = null; keyFormat = Format.KAFKA; valueFormat = null; + valueDelimiter = ','; topicName = null; keyName = null; iterations = -1; @@ -327,6 +334,7 @@ Arguments build() { null, null, null, + null, 0, -1, null, @@ -360,6 +368,7 @@ Arguments build() { schemaFile, keyFormat, valueFormat, + valueDelimiter, topicName, keyName, iterations, @@ -463,6 +472,22 @@ private static Format parseFormat(final String formatString) { } } + private static Character parseValueDelimiter(final String valueDelimiterString) { + if (valueDelimiterString == null) { + return null; + } else { + if (!(valueDelimiterString.length() == 1 || valueDelimiterString.equals("TAB") + || valueDelimiterString.equals("SPACE"))) { + throw new ArgumentParseException(String.format( + "Invalid value_delimiter; was expecting a single character, 'TAB', or " + + "'SPACE', got '%s'", + valueDelimiterString + )); + } + return valueDelimiterString.charAt(0); + } + } + private static int parseNumThreads(final String numThreadsString) { try { final int result = Integer.valueOf(numThreadsString, 10); diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java index 56709d1e0b67..0d4cb911e550 100644 --- 
a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java @@ -75,6 +75,7 @@ public void shouldPassSchemaRegistryUrl() throws Exception { null, null, null, + null, "topic", "key", 0, @@ -89,4 +90,14 @@ public void shouldPassSchemaRegistryUrl() throws Exception { final Properties props = DataGen.getProperties(args); assertThat(props.getProperty(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY), equalTo("srUrl")); } + + @Test(expected = DataGen.Arguments.ArgumentParseException.class) + public void valueDelimiterCanOnlyBeSingleCharacter() throws Throwable { + DataGen.run( + "schema=./src/main/resources/purchase.avro", + "key=id", + "format=delimited", + "value_delimiter=@@", + "topic=foo"); + } } \ No newline at end of file diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java index 87f8f67e8fb7..69feb6baac4f 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java @@ -72,7 +72,7 @@ public class KsqlQueryBuilderTest { private static final QueryId QUERY_ID = new QueryId("fred"); private static final FormatInfo FORMAT_INFO = FormatInfo - .of(Format.AVRO, Optional.of("io.confluent.ksql")); + .of(Format.AVRO, Optional.of("io.confluent.ksql"), Optional.empty()); private static final WindowInfo WINDOW_INFO = WindowInfo .of(WindowType.TUMBLING, Optional.of(Duration.ofMillis(1000))); diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java index 9c1bf6d2ddf6..9f853c913edf 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java +++ 
b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestCaseBuilderUtil.java @@ -137,7 +137,7 @@ public static Map getTopicsByName( .map(recordNode -> new Topic( recordNode.topicName(), Optional.empty(), - getKeySedeSupplier(recordNode.getWindow()), + getKeySerdeSupplier(recordNode.getWindow()), defaultValueSerdeSupplier, 4, 1, @@ -222,15 +222,18 @@ private static Topic createTopicFromStatement( } } - private static SerdeSupplier getKeySedeSupplier(final Optional windowDataInfo) { + private static SerdeSupplier getKeySerdeSupplier(final Optional windowDataInfo) { if (windowDataInfo.isPresent()) { final WindowData windowData = windowDataInfo.get(); final WindowType windowType = WindowType.of((windowData.type == Type.SESSION) ? WindowType.SESSION.name() : WindowType.TUMBLING.name()); final KeyFormat windowKeyFormat = KeyFormat.windowed( - Format.KAFKA, - Optional.empty(), + FormatInfo.of( + Format.KAFKA, + Optional.empty(), + Optional.empty() + ), WindowInfo.of( windowType, windowType == WindowType.SESSION diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json new file mode 100644 index 000000000000..06571d645a4a --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/delimited.json @@ -0,0 +1,174 @@ +{ + "comments": [ + "When using value_format DELIMITED, we can define VALUE_DELIMITER as custom character." 
+ ], + "tests": [ + { + "name": "select delimited value_format", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM S2 as SELECT id, name, value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0,zero,0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0,100,100", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100,100,500", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100,100,100", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0,zero,0", "timestamp": 0}, + {"topic": "S2", "key": 0, "value": "0,100,100", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100,100,500", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100,100,100", "timestamp": 0} + ] + }, + { + "name": "validate value_delimiter to be single character", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='<~>');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Failed to prepare statement: Configuration VALUE_DELIMITER is invalid: Invalid delimiter value: '<~>'. 
Delimiter must be a single character or TAB,SPACE\nExample valid value: ';'" + }, + "inputs": [], + "outputs": [] + }, + { + "name": "validate delimiter is not empty", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='');" + ], + "topics": [ + { + "name": "test_topic", + "format": "DELIMITED" + } + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Failed to prepare statement: Configuration VALUE_DELIMITER is invalid: Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please use the special values 'TAB' or 'SPACE'\nExample valid value: ';'" + }, + "inputs": [], + "outputs": [] + }, + { + "name": "validate delimiter is not a space", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=' ');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Failed to prepare statement: Configuration VALUE_DELIMITER is invalid: Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please use the special values 'TAB' or 'SPACE'\nExample valid value: ';'" + }, + "inputs": [], + "outputs": [] + }, + { + "name": "validate delimiter is not a tab character", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='\t');" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Failed to prepare statement: Configuration VALUE_DELIMITER is invalid: Delimiter cannot be empty, if you meant to have a tab or space for delimiter, please use the special values 'TAB' or 'SPACE'\nExample valid value: ';'" + }, + "inputs": [], + "outputs": [] + }, + { + "name": "select delimited value_format with pipe separated values - should take source delimiter for sink", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, 
VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='|');", + "CREATE STREAM S2 as SELECT id, name, value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0|zero|0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0|100|100", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100|100|500", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100|100|100", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0|zero|0", "timestamp": 0}, + {"topic": "S2", "key": 0, "value": "0|100|100", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100|100|500", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100|100|100", "timestamp": 0} + ] + }, + { + "name": "select delimited value_format with $ separated values using custom delimiter character", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='|');", + "CREATE STREAM S2 WITH(value_delimiter='$') AS SELECT * FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0|zero|0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0|100|100", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100|100|500", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100|100|100", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0$zero$0", "timestamp": 0}, + {"topic": "S2", "key": 0, "value": "0$100$100", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100$100$500", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100$100$100", "timestamp": 0} + ] + }, + { + "name": "select delimited value_format with SPACE separated values using custom delimiter character", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', 
value_delimiter='SPACE');", + "CREATE STREAM S2 as SELECT id, name, value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0 zero 0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0 100 100", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100 100 500", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100 100 100", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0 zero 0", "timestamp": 0}, + {"topic": "S2", "key": 0, "value": "0 100 100", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100 100 500", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100 100 100", "timestamp": 0} + ] + }, + { + "name": "select delimited value_format with TAB separated values using custom delimiter character", + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='TAB');", + "CREATE STREAM S2 as SELECT id, name, value FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "key": 0, "value": "0\tzero\t0", "timestamp": 0}, + {"topic": "test_topic", "key": 0, "value": "0\t100\t100", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100\t100\t500", "timestamp": 0}, + {"topic": "test_topic", "key": 100, "value": "100\t100\t100", "timestamp": 0} + ], + "outputs": [ + {"topic": "S2", "key": 0, "value": "0\tzero\t0", "timestamp": 0}, + {"topic": "S2", "key": 0, "value": "0\t100\t100", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100\t100\t500", "timestamp": 0}, + {"topic": "S2", "key": 100, "value": "100\t100\t100", "timestamp": 0} + ] + }, + { + "name": "validate cannot specify delimiter with json format", + "statements": [ + "CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');" + ], + "topics": [ + { + "name": "test_topic", + "format": "JSON" + } + ], + "expectedException": { + "type": 
"io.confluent.ksql.util.KsqlStatementException", + "message": "Delimeter only supported with DELIMITED format" + }, + "inputs": [], + "outputs": [] + } + ] +} \ No newline at end of file diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java index 293d643f4b20..2691d4a79b16 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceAsProperties.java @@ -22,6 +22,7 @@ import io.confluent.ksql.execution.expression.tree.StringLiteral; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.properties.with.CreateAsConfigs; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.KsqlException; import java.util.Map; @@ -95,6 +96,11 @@ public Optional getWrapSingleValues() { return Optional.ofNullable(props.getBoolean(CommonCreateConfigs.WRAP_SINGLE_VALUE)); } + public Optional getValueDelimiter() { + final String val = props.getString(CommonCreateConfigs.VALUE_DELIMITER_PROPERTY); + return val == null ? 
Optional.empty() : Optional.of(Delimiter.of(val)); + } + public CreateSourceAsProperties withTopic( final String name, final int partitions, diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java index 70400d74b79a..d074f62cd0fb 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/CreateSourceProperties.java @@ -23,6 +23,7 @@ import io.confluent.ksql.parser.DurationParser; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.properties.with.CreateConfigs; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.KsqlException; import java.time.Duration; @@ -130,6 +131,11 @@ public Optional getWrapSingleValues() { return Optional.ofNullable(props.getBoolean(CommonCreateConfigs.WRAP_SINGLE_VALUE)); } + public Optional getValueDelimiter() { + final String val = props.getString(CommonCreateConfigs.VALUE_DELIMITER_PROPERTY); + return val == null ? 
Optional.empty() : Optional.of(Delimiter.of(val)); + } + public CreateSourceProperties withSchemaId(final int id) { final Map originals = props.copyOfOriginalLiterals(); originals.put(CreateConfigs.AVRO_SCHEMA_ID, new IntegerLiteral(id)); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/PropertiesConfig.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/PropertiesConfig.java index f016a9888b55..b18344e1701b 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/PropertiesConfig.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/properties/with/PropertiesConfig.java @@ -117,4 +117,5 @@ private static void throwOnUnknownProperty( + String.join(",", onlyInProvided)); } } + } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/KsqlSerdeFactories.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/KsqlSerdeFactories.java index 448243c067c3..624b461d7c27 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/KsqlSerdeFactories.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/KsqlSerdeFactories.java @@ -71,7 +71,7 @@ static KsqlSerdeFactory create(final FormatInfo format) { return new KsqlJsonSerdeFactory(); case DELIMITED: - return new KsqlDelimitedSerdeFactory(); + return new KsqlDelimitedSerdeFactory(format.getDelimiter()); case KAFKA: return new KafkaSerdeFactory(); diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializer.java index fe18293111aa..011d73cd1659 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializer.java @@ -48,13 +48,15 @@ public class KsqlDelimitedDeserializer implements Deserializer { ); private final ConnectSchema schema; + private final CSVFormat csvFormat; 
KsqlDelimitedDeserializer( - final PersistenceSchema schema + final PersistenceSchema schema, + final CSVFormat csvFormat ) { this.schema = Objects.requireNonNull(schema, "schema").serializedSchema(); - throwOnUnsupported(this.schema); + this.csvFormat = Objects.requireNonNull(csvFormat, "csvFormat"); } @Override @@ -69,7 +71,7 @@ public Struct deserialize(final String topic, final byte[] bytes) { try { final String recordCsvString = new String(bytes, StandardCharsets.UTF_8); - final List csvRecords = CSVParser.parse(recordCsvString, CSVFormat.DEFAULT) + final List csvRecords = CSVParser.parse(recordCsvString, csvFormat) .getRecords(); if (csvRecords.isEmpty()) { diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactory.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactory.java index 2e72cc84b495..5310081e129b 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactory.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactory.java @@ -19,12 +19,15 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.schema.connect.SchemaWalker; import io.confluent.ksql.schema.ksql.PersistenceSchema; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.KsqlSerdeFactory; import io.confluent.ksql.util.DecimalUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Optional; import java.util.function.Supplier; +import org.apache.commons.csv.CSVFormat; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.connect.data.ConnectSchema; @@ -35,6 +38,15 @@ @Immutable public class KsqlDelimitedSerdeFactory implements KsqlSerdeFactory { + private static final Delimiter DEFAULT_DELIMITER = Delimiter.of(','); + + 
private final CSVFormat csvFormat; + + public KsqlDelimitedSerdeFactory(final Optional delimiter) { + this.csvFormat = + CSVFormat.DEFAULT.withDelimiter(delimiter.orElse(DEFAULT_DELIMITER).getDelimiter()); + } + @Override public void validate(final PersistenceSchema schema) { final ConnectSchema connectSchema = schema.serializedSchema(); @@ -54,8 +66,8 @@ public Serde createSerde( validate(schema); return Serdes.serdeFrom( - new KsqlDelimitedSerializer(), - new KsqlDelimitedDeserializer(schema) + new KsqlDelimitedSerializer(csvFormat), + new KsqlDelimitedDeserializer(schema, csvFormat) ); } diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializer.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializer.java index cd5506af144e..d38040a43936 100644 --- a/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializer.java +++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializer.java @@ -34,6 +34,12 @@ public class KsqlDelimitedSerializer implements Serializer { + private final CSVFormat csvFormat; + + public KsqlDelimitedSerializer(final CSVFormat csvFormat) { + this.csvFormat = csvFormat; + } + @Override public void configure(final Map map, final boolean b) { } @@ -50,7 +56,7 @@ public byte[] serialize(final String topic, final Object data) { } final StringWriter stringWriter = new StringWriter(); - final CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSVFormat.DEFAULT); + final CSVPrinter csvPrinter = new CSVPrinter(stringWriter, csvFormat); csvPrinter.printRecord(() -> new FieldIterator((Struct)data)); final String result = stringWriter.toString(); return result.substring(0, result.length() - 2).getBytes(StandardCharsets.UTF_8); diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java index 5662e1b96bd0..0875125a08cb 100644 --- 
a/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/GenericRowSerDeTest.java @@ -123,7 +123,7 @@ public void setUp() { public void shouldGetStructSerdeOnConstruction() { // When: valueSerde.create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), MUTLI_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -133,7 +133,7 @@ public void shouldGetStructSerdeOnConstruction() { // Then: verify(serdesFactories).create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), MUTLI_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -145,7 +145,7 @@ public void shouldGetStructSerdeOnConstruction() { public void shouldGetStringSerdeOnConstruction() { // When: valueSerde.create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), UNWRAPPED_SINGLE_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -155,7 +155,7 @@ public void shouldGetStringSerdeOnConstruction() { // Then: verify(serdesFactories).create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), UNWRAPPED_SINGLE_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -170,7 +170,7 @@ public void shouldThrowOnNullStructSerde() { // When: valueSerde.create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), MUTLI_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -183,7 +183,7 @@ public void shouldThrowOnNullStructSerde() { public void shouldThrowOnNullSchema() { // When: GenericRowSerDe.from( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), null, ksqlConfig, srClientFactory, @@ -196,7 +196,7 @@ public void shouldThrowOnNullSchema() { public void shouldCreateProcessingLoggerWithCorrectName() { // When: 
GenericRowSerDe.from( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), MUTLI_FIELD_SCHEMA, ksqlConfig, srClientFactory, @@ -494,7 +494,7 @@ public void shouldDeserializeNullUnwrappedSingleFieldGenericRow() { private Serde givenSerdeForSchema(final PersistenceSchema schema) { return valueSerde.create( - FormatInfo.of(Format.JSON, Optional.empty()), + FormatInfo.of(Format.JSON, Optional.empty(), Optional.empty()), schema, ksqlConfig, srClientFactory, diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/KsqlSerdeFactoriesTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/KsqlSerdeFactoriesTest.java index 0b66eb33dcc3..16cbae5c996f 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/KsqlSerdeFactoriesTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/KsqlSerdeFactoriesTest.java @@ -136,7 +136,7 @@ public void shouldCreateSerde() { public void shouldHandleAvro() { // When: final KsqlSerdeFactory result = KsqlSerdeFactories - .create(FormatInfo.of(AVRO, Optional.empty())); + .create(FormatInfo.of(AVRO, Optional.empty(), Optional.empty())); // Then: assertThat(result, instanceOf(KsqlAvroSerdeFactory.class)); @@ -146,7 +146,7 @@ public void shouldHandleAvro() { public void shouldHandleJson() { // When: final KsqlSerdeFactory result = KsqlSerdeFactories - .create(FormatInfo.of(JSON, Optional.empty())); + .create(FormatInfo.of(JSON, Optional.empty(), Optional.empty())); // Then: assertThat(result, instanceOf(KsqlJsonSerdeFactory.class)); @@ -156,7 +156,7 @@ public void shouldHandleJson() { public void shouldHandleDelimited() { // When: final KsqlSerdeFactory result = KsqlSerdeFactories - .create(FormatInfo.of(DELIMITED, Optional.empty())); + .create(FormatInfo.of(DELIMITED, Optional.empty(), Optional.empty())); // Then: assertThat(result, instanceOf(KsqlDelimitedSerdeFactory.class)); @@ -166,7 +166,7 @@ public void shouldHandleDelimited() { public void 
shouldHandleKafka() { // When: final KsqlSerdeFactory result = KsqlSerdeFactories - .create(FormatInfo.of(KAFKA, Optional.empty())); + .create(FormatInfo.of(KAFKA, Optional.empty(), Optional.empty())); // Then: assertThat(result, instanceOf(KafkaSerdeFactory.class)); diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializerTest.java index 3c9b2b6f4bc4..e71fc30a12c8 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedDeserializerTest.java @@ -26,6 +26,7 @@ import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import org.apache.commons.csv.CSVFormat; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; @@ -58,8 +59,8 @@ public class KsqlDelimitedDeserializerTest { private KsqlDelimitedDeserializer deserializer; @Before - public void before() { - deserializer = new KsqlDelimitedDeserializer(ORDER_SCHEMA); + public void setUp() { + deserializer = new KsqlDelimitedDeserializer(ORDER_SCHEMA, CSVFormat.DEFAULT); } @Test @@ -139,7 +140,7 @@ public void shouldThrowIfTopLevelNotStruct() { expectedException.expectMessage("DELIMITED expects all top level schemas to be STRUCTs"); // When: - new KsqlDelimitedDeserializer(schema); + new KsqlDelimitedDeserializer(schema, CSVFormat.DEFAULT.withDelimiter(',')); } @Test @@ -151,7 +152,8 @@ public void shouldDeserializedTopLevelPrimitiveTypeIfSchemaHasOnlySingleField() .build() ); - final KsqlDelimitedDeserializer deserializer = new KsqlDelimitedDeserializer(schema); + final KsqlDelimitedDeserializer deserializer = + createDeserializer(schema); final byte[] bytes = "10".getBytes(StandardCharsets.UTF_8); 
@@ -170,7 +172,8 @@ public void shouldDeserializeDecimal() { .field("cost", DecimalUtil.builder(4, 2)) .build() ); - final KsqlDelimitedDeserializer deserializer = new KsqlDelimitedDeserializer(schema); + final KsqlDelimitedDeserializer deserializer = + createDeserializer(schema); final byte[] bytes = "01.12".getBytes(StandardCharsets.UTF_8); @@ -189,7 +192,8 @@ public void shouldDeserializeDecimalWithoutLeadingZeros() { .field("cost", DecimalUtil.builder(4, 2)) .build() ); - final KsqlDelimitedDeserializer deserializer = new KsqlDelimitedDeserializer(schema); + final KsqlDelimitedDeserializer deserializer = + createDeserializer(schema); final byte[] bytes = "1.12".getBytes(StandardCharsets.UTF_8); @@ -200,6 +204,35 @@ public void shouldDeserializeDecimalWithoutLeadingZeros() { assertThat(result.get("cost"), is(new BigDecimal("01.12"))); } + @Test + public void shouldDeserializeDelimitedCorrectlyWithTabDelimiter() { + shouldDeserializeDelimitedCorrectlyWithNonDefaultDelimiter('\t'); + } + + @Test + public void shouldDeserializeDelimitedCorrectlyWithBarDelimiter() { + shouldDeserializeDelimitedCorrectlyWithNonDefaultDelimiter('|'); + } + + private void shouldDeserializeDelimitedCorrectlyWithNonDefaultDelimiter(char delimiter) { + // Given: + final byte[] bytes = ("1511897796092" + delimiter + "1" + delimiter + "item_1" + delimiter + "10.0" + delimiter + "10.10\r\n").getBytes(StandardCharsets.UTF_8); + + final KsqlDelimitedDeserializer deserializer = + new KsqlDelimitedDeserializer(ORDER_SCHEMA, CSVFormat.DEFAULT.withDelimiter(delimiter)); + + // When: + final Struct struct = deserializer.deserialize("", bytes); + + // Then: + assertThat(struct.schema(), is(ORDER_SCHEMA.serializedSchema())); + assertThat(struct.get("ORDERTIME"), is(1511897796092L)); + assertThat(struct.get("ORDERID"), is(1L)); + assertThat(struct.get("ITEMID"), is("item_1")); + assertThat(struct.get("ORDERUNITS"), is(10.0)); + assertThat(struct.get("COST"), is(new BigDecimal("10.10"))); + } + @Test + public void 
shouldThrowOnDeserializedTopLevelPrimitiveWhenSchemaHasMoreThanOneField() { // Given: @@ -210,7 +243,8 @@ public void shouldThrowOnDeserializedTopLevelPrimitiveWhenSchemaHasMoreThanOneFi .build() ); - final KsqlDelimitedDeserializer deserializer = new KsqlDelimitedDeserializer(schema); + final KsqlDelimitedDeserializer deserializer = + createDeserializer(schema); final byte[] bytes = "10".getBytes(StandardCharsets.UTF_8); @@ -241,7 +275,7 @@ public void shouldThrowOnArrayTypes() { expectedException.expectMessage("DELIMITED does not support type: ARRAY, field: ids"); // When: - new KsqlDelimitedDeserializer(schema); + createDeserializer(schema); } @Test @@ -261,7 +295,7 @@ public void shouldThrowOnMapTypes() { expectedException.expectMessage("DELIMITED does not support type: MAP, field: ids"); // When: - new KsqlDelimitedDeserializer(schema); + createDeserializer(schema); } @Test @@ -282,10 +316,17 @@ public void shouldThrowOnStructTypes() { expectedException.expectMessage("DELIMITED does not support type: STRUCT, field: ids"); // When: - new KsqlDelimitedDeserializer(schema); + createDeserializer(schema); } + private static PersistenceSchema persistenceSchema(final Schema connectSchema) { return PersistenceSchema.from((ConnectSchema) connectSchema, false); } + + private static KsqlDelimitedDeserializer createDeserializer(PersistenceSchema schema) { + return new KsqlDelimitedDeserializer(schema, CSVFormat.DEFAULT.withDelimiter(',')); + } + + } diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactoryTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactoryTest.java index 5a3c62da4e23..2f7dd99c59ba 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactoryTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerdeFactoryTest.java @@ -21,8 +21,10 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import 
io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Delimiter; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.KsqlException; +import java.util.Optional; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -37,7 +39,7 @@ public class KsqlDelimitedSerdeFactoryTest { @Before public void setUp() { - factory = new KsqlDelimitedSerdeFactory(); + factory = new KsqlDelimitedSerdeFactory(Optional.of(Delimiter.of(','))); } @Test diff --git a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java index 8cfabb4b3717..1cff6ef577f5 100644 --- a/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java +++ b/ksql-serde/src/test/java/io/confluent/ksql/serde/delimited/KsqlDelimitedSerializerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.commons.csv.CSVFormat; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -50,7 +51,7 @@ public class KsqlDelimitedSerializerTest { @Before public void setUp() { - serializer = new KsqlDelimitedSerializer(); + serializer = new KsqlDelimitedSerializer(CSVFormat.DEFAULT.withDelimiter(',')); } @Test @@ -104,8 +105,6 @@ public void shouldSerializedTopLevelPrimitiveIfValueHasOneField() { .field("id", Schema.OPTIONAL_INT64_SCHEMA) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", 10L); @@ -123,8 +122,6 @@ public void shouldSerializeDecimal() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", new 
BigDecimal("11.12")); @@ -142,8 +139,6 @@ public void shouldSerializeDecimalWithPaddedZeros() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", new BigDecimal("1.12")); @@ -161,8 +156,6 @@ public void shouldSerializeZeroDecimalWithPaddedZeros() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", BigDecimal.ZERO); @@ -180,8 +173,6 @@ public void shouldSerializeOneHalfDecimalWithPaddedZeros() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", new BigDecimal(0.5)); @@ -199,8 +190,6 @@ public void shouldSerializeNegativeOneHalfDecimalWithPaddedZeros() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", new BigDecimal(-0.5)); @@ -218,8 +207,6 @@ public void shouldSerializeNegativeDecimalWithPaddedZeros() { .field("id", DecimalUtil.builder(4, 2).build()) .build(); - final Serializer serializer = new KsqlDelimitedSerializer(); - final Struct value = new Struct(schema) .put("id", new BigDecimal("-1.12")); @@ -230,6 +217,36 @@ public void shouldSerializeNegativeDecimalWithPaddedZeros() { assertThat(new String(bytes, StandardCharsets.UTF_8), is("\"-01.12\"")); } + @Test + public void shouldSerializeRowCorrectlyWithTabDelimiter() { + shouldSerializeRowCorrectlyWithNonDefaultDelimiter('\t'); + } + + @Test + public void shouldSerializeRowCorrectlyWithBarDelimiter() { + shouldSerializeRowCorrectlyWithNonDefaultDelimiter('|'); + } + + private void shouldSerializeRowCorrectlyWithNonDefaultDelimiter(final char delimiter) { + // Given: + final Struct data = new Struct(SCHEMA) + 
.put("ORDERTIME", 1511897796092L) + .put("ORDERID", 1L) + .put("ITEMID", "item_1") + .put("ORDERUNITS", 10.0); + + final KsqlDelimitedSerializer serializer = + new KsqlDelimitedSerializer(CSVFormat.DEFAULT.withDelimiter(delimiter)); + + // When: + final byte[] bytes = serializer.serialize("t1", data); + + // Then: + final String delimitedString = new String(bytes, StandardCharsets.UTF_8); + assertThat(delimitedString, equalTo( + "1511897796092" + delimiter + "1" + delimiter + "item_1" + delimiter + "10.0")); + } + @Test public void shouldThrowOnArrayField() { // Given: @@ -296,4 +313,5 @@ public void shouldThrowOnStructField() { // When: serializer.serialize("t1", data); } + }