Skip to content

Commit

Permalink
refactor: differentiate between Schema Field and Struct Field (#3325)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Sep 11, 2019
1 parent a6549e1 commit 8f97ccb
Show file tree
Hide file tree
Showing 118 changed files with 1,465 additions and 1,416 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.util.StringUtil;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;

public class KafkaTopicsListTableBuilder {

Expand Down Expand Up @@ -84,7 +84,7 @@ private static String getTopicReplicaInfo(final List<Integer> replicaSizes) {
} else if (replicaSizes.stream().distinct().limit(2).count() <= 1) {
return String.valueOf(replicaSizes.get(0));
} else {
return StringUtil.join(", ", replicaSizes);
return StringUtils.join(replicaSizes, ", ");
}
}
}
16 changes: 8 additions & 8 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,9 @@ public void testSelectProject() {

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.builder()
.valueField("ITEMID", SqlTypes.STRING)
.valueField("ORDERUNITS", SqlTypes.DOUBLE)
.valueField("PRICEARRAY", SqlTypes.array(SqlTypes.DOUBLE))
.valueColumn("ITEMID", SqlTypes.STRING)
.valueColumn("ORDERUNITS", SqlTypes.DOUBLE)
.valueColumn("PRICEARRAY", SqlTypes.array(SqlTypes.DOUBLE))
.build(),
SerdeOption.none()
);
Expand Down Expand Up @@ -638,11 +638,11 @@ public void testSelectUDFs() {

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.builder()
.valueField("ITEMID", SqlTypes.STRING)
.valueField("COL1", SqlTypes.DOUBLE)
.valueField("COL2", SqlTypes.DOUBLE)
.valueField("COL3", SqlTypes.DOUBLE)
.valueField("COL4", SqlTypes.BOOLEAN)
.valueColumn("ITEMID", SqlTypes.STRING)
.valueColumn("COL1", SqlTypes.DOUBLE)
.valueColumn("COL2", SqlTypes.DOUBLE)
.valueColumn("COL3", SqlTypes.DOUBLE)
.valueColumn("COL4", SqlTypes.BOOLEAN)
.build(),
SerdeOption.none()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ private static List<FieldInfo> buildTestSchema(final SqlType... fieldTypes) {
final Builder schemaBuilder = LogicalSchema.builder();

for (int idx = 0; idx < fieldTypes.length; idx++) {
schemaBuilder.valueField("f_" + idx, fieldTypes[idx]);
schemaBuilder.valueColumn("f_" + idx, fieldTypes[idx]);
}

final LogicalSchema schema = schemaBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,32 @@

package io.confluent.ksql.schema.ksql;

import static io.confluent.ksql.util.Identifiers.ensureTrimmed;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.Identifiers;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Objects;
import java.util.Optional;

/**
* A named field within KSQL schema types.
*/
@Immutable
public final class Field {
public final class Column {

private final FieldName name;
private final Optional<String> source;
private final String name;
private final SqlType type;

/**
* @param name the name of the field.
* @param type the type of the field.
* @return the immutable field.
*/
public static Field of(final String name, final SqlType type) {
return new Field(FieldName.of(Optional.empty(), name), type);
public static Column of(final String name, final SqlType type) {
return new Column(Optional.empty(), name, type);
}

/**
Expand All @@ -44,40 +49,45 @@ public static Field of(final String name, final SqlType type) {
* @param type the type of the field.
* @return the immutable field.
*/
public static Field of(final String source, final String name, final SqlType type) {
return new Field(FieldName.of(Optional.of(source), name), type);
public static Column of(final String source, final String name, final SqlType type) {
return new Column(Optional.of(source), name, type);
}

/**
* @param source the name of the source of the field.
* @param name the name of the field.
* @param type the type of the field.
* @return the immutable field.
*/
public static Field of(final FieldName name, final SqlType type) {
return new Field(name, type);
public static Column of(final Optional<String> source, final String name, final SqlType type) {
return new Column(source, name, type);
}

private Field(final FieldName name, final SqlType type) {
this.name = Objects.requireNonNull(name, "name");
private Column(final Optional<String> source, final String name, final SqlType type) {
this.source = Objects.requireNonNull(source, "source").map(src -> ensureTrimmed(src, "source"));
this.name = ensureTrimmed(Objects.requireNonNull(name, "name"), "name");
this.type = Objects.requireNonNull(type, "type");
}

public FieldName fieldName() {
return name;
}

/**
* @return the fully qualified field name.
*/
public String fullName() {
return name.fullName();
return source.map(alias -> SchemaUtil.buildAliasedFieldName(alias, name)).orElse(name);
}

/**
* @return the source of the Column
*/
public Optional<String> source() {
return source;
}

/**
* @return the name of the field, without any source / alias.
*/
public String name() {
return name.name();
return name;
}

/**
Expand All @@ -93,8 +103,8 @@ public SqlType type() {
* @param source the source to set of the new field.
* @return the new field.
*/
public Field withSource(final String source) {
return new Field(name.withSource(source), type);
public Column withSource(final String source) {
return new Column(Optional.of(source), name, type);
}

@Override
Expand All @@ -105,14 +115,15 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final Field field = (Field) o;
return Objects.equals(name, field.name)
&& Objects.equals(type, field.type);
final Column that = (Column) o;
return Objects.equals(source, that.source)
&& Objects.equals(name, that.name)
&& Objects.equals(type, that.type);
}

@Override
public int hashCode() {
return Objects.hash(name, type);
return Objects.hash(source, name, type);
}

@Override
Expand All @@ -121,6 +132,10 @@ public String toString() {
}

public String toString(final FormatOptions formatOptions) {
return name.toString(formatOptions) + " " + type.toString(formatOptions);
final String fmtName = Identifiers.escape(name, formatOptions);
final String fmtType = type.toString(formatOptions);
final String fmtSource = source.map(s -> Identifiers.escape(s, formatOptions) + ".").orElse("");

return fmtSource + fmtName + " " + fmtType;
}
}
128 changes: 0 additions & 128 deletions ksql-common/src/main/java/io/confluent/ksql/schema/ksql/FieldName.java

This file was deleted.

Loading

0 comments on commit 8f97ccb

Please sign in to comment.