
Process commands so that WITH clause is always resolved before executing it #2436

Merged

merged 17 commits into confluentinc:master from the with branch on Mar 28, 2019

Conversation

@agavra agavra commented Feb 12, 2019

Description

As part of the efforts to improve topic management in KSQL, this patch makes sure that all CSAS/CTAS queries that the engine accepts will already have a WITH clause resolved to specify the topic/partitions/replicas. If the user does not supply a WITH clause, or supplies a partial one (e.g. only the topic name), then the defaults will be resolved on the "server" side before the statement is executed. This should be completely backwards compatible, since this patch does not remove any of the default-resolving behavior on the executor.
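
For concreteness, a minimal sketch of what that resolution looks like, reusing names that appear elsewhere in this PR (cas, copyWith, DdlConfig.KAFKA_TOPIC_NAME_PROPERTY, KsqlConstants.SINK_NUMBER_OF_PARTITIONS); ksqlConfig and sourceTopicDescription are stand-ins, and the real plumbing differs:

    // Hedged sketch, not the actual implementation: fill in any WITH-clause
    // properties the user did not supply before the statement is enqueued.
    final Map<String, Expression> props = new HashMap<>(cas.getProperties());

    // Default the topic name to <configured prefix> + <sink name>.
    props.computeIfAbsent(
        DdlConfig.KAFKA_TOPIC_NAME_PROPERTY,
        k -> new StringLiteral(
            ksqlConfig.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG)
                + cas.getName().getSuffix()));

    // Default the partition count from the source topic's description.
    props.computeIfAbsent(
        KsqlConstants.SINK_NUMBER_OF_PARTITIONS,
        k -> new IntegerLiteral(sourceTopicDescription.partitions().size()));

    // Re-write the statement so the enqueued command carries a fully
    // resolved WITH clause.
    final CreateAsSelect resolved = cas.copyWith(props);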

Future Work:

Once this patch is in place, we should be able to create the topics on the server as well, before enqueuing the command onto the command topic. After that is done, we will change the executor to only validate, never create, topics. Once all those pieces are in place, recovery will not create topics again (which may cause the issues described in #2329).
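
To make the "validate, never create" step concrete, a rough sketch of the eventual executor-side check (validateSinkTopic is a hypothetical name; this is future work, not part of this PR):

    // Hedged sketch: the executor verifies the topic the server already
    // created, instead of creating it itself.
    private static void validateSinkTopic(
        final KafkaTopicClient topicClient,
        final String topicName,
        final int expectedPartitions
    ) {
      final TopicDescription description = topicClient.describeTopic(topicName);
      if (description.partitions().size() != expectedPartitions) {
        throw new KsqlException(
            "Topic " + topicName + " has " + description.partitions().size()
                + " partitions, expected " + expectedPartitions);
      }
    }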

Minor Changes:

  • Refactored CreateAsSelect to extend Statement and moved common functionality between CreateTableAsSelect and CreateStreamAsSelect into CreateAsSelect. This allows me to use it in the HANDLERS map more cleanly.

Testing done

  • Modified existing unit tests, which cover the test cases

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from a team February 12, 2019 23:00

@big-andy-coates big-andy-coates left a comment

Thanks @agavra.

This is a really good first stab at this change, considering you're still new to the code base and how wide-ranging the change is. Nice work fixing CreateAsSelect. However, there are some issues/misunderstandings of what KSQL is currently doing that need resolving. (Plus my usual nit fest!)

Actually, thinking about this more. With Hojjat's changes coming in #1930, where partition and replica count are copied from the source stream, not some config, I don't think you can bake the partition and replica count into the command in the REST API.

Let's say there's an existing stream FOO:

CREATE STREAM FOO (....) WITH (KAFKA_TOPIC='foo', ...);

And topic 'foo' has 4 partitions. Then the REST API receives a request with:

CREATE STREAM BAR AS SELECT * FROM FOO;

So the rest API bakes in:

CREATE STREAM BAR WITH(PARTITIONS=4, REPLICAS=x) AS SELECT * FROM FOO;

And posts it to the command topic. But while the REST API was doing this, another thread/node posted these commands to the command topic:

DROP STREAM FOO;
CREATE STREAM FOO (....) WITH (KAFKA_TOPIC='bob', ...);

And topic 'bob' has 10 partitions. Now the command topic looks like:

0: CREATE STREAM BAR AS SELECT * FROM FOO;
1: DROP STREAM FOO;
2: CREATE STREAM FOO (....) WITH (KAFKA_TOPIC='bob', ...);
3: CREATE STREAM BAR WITH(PARTITIONS=4, REPLICAS=x) AS SELECT * FROM FOO;

But BAR should be using its source's partition count, which is 10, not 4.

So, unfortunately, I think Hojjat's pending change makes your change foobar!


agavra commented Feb 13, 2019

Thanks for the review @big-andy-coates!

Actually, thinking about this more. With Hojjat's changes coming in #1930, where partition and replica count are copied from the source stream, not some config, I don't think you can bake in the partition and replica count into the command in the rest api.

This is interesting... @rodesai and I had some discussions about how to resolve this. The real concern here, I think, is something more general than just what you bring up; resolving any type of state on the server (as opposed to the executor) is racy if multiple users are interacting simultaneously. Pushing the work off to the executor essentially solves the races by using Kafka to enforce ordering. This alternative, however, creates even more issues - we need to establish quorum across executors to understand whether it is safe to execute something, and we no longer have access to Kafka to do it.

Imagine this example:

0. Other commands
1. CSAS foo FROM bar1;
2. DROP foo DELETE TOPIC;
3. CSAS foo FROM bar2;

Executor A has just finished executing command 2 but has not yet begun executing command 3. Executor B has just finished executing command 0. B now executes command 1 before A can execute 3. This means that the topic for foo will, again, be wrong, because it was created from bar1, not bar2. Since we no longer have Kafka to help us guarantee synchrony at this stage, it is much harder to solve these types of problems here than it is while we still have access to Kafka.


Rohan laid out one way to solve this in #2435. Essentially, we enqueue the command with some validation of the state of the metastore (this can be just the offset we think that we are enqueuing it on), and if there is a race we reject it on the executor. This would solve a few problems, including the queryID generation. After identifying a race, we can reject and clean up whatever side effects may have happened.
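
Roughly, the idea looks like this (names are hypothetical; this is a sketch, not the #2435 implementation):

    // Hedged sketch: the server records the command-topic offset it validated
    // against, and the executor rejects the command if any other command was
    // enqueued in between.
    final class ValidatedCommand {
      final String statementText;
      final long expectedOffset;  // offset the server expected to enqueue at

      ValidatedCommand(final String statementText, final long expectedOffset) {
        this.statementText = statementText;
        this.expectedOffset = expectedOffset;
      }

      // Executor side: execute only if the command actually landed at the
      // offset the server validated against; otherwise reject and clean up
      // whatever side effects may have happened.
      boolean shouldExecute(final long actualOffset) {
        return expectedOffset == actualOffset;
      }
    }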


rodesai commented Feb 14, 2019

I think we should come to a consensus on how we want to do safe topic creation (I thought we did, but it seems like we're debating that again :)), and then decide how we want to handle REPLICAS/PARTITIONS from there. The 2 options I see are:

  1. Create/validate topics from the request handler before enqueuing the request
  2. Create/validate topics from the statement executor

As @agavra and I discussed, option 2 is very hard, if even possible, to do in a foolproof way. We need to guarantee that the topic gets created once, before any other ksql server applies that command (to ensure the queries created succeed, and that there isn't an acknowledged drop later on). So we need the servers to come to a consensus on one of them to try creating the topic, and then for those servers to apply the command while the other servers wait. If the server dies before creating the topic the other servers need to be able to detect that and re-elect to proceed.

Option 1 gives you the same guarantees since we're creating/validating before even putting the command on the topic. So my vote is to do that.

Rohan laid out one way to solve this in #2435.

The idea behind #2435 was more around protecting our internal state against cross-version validation tightening (or bug-fixes) and non-deterministic bugs in validation.

Where #2435 helps in this case is that it ties the validation to the execution. If we know that no other statements have been applied since we validated, then we know the underlying topic is still kosher (unless the user went and mucked it up underneath us, which I think we have to assume out of our reasoning).

Back to my original point. If we're going with option 1, we can just drop adding REPLICAS/PARTITIONS from this patch (since the executor won't use it). Or (in conjunction w/ #2435) it can just be used to validate and potentially warn the user that something doesn't look right with the underlying topics (but we still must apply the command to the metastore).

@agavra agavra requested a review from a team February 15, 2019 21:05

agavra commented Feb 15, 2019

@big-andy-coates After reviewing #1930, I think there's a race condition on the executor side anyway.

-- imagine that ∀P ∃ streamP such that streamP is based on a topic with P partitions
1: CREATE STREAM foo AS SELECT * FROM STREAM1;
2: DROP STREAM foo DELETE TOPIC;
-- note that by default this has the same topic name, "foo"
3: CREATE STREAM foo AS SELECT * FROM STREAM2;
4: CREATE STREAM bar AS SELECT * FROM foo;

The sequence of events (ordered):

1: Executor A executes commands 1, 2 & 3
2: Executor B executes commands 1, 2
3: Executor C executes command 1
4: Executor A executes command 4

Now we have a situation where A creates bar using the configuration from the topic named "foo" but created from STREAM1 with 1 partition 💥

I admit this scenario is quite contrived (and it assumes we avoided the simple case where B executes command 2 before A executes 4 and the topic is just nuked), but it points to the fact that as long as we rely on topic names on the executor, bad stuff can happen, and detecting it is really hard. Detecting the scenario that you pointed out is easy if we enqueue commands with some validation metadata.

@big-andy-coates

@agavra

The sequence of events (ordered):

1: Executor A executes commands 1, 2 & 3
2: Executor B executes commands 1, 2
3: Executor C executes command 1
4: Executor A executes command 4

Now we have a situation where A creates bar using the configuration from the topic named "foo" but created from STREAM1 with 1 partition 💥

Are you talking about step 4? At step 4, executor A's metastore would correctly have foo built from STREAM2, so it would create bar with two partitions, unless I'm mistaken. (Remember executors B & C have their own, independent, metastores).

Executors B & C would both fail statement 1, as they would be expecting topic 'FOO' to either not exist, or exist with 1 partition. Given that executor A created it in step 3 with 2 partitions, both would fail the statement. They'd then fail statement 2, as foo wouldn't exist, and then go on to execute statements 3 & 4 successfully.

No race condition.

However...

1: Executor A executes commands 1, 2    (creates Foo w/ 1 partition and then deletes it)
2: Executor B executes command 1        (creates Foo w/ 1 partition)
3: Executor A executes command 3        (fails as foo has too few partitions)
4: Executor B executes commands 2, 3, 4 (deletes old Foo and recreates Foo w/ 2 partitions, and then bar)
5: Executor A executes command 4        (fails as no stream called foo in its metastore)

Hence we end up with servers in inconsistent state.

It may be the case that we need cross-cluster synchronisation regardless of if we do topic management server or executor side. Interestingly, extending our current feedback that a command has been executed to cover all nodes in the cluster is a useful feature in and of itself. If we can't come up with a solution for topic management that doesn't need cross-cluster sync, then I think we should tackle the sync problem first.

When it comes to the discussion of doing topic-management server or executor side, I see:

Server side:

  • Pro: only 1 node does the state change.
  • Con: no ordering guarantees; single point of failure on state change; current design requires x-cluster sync.

Executor side:

  • Pro: ordering guarantees.
  • Con: multiple nodes changing external state; current design requires x-cluster sync.

I think the ordering guarantees become less of an issue if we embed the "validated against state x" information into the messages produced to the command topic, which would suggest moving state management to the server side. However, I think we need to a) implement the "validated against state x" stuff and b) the x-cluster sync stuff before we tackle topic management.

The good thing is that with validation checksums baked into the commands in the command topic, the issue of baking the partition and replica counts into the command goes away (I think), as race conditions due to other servers producing to the command topic first would result in a checksum error and the command being re-validated, etc.


@rodesai rodesai left a comment

Thanks, @agavra! A couple of questions inline.

private static String topicName(final CreateAsSelect cas, final KsqlConfig cfg) {
  final Expression topicProperty = cas.getProperties().get(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY);
  return (topicProperty == null)
      ? cfg.getString(KsqlConfig.KSQL_OUTPUT_TOPIC_NAME_PREFIX_CONFIG) + cas.getName().getSuffix()
Contributor

Does getName return the name in upper case?

Contributor Author

Nope, but I believe some other part of the stack that is called before this does.

Contributor

I'm not seeing where that happens. CreateAsSelect gets created by AstBuilder, which just uses the string from the statement. We should be sure we create sink topics with upper-case names, or call out the change in behavior in the docs (which we'll probably have to do anyway, since we're changing the way these are computed as part of the topic management design).

Contributor Author

It looks like you're right - I thought I had tested this out, but clearly not. Good catch! I will update this to call toUpperCase

since we're changing the way these are computed as part of the topic management design

This is no longer a requirement for the MVP. We wanted to prepend the queryID to the topic name so that we make sure that we don't delete a topic that we created in the past with the same name. With server-side deletes this doesn't happen anymore - either I can't create the topic because the topic with the same name already exists or it's already deleted (in which case I can safely delete the topic I just created). The only race that it solves (which I don't think we should worry about) is the case where two different users are interacting with two different servers issuing commands at the same time for the same topic name:

Server A:   |---CREATE X---|                              |---DELETE X---|
Server B:                  |---DELETE X---||---CREATE X---|

To me, this falls in the category of "inter-server" communication.

Contributor Author

Nevermind. I found where it happens:

  public static String getIdentifierText(final SqlBaseParser.IdentifierContext context) {
    if (context instanceof SqlBaseParser.QuotedIdentifierAlternativeContext) {
      return unquote(context.getText(), "\"");
    } else if (context instanceof SqlBaseParser.BackQuotedIdentifierContext) {
      return unquote(context.getText(), "`");
    } else {
      return context.getText().toUpperCase();
    }
  }

In getQualifiedName in AstBuilder.
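
For reference, unquote presumably just strips the surrounding quote characters, which is why quoted identifiers keep their case; a minimal sketch of that behavior (an assumption, not the actual parser util):

    // Hedged sketch: unquote("\"foo\"", "\"") returns foo, preserving case;
    // doubled (escaped) quote characters are collapsed back to one.
    private static String unquote(final String value, final String quote) {
      return value
          .substring(1, value.length() - 1)
          .replace(quote + quote, quote);
    }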

properties.put(KsqlConstants.SINK_NUMBER_OF_PARTITIONS, new IntegerLiteral(partitions));

final CreateAsSelect withTopic = cas.copyWith(properties);
final String withTopicText = SqlFormatter.formatSql(withTopic) + ";";
Contributor

formatSql should include the semicolon

Contributor

Never mind!

Contributor

Actually no, this should already have the semicolon.

@agavra agavra Mar 5, 2019

For some reason, CSAS/CTAS does not:

    final CreateStreamAsSelect csas = new CreateStreamAsSelect(
        QualifiedName.of("stream"),
        new Query(
            new QuerySpecification(
                new Select(true, ImmutableList.of(new AllColumns(new NodeLocation(0, 0)))),
                new Table(QualifiedName.of("sink")),
                true,
                new Table(QualifiedName.of("source")),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                OptionalInt.empty()
            ),
            OptionalInt.empty()
        ),
        false,
        ImmutableMap.of(),
        Optional.empty()
    );
    final String sql = SqlFormatter.formatSql(csas);
    System.out.println(sql); 
    // > CREATE STREAM stream AS SELECT DISTINCT *\nFROM TABLE source

Though if you feel this is a bug I can fix that!

Contributor

Seems like a bug to me. Don't want to block this patch on it though.

@rodesai rodesai left a comment

LGTM


final String topic = topicName(cas, ksqlConfig);

final TopicDescription description = describeSource(cas, metaStore, ksqlConfig, topicClient);
Contributor

Can we pull the topic name out of the analysis as well?

Contributor Author

That's a good idea, but it seems inconsistent. In my test I have:

CREATE STREAM s1 WITH(kafka_topic='topic') AS SELECT * FROM SOURCE;

The AstBuilder has the following code:

  @Override
  public Node visitQuerySpecification(final SqlBaseParser.QuerySpecificationContext context) {
    final Table into;
    if (context.into != null) {
      into = (Table) visit(context.into);
    } else {
      // TODO: Generate a unique name
      final String intoName = "KSQL_Stream_" + System.currentTimeMillis();
      into = new Table(QualifiedName.of(intoName), true);
    }

Unless the query has an INTO statement, this results in the generated unique stream name and sets the second boolean in the Table constructor to true. This boolean represents isStdOut, and if that value is set then the intoKafkaTopicName will be null. In the example query in my unit test, it hits the else clause and I can't get the name from the analysis.

This might be a bug or just a poorly named method, but it seems to be more complicated than I had hoped. Thoughts?

Contributor Author

Talked offline - this was changed after #2529. I may be able to use it.

Contributor Author

Looked through the code again, we only set the into if the property is there:

    if (sink.getProperties().get(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY) != null) {
      setIntoTopicName(sink);
    }

Since I don't want to change this behavior for all analysis, I will leave it as is.

@hjafarpour hjafarpour left a comment

Thanks @agavra, LGTM with a comment.
Also, as we discussed offline, you could consider using StatementRewriter to update your statements with the WITH clause. Not a must for this PR, but you can consider it for future ones.

* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.schema.inference;
Contributor

This seems like an odd package for this, since it's for schema inference and TopicInjector doesn't have anything to do with schemas!

@hjafarpour hjafarpour requested a review from a team March 14, 2019 19:03
@@ -648,6 +664,94 @@ public void shouldDistributeAvoCreateStatementWithColumns() {
)), any(), any());
}

@Test
Contributor Author

Review Note: TODO - I will move these tests to the right locations in a follow-up PR (after refactoring #2502). I want to keep jitter on this small after getting the green ticks.


agavra commented Mar 14, 2019

Thanks @hjafarpour! I will try to follow this up with a refactor that moves the logic to StatementRewriter :)

@big-andy-coates big-andy-coates left a comment

Thanks @agavra - sorry for the delay in reviewing. This is looking really good! However, there are a few areas that still need a little tweaking, IMO:

  • we shouldn't be exposing the service context from the execution context.
  • the error handling around parsing WITH clause expressions needs work to be more lenient and provide better error messages
  • Using the QueryAnalyser just to get the source name is overkill. Better to use a more targeted class and leave QueryAnalyser to do its one job.
  • the tests for the default injector need to be improved to:
    • cover joins
    • clearly test the order of precedence.

I've commented more about each of these inline below.

/**
 * @return the service context used for this execution context
 */
ServiceContext getServiceContext();
Contributor

I feel like we should avoid leaking the service context out of the engine context. It leads to tangled code. The fact that the engine/execution context is internally using a service context is an implementation detail that shouldn't, IMHO, leak out of the interface.

It looks like you've added this so that DefaultTopicInjector can get hold of the service context from the engine, but you actually have the service context at the point you're trying to get the injector from the factory.

Maybe just have the factory take both a ServiceContext and MetaStore instance. Then you can just invoke via:

topicInjectorFactory.apply(sandboxedServiceContext, ksqlEngine.getMetastore());

I know it's tempting, as it's convenient since we already pass around the engine, but it feels wrong to expose it, IMHO. Maybe I'm wrong, but it just smells to me. We should aim to keep this interface as succinct as possible.

@agavra agavra Mar 15, 2019

I had the same instinct as you; in fact, that was the way I originally had it. The problem was that not just any sandboxed service context can be used - it has to be exactly the same sandboxed context that the sandboxed engine uses. Otherwise it can't validate a sequence of statements that create topics, for example:

0: CREATE STREAM Y AS SELECT * FROM X; 
1: CREATE STREAM Z AS SELECT * FROM Y;

When command 0 runs, it updates the service context with a new topic Y with p partitions and r replicas. When command 1 runs, it expects to be able to read from the topic for Y, but that topic doesn't exist.

So, unfortunately, the coupling is an artifact of where the sandbox is created (deep inside the engine).

To do what you suggest, we can pipe in ServiceContext through the ksqlEngine#createSandbox, into the constructor for SandboxedExecutionContext and then into EngineContext#createSandbox from every place we call createSandbox.

This isn't so bad, but it's tricky to get right (to make sure that we always use the same execution context as the engine we are using). As soon as I thought about that, I decided that it was appropriate to expose the service context because (as it stands) the coupling is actually an attribute of the engine, not just an implementation detail.

If you strongly disagree, I can do one of two things:

  • Expose an Optional<ServiceContext> getSandboxContext which will only return if it is a sandbox (ew 🤢 )
  • Do the refactor that I outlined above but risk loose coupling of the context and the engine
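
For what it's worth, the second option would look roughly like this at the call sites (a sketch only; the exact factory and createSandbox signatures here are assumptions, not the real API):

    // Hedged sketch: create the sandboxed ServiceContext up front and thread
    // it into both the sandbox engine and the injector factory, so the two
    // are guaranteed to share the same context.
    final ServiceContext sandboxedServiceContext = SandboxedServiceContext.create(serviceContext);
    final KsqlExecutionContext sandbox = ksqlEngine.createSandbox(sandboxedServiceContext);
    final TopicInjector topicInjector =
        topicInjectorFactory.apply(sandboxedServiceContext, sandbox.getMetastore());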

@big-andy-coates big-andy-coates Mar 22, 2019

Hmmm... makes sense. Let's see how it looks.

Contributor

The impersonation work that Sergio is doing will mean that each request to the engine will need to come with its own ServiceContext that runs things in the context of the initiating user.

With that in mind, it probably makes sense to not expose the service context from the engine, but instead pass it each time.

@big-andy-coates big-andy-coates left a comment

Thanks @agavra

This PR also adds another place that does this reconciliation of topic name, partition count and rf. This means the logic in KsqlStructuredDataOutputNode is duplicated and needs to be kept in sync. It would be better if this logic was extracted out into a common place, maybe an IntoPropertyResolver class or similar (see the sketch below). Then both KsqlStructuredDataOutputNode and DefaultTopicInjector can call this code. This is preferable to duplicating the logic.
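
Something along these lines (a sketch; IntoPropertyResolver is just the suggested name, and the precedence order shown, WITH clause over overrides over config over the source topic, is my reading of the current logic, not a confirmed spec):

    import java.util.Optional;
    import java.util.function.Supplier;

    // Hedged sketch: resolve each sink property from the highest-precedence
    // source that supplies it.
    final class IntoPropertyResolver {

      static int resolvePartitions(
          final Optional<Integer> withClause,   // WITH(PARTITIONS=...)
          final Optional<Integer> overrides,    // request property overrides
          final Optional<Integer> config,       // e.g. ksql.sink.partitions
          final Supplier<Integer> sourceTopic   // lazily described source topic
      ) {
        return withClause.orElseGet(
            () -> overrides.orElseGet(
                () -> config.orElseGet(sourceTopic)));
      }
    }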

With this change, going forward, the code in Analyzer and KsqlStructuredDataOutputNode that works out the name/partition/rf from the different sources (config, WITH clause, or prop overrides) is redundant. Obviously, it's still needed for legacy statements. As such, I think it would really help if you added a comment to Analyzer making it clear that this code is for legacy statements only. Better still, move this code into a function with legacy in the name AND add a comment.

I've made comments inline about other things.

    final KsqlConfig cfg
) {
  final Expression topicProperty = cas.getProperties().get(DdlConfig.KAFKA_TOPIC_NAME_PROPERTY);
  return (topicProperty == null)
Contributor

Personally, I'd argue that adding more code that follows this pattern of toString isn't a good thing. But in the interest of getting this in, I guess we can ignore it if you're really against just checking the type.

Contributor Author

I'm not against the idea at all - I just tried doing it, and it adds another few hundred lines (there are quite a few edge cases that need to be tested) to this PR, which is already hard enough to review.
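
For the record, the happy path of the type-checked version would look something like this (a sketch, assuming StringLiteral exposes its value via getValue(); the extra few hundred lines come from covering and testing the edge cases, not from this path):

    // Hedged sketch: check the Expression's concrete type instead of relying
    // on toString(), and fail with a targeted error otherwise.
    private static String topicName(final Expression topicProperty) {
      if (topicProperty instanceof StringLiteral) {
        return ((StringLiteral) topicProperty).getValue();
      }
      throw new KsqlException(
          DdlConfig.KAFKA_TOPIC_NAME_PROPERTY + " should be a string literal, got: "
              + topicProperty);
    }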

final KsqlConfig ksqlConfig,
final PreparedStatement<? extends CreateAsSelect> cas
) {
final Analysis analysis = new QueryAnalyzer(
Contributor

I still strongly feel this is a misuse of QueryAnalyzer. QueryAnalyzer will be enhanced as we move forward to be more complex, more advanced, and hence take longer. Using it here to extract a single property of a query is total overkill, coupling we don't need, and ultimately costly.

The query has already been analysed higher up the stack by the time this method is called. We should not be analysing it again.

I see two approaches to remove the need for this:

  1. Capture this information higher up the stack when the query is analysed and pass it down.
  2. Write a very simple visitor to extract the info, e.g.
private TopicDescription describeSource(
      final KafkaTopicClient topicClient,
      final KsqlConfig ksqlConfig,
      final PreparedStatement<? extends CreateAsSelect> cas
  ) {
    final SourceTopicExtractor visitor = new SourceTopicExtractor();
    visitor.process(cas.getStatement().getQuery(), null);

    if (visitor.primaryKafkaTopicName == null) {
      throw new IllegalStateException("blag");
    }
    return topicClient.describeTopic(visitor.primaryKafkaTopicName);
  }

  private final class SourceTopicExtractor extends DefaultTraversalVisitor<Node, Void> {

    private String primaryKafkaTopicName = null;

    @Override
    protected Node visitJoin(final Join node, final Void context) {
      process(node.getLeft(), context);
      return null;
    }

    @Override
    protected Node visitAliasedRelation(final AliasedRelation node, final Void context) {
      final String structuredDataSourceName = ((Table) node.getRelation()).getName().getSuffix();
      final StructuredDataSource<?> source = metaStore.getSource(structuredDataSourceName);
      if (source == null) {
        throw new KsqlException(structuredDataSourceName + " does not exist.");
      }

      primaryKafkaTopicName = source.getKsqlTopic().getKafkaTopicName();
      return node;
    }
  }

With regards to future enhancements, changes to the QueryAnalyser, etc.: if we ensure this injector is called as part of the QueryTranslationTest then it should be future-proofed.

Contributor Author

Ok. I will go with approach 2.

@big-andy-coates big-andy-coates left a comment

Thanks @agavra this is looking really good.

The only remaining main issue is:

  • need to switch to lazily getting topic descriptions from Kafka. (Which is what the old code did)

Apart from that, it's only nits and suggestions below.

Nice one.

return this;
}

public Builder withSource(final TopicDescription description) {
Contributor

this would be better as:

Suggested change:

- public Builder withSource(final TopicDescription description) {
+ public Builder withSource(final Supplier<TopicDescription> description) {

Getting the topic description (sometimes) requires a call to the remote server. This introduces latency and the possibility of failure. If we have all the information from other, higher-precedence sources then we should not be calling out to get the topic details. (Please add tests to the clients of this class to prove this is the case.)
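
A sketch of the lazy version (assuming Guava's Suppliers.memoize is available on the classpath; sourceTopicName is a stand-in for the resolved source topic):

    // Hedged sketch: defer the remote describeTopic call so it only happens
    // if a lower-precedence source is actually consulted, and at most once.
    final Supplier<TopicDescription> description =
        Suppliers.memoize(() -> topicClient.describeTopic(sourceTopicName));

    builder.withSource(description);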

Contributor Author

Good catch

}

@Override
public boolean equals(final Object obj) {
Contributor

By removing the equals method here, it's now possible to have a stream and a table be equal, as it's only using the base class's equals method!

Question: are these methods actually used / needed?

Contributor Author

No tests are failing, so I'm guessing they're not used. But I will go ahead and implement them, because removing them requires more diligence.

Contributor Author

Actually, only toString will have a problem. hashCode/equals check using getClass() != obj.getClass(), so equals will not return true for a stream and a table.

Contributor Author

I added getClass() to the hashCode as well.
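
For reference, the getClass()-based pattern being discussed looks like this (a generic sketch, not the actual class; name stands in for the real fields):

    // Hedged sketch: the getClass() comparison means a stream and a table can
    // never compare equal, even if all their fields match.
    @Override
    public boolean equals(final Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      final StructuredDataSource<?> that = (StructuredDataSource<?>) obj;
      return Objects.equals(name, that.name);  // plus the remaining fields
    }

    @Override
    public int hashCode() {
      // Include getClass() so streams and tables also hash differently.
      return Objects.hash(getClass(), name);
    }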


  public DistributingExecutor(
      final CommandQueue commandQueue,
      final Duration distributedCmdResponseTimeout,
-     final Function<ServiceContext, SchemaInjector> schemaInjectorFactory) {
+     final Function<ServiceContext, SchemaInjector> schemaInjectorFactory,
Contributor

Minor suggestion, but we could decouple DistributingExecutor from these injectors by injecting a new InjectorChain interface or something (see the sketch below). It would simplify this code and testing.
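
For example (a sketch only; the Injector interface shape is an assumption, and ImmutableList is Guava):

    // Hedged sketch: DistributingExecutor would depend on a single Injector,
    // and the chain applies each injector to the statement in order.
    interface Injector {
      <T extends Statement> PreparedStatement<T> inject(PreparedStatement<T> statement);
    }

    final class InjectorChain implements Injector {

      private final ImmutableList<Injector> injectors;

      InjectorChain(final Injector... injectors) {
        this.injectors = ImmutableList.copyOf(injectors);
      }

      @Override
      public <T extends Statement> PreparedStatement<T> inject(
          final PreparedStatement<T> statement) {
        PreparedStatement<T> result = statement;
        for (final Injector injector : injectors) {
          result = injector.inject(result);
        }
        return result;
      }
    }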

Contributor Author

Will follow up with a minor change.

@agavra agavra merged commit a1c4bf4 into confluentinc:master Mar 28, 2019
@agavra agavra deleted the with branch March 28, 2019 20:04