Skip to content

Commit

Permalink
Merge branch 'master' into command-topic-deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Sep 22, 2020
2 parents 7f1cbfa + 4b81912 commit f63c88c
Show file tree
Hide file tree
Showing 168 changed files with 3,660 additions and 2,034 deletions.
5 changes: 0 additions & 5 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ bootstrap.servers=localhost:9092
# Enable snappy compression for the Kafka producers
compression.type=snappy

# By default, wait up to 500 millis to allow a second poll to Kafka if one side of a join is
# exhausted. This improves join predictability.
# Increasing this setting improves join predictability at the cost of increased end-to-end latency.
# ksql.streams.max.task.idle.ms=?

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties
# ksql.connect.configs.topic=ksql-connect-configs
Expand Down
5 changes: 0 additions & 5 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ bootstrap.servers=localhost:9092
# Enable snappy compression for the Kafka producers
compression.type=snappy

# By default, wait up to 500 millis to allow a second poll to Kafka if one side of a join is
# exhausted. This improves join predictability.
# Increasing this setting improves join predictability at the cost of increased end-to-end latency.
# ksql.streams.max.task.idle.ms=?

#------ Connect -------

# uncomment the below to start an embedded Connect worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@
import io.confluent.ksql.api.client.InsertAck;
import io.confluent.ksql.api.client.InsertsPublisher;
import io.confluent.ksql.api.client.KsqlArray;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.SourceDescription;
import io.confluent.ksql.api.client.StreamInfo;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.TableInfo;
import io.confluent.ksql.api.client.TopicInfo;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber;
import io.confluent.ksql.api.client.util.RowUtil;
import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -908,14 +908,13 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.timestampColumn(), is(Optional.empty()));
assertThat(description.windowType(), is(Optional.empty()));
assertThat(description.sqlStatement(), is(
"CREATE STREAM " + TEST_STREAM + " (`STR` STRING KEY, `LONG` BIGINT, "
+ "`DEC` DECIMAL(4, 2), `ARRAY` ARRAY<STRING>, `MAP` MAP<STRING, STRING>, "
+ "`STRUCT` STRUCT<`F1` INTEGER>, `COMPLEX` STRUCT<`DECIMAL` DECIMAL(2, 1), "
+ "`STRUCT` STRUCT<`F1` STRING, `F2` INTEGER>, `ARRAY_ARRAY` ARRAY<ARRAY<STRING>>, "
+ "`ARRAY_STRUCT` ARRAY<STRUCT<`F1` STRING>>, `ARRAY_MAP` ARRAY<MAP<STRING, INTEGER>>, "
+ "`MAP_ARRAY` MAP<STRING, ARRAY<STRING>>, `MAP_MAP` MAP<STRING, MAP<STRING, INTEGER>>, "
+ "`MAP_STRUCT` MAP<STRING, STRUCT<`F1` STRING>>>) WITH "
+ "(kafka_topic='" + TEST_TOPIC + "', value_format='json');"));
"CREATE STREAM " + TEST_STREAM + " (STR STRING KEY, LONG BIGINT, DEC DECIMAL(4, 2), "
+ "ARRAY ARRAY<STRING>, MAP MAP<STRING, STRING>, STRUCT STRUCT<F1 INTEGER>, "
+ "COMPLEX STRUCT<`DECIMAL` DECIMAL(2, 1), STRUCT STRUCT<F1 STRING, F2 INTEGER>, "
+ "ARRAY_ARRAY ARRAY<ARRAY<STRING>>, ARRAY_STRUCT ARRAY<STRUCT<F1 STRING>>, "
+ "ARRAY_MAP ARRAY<MAP<STRING, INTEGER>>, MAP_ARRAY MAP<STRING, ARRAY<STRING>>, "
+ "MAP_MAP MAP<STRING, MAP<STRING, INTEGER>>, MAP_STRUCT MAP<STRING, STRUCT<F1 STRING>>>) "
+ "WITH (KAFKA_TOPIC='STRUCTURED_TYPES_TOPIC', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.config;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.testing.EffectivelyImmutable;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

