Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add constraint to deny drop sources referenced by other CREATE_AS sources #6545

Merged
merged 7 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,11 @@ public interface SourceDescription {
*/
String sqlStatement();

/**
* Returns a list of sources that have a DROP constraint reference on this source. This source
* cannot be dropped until all sources returned by this method are deleted.
*
* @return a list of sources
*/
List<String> getSourceConstraints();
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ private static Optional<SourceDescription> getDescribeSourceResponse(
? Optional.empty()
: Optional.of(source.getString("timestamp")),
Optional.ofNullable(source.getString("windowType")),
source.getString("statement")
source.getString("statement"),
source.getJsonArray("sourceConstraints").stream()
.map(o -> (String)o)
.collect(Collectors.toList())
));
} catch (Exception e) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class SourceDescriptionImpl implements SourceDescription {
private final Optional<String> timestampColumn;
private final Optional<String> windowType;
private final String sqlStatement;
private final List<String> sourceConstraints;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
SourceDescriptionImpl(
Expand All @@ -48,7 +49,8 @@ public final class SourceDescriptionImpl implements SourceDescription {
final List<QueryInfo> writeQueries,
final Optional<String> timestampColumn,
final Optional<String> windowType,
final String sqlStatement
final String sqlStatement,
final List<String> sourceConstraints
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.name = Objects.requireNonNull(name, "name");
Expand All @@ -62,6 +64,7 @@ public final class SourceDescriptionImpl implements SourceDescription {
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
this.windowType = Objects.requireNonNull(windowType, "windowType");
this.sqlStatement = Objects.requireNonNull(sqlStatement, "sqlStatement");
this.sourceConstraints = Objects.requireNonNull(sourceConstraints, "sourceConstraints");
}

@Override
Expand Down Expand Up @@ -119,6 +122,11 @@ public String sqlStatement() {
return sqlStatement;
}

@Override
public List<String> getSourceConstraints() {
return sourceConstraints;
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
@Override
public boolean equals(final Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -923,6 +924,7 @@ public void shouldFailToDescribeSourceViaExecuteStatement() {
4,
1,
"statement",
Collections.emptyList(),
Collections.emptyList()),
Collections.emptyList());
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));
Expand Down Expand Up @@ -1352,7 +1354,8 @@ public void shouldDescribeSource() throws Exception {
4,
1,
"sql",
Collections.emptyList()
Collections.emptyList(),
ImmutableList.of("s1", "s2")
);
final SourceDescriptionEntity entity = new SourceDescriptionEntity(
"describe source;", sd, Collections.emptyList());
Expand Down Expand Up @@ -1384,6 +1387,7 @@ public void shouldDescribeSource() throws Exception {
assertThat(description.timestampColumn(), is(Optional.empty()));
assertThat(description.windowType(), is(Optional.of("TUMBLING")));
assertThat(description.sqlStatement(), is("sql"));
assertThat(description.getSourceConstraints(), hasItems("s1", "s2"));
}

protected Client createJavaClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.api.client.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.testing.EqualsTester;
import io.confluent.ksql.api.client.QueryInfo.QueryType;
import java.util.Collections;
Expand All @@ -40,7 +41,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql"),
"sql",
Collections.emptyList()),
new SourceDescriptionImpl(
"name",
"type",
Expand All @@ -54,7 +56,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -70,7 +73,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -86,7 +90,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -102,7 +107,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -118,7 +124,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -134,7 +141,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -150,7 +158,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -166,7 +175,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -181,7 +191,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -196,7 +207,8 @@ public void shouldImplementHashCodeAndEquals() {
Collections.emptyList(),
Optional.empty(),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -212,7 +224,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.of("timestamp"),
Optional.empty(),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -228,7 +241,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.of("window"),
"sql")
"sql",
Collections.emptyList())
)
.addEqualityGroup(
new SourceDescriptionImpl(
Expand All @@ -244,7 +258,8 @@ public void shouldImplementHashCodeAndEquals() {
Optional.of("name"), Optional.of("topic"))),
Optional.empty(),
Optional.empty(),
"other_sql")
"other_sql",
Collections.emptyList())
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,18 @@ private void printTopicInfo(final SourceDescription source) {
}
}

private void printSourceConstraints(final List<String> sourceConstraints) {
if (!sourceConstraints.isEmpty()) {
writer().println(String.format(
"%n%-20s%n%-20s",
"Sources that have a DROP constraint on this source",
"--------------------------------------------------"
));

sourceConstraints.forEach(sourceName -> writer().println(sourceName));
}
}

private void printQueries(
final List<RunningQuery> queries,
final String type,
Expand Down Expand Up @@ -632,6 +644,8 @@ private void printSourceDescription(final SourceDescription source) {

printSchema(source.getWindowType(), source.getFields(), isTable);

printSourceConstraints(source.getSourceConstraints());

printQueries(source.getReadQueries(), source.getType(), "read");

printQueries(source.getWriteQueries(), source.getType(), "write");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class ConsoleTest {
2,
1,
"statement",
Collections.emptyList(),
Collections.emptyList());

@Mock
Expand Down Expand Up @@ -529,6 +530,7 @@ public void testPrintSourceDescription() {
1,
1,
"sql statement",
Collections.emptyList(),
Collections.emptyList()),
Collections.emptyList()
)
Expand Down Expand Up @@ -661,7 +663,8 @@ public void testPrintSourceDescription() {
+ " \"partitions\" : 1," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"sql statement\"," + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]" + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]," + NEWLINE
+ " \"sourceConstraints\" : [ ]" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
Expand Down Expand Up @@ -800,7 +803,8 @@ public void testPrintConnectorDescription() {
+ " \"partitions\" : 2," + NEWLINE
+ " \"replication\" : 1," + NEWLINE
+ " \"statement\" : \"statement\"," + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]" + NEWLINE
+ " \"queryOffsetSummaries\" : [ ]," + NEWLINE
+ " \"sourceConstraints\" : [ ]" + NEWLINE
+ " } ]," + NEWLINE
+ " \"topics\" : [ \"a-jdbc-topic\" ]," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
Expand Down Expand Up @@ -1104,7 +1108,8 @@ public void shouldPrintTopicDescribeExtended() {
new QueryOffsetSummary(
"consumer2",
ImmutableList.of())
)),
),
ImmutableList.of("S1", "S2")),
Collections.emptyList()
))
);
Expand Down Expand Up @@ -1204,7 +1209,8 @@ public void shouldPrintTopicDescribeExtended() {
+ " }, {" + NEWLINE
+ " \"groupId\" : \"consumer2\"," + NEWLINE
+ " \"topicSummaries\" : [ ]" + NEWLINE
+ " } ]" + NEWLINE
+ " } ]," + NEWLINE
+ " \"sourceConstraints\" : [ \"S1\", \"S2\" ]" + NEWLINE
+ " }," + NEWLINE
+ " \"warnings\" : [ ]" + NEWLINE
+ "} ]" + NEWLINE));
Expand All @@ -1224,6 +1230,11 @@ public void shouldPrintTopicDescribeExtended() {
+ " f_0 | VARCHAR(STRING) " + NEWLINE
+ "-----------------------------------------" + NEWLINE
+ "" + NEWLINE
+ "Sources that have a DROP constraint on this source" + NEWLINE
+ "--------------------------------------------------" + NEWLINE
+ "S1" + NEWLINE
+ "S2" + NEWLINE
+ "" + NEWLINE
+ "Queries that read from this TABLE" + NEWLINE
+ "-----------------------------------" + NEWLINE
+ "readId (" + AGGREGATE_STATUS +") : read query" + NEWLINE
Expand Down
Loading