/**
* Combination of the system config + session overrides provided by the user.
*/
@Immutable
public final class SessionConfig {

@EffectivelyImmutable // The map stores primitive types only.
private final ImmutableMap<String, Object> overrides;
private final KsqlConfig systemConfig;

/**
* Create an instance.
*
* @param systemConfig the system config the server started up with.
* @param overrides the overrides that came as part of the users request.
* @return the session config
*/
public static SessionConfig of(final KsqlConfig systemConfig, final Map<String, ?> overrides) {
return new SessionConfig(systemConfig, overrides);
}

private SessionConfig(final KsqlConfig systemConfig, final Map<String, ?> overrides) {
this.systemConfig = Objects.requireNonNull(systemConfig, "systemConfig");
this.overrides = ImmutableMap.copyOf(Objects.requireNonNull(overrides, "overrides"));
}

/**
* Get the `KsqlConfig` instance, either with or without the overrides applied.
*
* <p>Calling with {@code withOverridesApplied} of {@code true} will return the system config
* with the user's overrides applied. This is the more common case. This can be useful when the
* user should be able to influence how code executes by using `SET` to provide config overrides.
*
* <p>Calling with {@code withOverridesApplied} of {@code false} will return the system config
* without any overrides. This can be useful if the users shouldn't be able to supply overrides to
* change the behaviour of the code you're passing the config to.
*
* @param withOverridesApplied flag to indicate if the user supplied overrides should be applied.
* @return the config.
*
*/
public KsqlConfig getConfig(final boolean withOverridesApplied) {
return withOverridesApplied
? systemConfig.cloneWithPropertyOverwrite(overrides)
: systemConfig;
}

/**
* @return User supplied overrides, i.e. those provided in the request to the server.
*/
public Map<String, Object> getOverrides() {
return overrides;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SessionConfig that = (SessionConfig) o;
return Objects.equals(overrides, that.overrides)
&& Objects.equals(systemConfig, that.systemConfig);
}

@Override
public int hashCode() {
return Objects.hash(overrides, systemConfig);
}

@Override
public String toString() {
return "SessionConfig{"
+ "overrides=" + overrides
+ ", systemConfig=" + systemConfig
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.confluent.ksql.configdef.ConfigValidators;
import io.confluent.ksql.serde.Delimiter;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
Expand All @@ -38,14 +40,15 @@ public final class CommonCreateConfigs {
// Persistence Props:
public static final String VALUE_AVRO_SCHEMA_FULL_NAME = "VALUE_AVRO_SCHEMA_FULL_NAME";
public static final String VALUE_FORMAT_PROPERTY = "VALUE_FORMAT";
public static final String KEY_FORMAT_PROPERTY = "KEY_FORMAT";
public static final String FORMAT_PROPERTY = "FORMAT";
public static final String WRAP_SINGLE_VALUE = "WRAP_SINGLE_VALUE";

public static final String VALUE_DELIMITER_PROPERTY = "VALUE_DELIMITER";

public static void addToConfigDef(
final ConfigDef configDef,
final boolean topicNameRequired,
final boolean valueFormatRequired
final boolean topicNameRequired
) {
configDef
.define(
Expand Down Expand Up @@ -79,7 +82,7 @@ public static void addToConfigDef(
.define(
VALUE_FORMAT_PROPERTY,
ConfigDef.Type.STRING,
valueFormatRequired ? ConfigDef.NO_DEFAULT_VALUE : null,
null,
Importance.HIGH,
"The format of the serialized value"
)
Expand Down Expand Up @@ -129,7 +132,40 @@ public static void addToConfigDef(
"The delimiter to use when VALUE_FORMAT='DELIMITED'. Supports single "
+ "character to be a delimiter, defaults to ','. For space and tab delimited values "
+ "you must use the special values 'SPACE' or 'TAB', not an actual space or tab "
+ "character.");
+ "character.")
.define(
KEY_FORMAT_PROPERTY,
ConfigDef.Type.STRING,
null,
Importance.HIGH,
"The format of the serialized key"
)
.define(
FORMAT_PROPERTY,
ConfigDef.Type.STRING,
null,
Importance.HIGH,
"The format of the serialized key and value");
}

/**
 * Validates that the shorthand 'FORMAT' property is not combined with 'KEY_FORMAT'
 * or 'VALUE_FORMAT'.
 *
 * <p>'FORMAT' sets both the key and value formats, so supplying it alongside either
 * individual format property would be ambiguous and is rejected.
 *
 * @param configs the resolved CREATE statement properties.
 * @throws KsqlException if 'FORMAT' is present together with 'KEY_FORMAT' or 'VALUE_FORMAT'.
 */
public static void validateKeyValueFormats(final Map<String, Object> configs) {
  if (configs.get(FORMAT_PROPERTY) == null) {
    // Shorthand not used; the individual format properties cannot clash.
    return;
  }

  // Both clash checks share an identical message template; check key first, then
  // value, preserving the original error precedence.
  for (final String clashing : new String[]{KEY_FORMAT_PROPERTY, VALUE_FORMAT_PROPERTY}) {
    if (configs.get(clashing) != null) {
      throw new KsqlException("Cannot supply both '" + clashing + "' and '"
          + FORMAT_PROPERTY + "' properties, as '" + FORMAT_PROPERTY + "' sets both key and value "
          + "formats. Either use just '" + FORMAT_PROPERTY + "', or use '" + KEY_FORMAT_PROPERTY
          + "' and '" + VALUE_FORMAT_PROPERTY + "'.");
    }
  }
}

private CommonCreateConfigs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public final class CreateAsConfigs {
private static final ConfigDef CONFIG_DEF = new ConfigDef();

static {
CommonCreateConfigs.addToConfigDef(CONFIG_DEF, false, false);
CommonCreateConfigs.addToConfigDef(CONFIG_DEF, false);
}

public static final ConfigMetaData CONFIG_METADATA = ConfigMetaData.of(CONFIG_DEF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public final class CreateConfigs {
);

static {
CommonCreateConfigs.addToConfigDef(CONFIG_DEF, true, true);
CommonCreateConfigs.addToConfigDef(CONFIG_DEF, true);
}

public static final ConfigMetaData CONFIG_METADATA = ConfigMetaData.of(CONFIG_DEF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.OutputRefinement;

import java.util.Objects;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum SerdeOption {
*
* @see SerdeOption#UNWRAP_SINGLE_VALUES
*/
WRAP_SINGLE_VALUES(SerdeFeature.WRAP_SINGLES),
WRAP_SINGLE_VALUES(SerdeFeature.WRAP_SINGLES, false),

/**
* If the value schema contains only a single field, persist it as an anonymous value.
Expand All @@ -38,19 +38,25 @@ public enum SerdeOption {
*
* @see SerdeOption#WRAP_SINGLE_VALUES
*/
UNWRAP_SINGLE_VALUES(SerdeFeature.UNWRAP_SINGLES),
UNWRAP_SINGLE_VALUES(SerdeFeature.UNWRAP_SINGLES, false),

/**
* Key version of {@link #UNWRAP_SINGLE_VALUES}.
*/
UNWRAP_SINGLE_KEYS(SerdeFeature.UNWRAP_SINGLES);
UNWRAP_SINGLE_KEYS(SerdeFeature.UNWRAP_SINGLES, true);

private final boolean isKey;
private final SerdeFeature requiredFeature;

SerdeOption(final SerdeFeature requiredFeature) {
SerdeOption(final SerdeFeature requiredFeature, final boolean isKey) {
this.isKey = isKey;
this.requiredFeature = Objects.requireNonNull(requiredFeature, "requiredFeature");
}

public boolean isKeyOption() {
return isKey;
}

public SerdeFeature requiredFeature() {
return requiredFeature;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.serde;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.Immutable;
import java.util.EnumSet;
Expand All @@ -33,11 +32,11 @@
@Immutable
public final class SerdeOptions {

private static final ImmutableSet<SerdeOption> KEY_WRAPPING_OPTIONS = ImmutableSet.of(
public static final ImmutableSet<SerdeOption> KEY_WRAPPING_OPTIONS = ImmutableSet.of(
SerdeOption.UNWRAP_SINGLE_KEYS
);

private static final ImmutableSet<SerdeOption> VALUE_WRAPPING_OPTIONS = ImmutableSet.of(
public static final ImmutableSet<SerdeOption> VALUE_WRAPPING_OPTIONS = ImmutableSet.of(
SerdeOption.WRAP_SINGLE_VALUES, SerdeOption.UNWRAP_SINGLE_VALUES
);

Expand All @@ -60,33 +59,22 @@ private SerdeOptions(final ImmutableSet<SerdeOption> options) {
validate(this.options);
}

@SuppressWarnings("MethodMayBeStatic")
public EnabledSerdeFeatures keyFeatures() {
// Currently there are no key features:
return EnabledSerdeFeatures.of();
return features(true);
}

public EnabledSerdeFeatures valueFeatures() {
// Currently there are no key features, so all are value features:
return EnabledSerdeFeatures.from(options.stream()
.map(SerdeOption::requiredFeature)
.collect(Collectors.toSet()));
return features(false);
}

public Set<SerdeOption> all() {
return options;
}

public Optional<SerdeOption> keyWrapping() {
return Optional.ofNullable(
Iterables.getFirst(Sets.intersection(options, KEY_WRAPPING_OPTIONS), null)
);
}

public Optional<SerdeOption> valueWrapping() {
return Optional.ofNullable(
Iterables.getFirst(Sets.intersection(options, VALUE_WRAPPING_OPTIONS), null)
);
/**
 * Returns an enabled option that is contained in the supplied set, if any is.
 *
 * @param anyOf the options to look for.
 * @return the first enabled option found in {@code anyOf}, or empty if none are enabled.
 */
public Optional<SerdeOption> findAny(final Set<SerdeOption> anyOf) {
  for (final SerdeOption candidate : anyOf) {
    if (options.contains(candidate)) {
      return Optional.of(candidate);
    }
  }
  return Optional.empty();
}

@Override
Expand All @@ -111,6 +99,13 @@ public String toString() {
return options.toString();
}

/**
 * Collects the serde features required by the enabled options on one side.
 *
 * @param key {@code true} to gather key-side features, {@code false} for value-side.
 * @return the features enabled for the requested side.
 */
private EnabledSerdeFeatures features(final boolean key) {
  final Set<SerdeFeature> enabled = options.stream()
      .filter(opt -> opt.isKeyOption() == key)
      .map(SerdeOption::requiredFeature)
      .collect(Collectors.toSet());
  return EnabledSerdeFeatures.from(enabled);
}

private static void validate(final Set<SerdeOption> options) {
final Set<SerdeOption> wrappingOptions = Sets.intersection(options, VALUE_WRAPPING_OPTIONS);
if (wrappingOptions.size() > 1) {
Expand Down
Loading

0 comments on commit f63c88c

Please sign in to comment.