Skip topic creation/deletion for dropped streams/tables when replaying command topic on restart #2329

Closed
rodesai opened this issue Jan 9, 2019 · 5 comments

rodesai commented Jan 9, 2019

Currently, when replaying the command topic at startup, KSQL will execute all of the commands (but skip starting queries). This includes executing Kafka topic creates and deletes, which can cause undesired side effects for users. For example, in this case:

CREATE STREAM FOO;
DROP STREAM FOO;

If the user has already deleted topic FOO, then upon restart KSQL will re-create the topic when it replays the CREATE statement.

Or in the case:

CREATE STREAM FOO;
DROP STREAM FOO DELETE TOPIC;

If the user has created another topic named FOO, KSQL will delete that topic (this case still needs to be verified experimentally).

To rectify this in the short term, we should skip deletes during replay, avoid creating topics until the full command topic has been replayed, and then only create the topics required for the current set of queries.
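
A rough sketch of what that could look like (the ReplayCommand and TopicOps types below are illustrative stand-ins, not the real ksql engine classes):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ksql's real command/engine types.
interface ReplayCommand {
  boolean createsTopic();
  boolean deletesTopic();
  String topicName();
  void applyToMetastoreOnly(); // rebuild metadata without touching Kafka
}

interface TopicOps {
  boolean exists(String name);
  void create(String name);
}

class CommandTopicReplayer {
  void replay(List<ReplayCommand> commands, TopicOps topics, List<String> topicsStillInUse) {
    final List<String> deferredCreates = new ArrayList<>();
    for (ReplayCommand cmd : commands) {
      cmd.applyToMetastoreOnly();             // always rebuild the metastore
      if (cmd.deletesTopic()) {
        continue;                             // never delete topics during replay
      }
      if (cmd.createsTopic()) {
        deferredCreates.add(cmd.topicName()); // defer creation until replay is done
      }
    }
    // Only create the topics that the rebuilt state still needs and that are missing.
    for (String topic : deferredCreates) {
      if (topicsStillInUse.contains(topic) && !topics.exists(topic)) {
        topics.create(topic);
      }
    }
  }
}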

Note that this is an incomplete solution to this general category of problem:

  • By opting not to delete topics during replay, we may be skipping a delete that should have been applied. However, if we choose to apply deletes, we may be deleting something we shouldn't, which is clearly worse. In the absence of additional information about whether the delete was ever successfully applied, we have to pick the lesser of the two evils.
  • This is still a problem even when processing the command topic during normal execution. We don't really know whether it's safe to apply create/delete operations. How we handle this needs some more thought.
agavra self-assigned this Jan 18, 2019

agavra commented Jan 22, 2019

Adding some thoughts after investigating a solution: the key challenge here is that we still need to execute all of the commands so that the metadata store is properly updated and in sync with any other server. I can think of two approaches:

  1. Slightly hacky, but we can have a KafkaTopicClient that does not resolve any side effects (e.g. create/delete topic) until a time in the future, and instead only "records" the calls that would have been made. After recovery is done, we resolve all recorded calls that are still necessary (i.e. not paired create/deletes) and replace the client with the normal Kafka client. This approach is minimally invasive (see the sketch after this list).
  2. Never create/delete topics during recovery. This would require us to either pipe down the recovery mode flag deep into the engine or refactor the engine so that the create/delete topic requests are made closer to the top-level. Either way, we must make sure that we exit recovery mode after reaching the last committed offset so that create topic commands that we have never seen are executed.
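
For reference, a minimal sketch of what the recording client in option 1 might look like; the TopicClient interface here is a simplified stand-in for ksql's real KafkaTopicClient, so the method names and signatures are assumptions:

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for ksql's KafkaTopicClient; the real interface differs.
interface TopicClient {
  void createTopic(String name, int partitions, short replicas);
  void deleteTopic(String name);
}

// During recovery, record create/delete calls instead of executing them.
class RecordingTopicClient implements TopicClient {
  private static final class TopicSpec {
    final int partitions;
    final short replicas;
    TopicSpec(int partitions, short replicas) {
      this.partitions = partitions;
      this.replicas = replicas;
    }
  }

  private final Map<String, TopicSpec> pendingCreates = new LinkedHashMap<>();

  @Override
  public void createTopic(String name, int partitions, short replicas) {
    pendingCreates.put(name, new TopicSpec(partitions, replicas)); // record only
  }

  @Override
  public void deleteTopic(String name) {
    // A later delete cancels an earlier recorded create; deletes of topics that
    // existed before recovery are intentionally dropped.
    pendingCreates.remove(name);
  }

  // Once recovery finishes, apply the creates that were never cancelled and
  // swap this client out for the normal Kafka-backed client.
  void resolve(TopicClient realClient) {
    pendingCreates.forEach((name, spec) ->
        realClient.createTopic(name, spec.partitions, spec.replicas));
  }
}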

My preference is toward option 2, but it requires some non-trivial refactoring. @rodesai - any thoughts?


agavra commented Jan 23, 2019

Things get a little more complicated... for approach 2, we need to make it so that each server records committed offsets.

This can't be done in Kafka without registering a consumer group with the Kafka brokers (Kafka doesn't track offsets per consumer; it tracks them per consumer group and topic/partition - see KafkaConsumer#maybeThrowInvalidGroupIdException). Now it gets extra tricky because each KSQL server must have its own consumer group, otherwise it won't receive all events, but each server's consumer group needs to be consistent across restarts.

NOTE: by default, the consumer is configured to start at the LATEST offset on the command topic; it does not actually store any offsets locally.
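
For context, here is a sketch of the kind of consumer setup this implies: a per-server consumer group whose id is stable across restarts so that committed offsets can be found again. The group id scheme, command topic name, and serverId below are placeholders, not ksql's actual configuration:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommandTopicConsumer {
  public static void main(String[] args) {
    // Placeholder for however the server identifies itself; it must be unique
    // per server (so every server sees every command) yet stable across restarts.
    final String serverId = "server-1";

    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "ksql-command-consumer-" + serverId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("ksql_command_topic")); // placeholder name
      while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          // execute the command here
        }
        // Commit so that, after a restart, recovery can tell which commands were
        // already processed and which have never been seen.
        consumer.commitSync();
      }
    }
  }
}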


rodesai commented Jan 24, 2019

Here's another broken recovery scenario:

CREATE STREAM FOO WITH ...
CREATE STREAM BAR AS SELECT * FROM FOO;
TERMINATE CSAS_BAR;
DROP STREAM FOO DELETE TOPIC; (or the user deletes the topic manually)

When recovering, the statement CREATE STREAM FOO will fail to validate because the underlying topic is missing.


rodesai commented Jan 24, 2019

I wanted to dump my current thoughts on this category of issues. Our basic recovery strategy is to start with a well-known initial "zero-state" and then replay the command topic to rebuild our state. The issue here is that our state also includes an external system - Kafka. This has two problems:

  • Ownership: The state in Kafka is not "ours". If we apply transformations we may be corrupting data in Kafka that we don't own (e.g. deleting a topic we shouldn't, or leaving around a phantom topic).
  • Unpredictability: Kafka has an unpredictable zero-state. It may be different every time we recover, depending on what topics have been created and deleted.

To fix the ownership problem, I think we should slightly change our contract with the user. KSQL should get a "namespace" (prefix) within a Kafka cluster. KSQL will only ever automatically create or delete topics within that namespace. If you want to use KSQL with a topic that is outside that namespace, it must already exist. To make the system more usable, we could support sink topic creation for external (not in our namespace) topics from KsqlResource before issuing a statement to the command topic. But that's only a convenience, and should never be done while running the command topic.
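
To make the namespace idea concrete, here is a small sketch of the guard it implies (the class and the way the prefix is configured are hypothetical):

// Hypothetical guard for the proposed namespace rule: KSQL only ever
// auto-creates or auto-deletes topics under its own prefix; anything outside
// the prefix must already exist and is never deleted automatically.
class TopicNamespace {
  private final String prefix; // e.g. a configured value such as "ksql."

  TopicNamespace(String prefix) {
    this.prefix = prefix;
  }

  boolean owns(String topicName) {
    return topicName.startsWith(prefix);
  }

  void checkCanAutoManage(String topicName) {
    if (!owns(topicName)) {
      throw new IllegalArgumentException(
          "Topic '" + topicName + "' is outside the KSQL namespace '" + prefix
              + "' and must be created/deleted by the user.");
    }
  }
}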

To deal with unpredictability, the outcome of evaluating a KSQL statement should not depend on the current set of topics in Kafka. We can look at this from the point of view of externally-created and internally-created topics (assuming the ownership problem is fixed as described above).

For externally-generated topics, we can spuriously fail commands on recovery due to validation errors (as in the above example). Instead of validating that topics exist, when running the command topic we should just assume that those topics already exist and skip validation. Note that it's still fine, and important, to validate before submitting commands to the command topic. But we should not do this validation when running because the current state of kafka may be different from a previous run. If the assumption is wrong, any queries (streams jobs) that use those topics will fail, but that should be considered a user error, and should be reported to the user in the query status. It shouldn't cause the outcome of the statement to fail.

For internally-generated topics, we may inadvertently delete a sink topic if a later command creates a sink topic with the same name. To solve this we can try to isolate commands from each other by creating a 1:1 mapping between commands and topics. So internally generated sink topic names should include the query id.


agavra commented Jan 25, 2019

@rodesai thanks for providing valuable insight! I agree that any external state manipulation should be done prior to queuing a command on the command topic. This would also solve any race conditions across multiple KSQL servers. One thing I would add about the externality problem is that we need to make sure the query id is a function of the command message alone (e.g. the Kafka offset or a hash of the command) - today it is a function of what commands were executed before it.
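
To illustrate the query id point, here are two hypothetical ways to derive an id purely from the command message (the "CSAS_" naming format is illustrative, not ksql's actual scheme); a sink topic name that embeds such an id would then also be stable across recoveries:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class QueryIds {
  // Option A: derive the id from the sink name plus the command's offset in the
  // command topic, so it never depends on which commands ran before it.
  static String fromOffset(String sinkName, long commandTopicOffset) {
    return "CSAS_" + sinkName + "_" + commandTopicOffset;
  }

  // Option B: derive the id from a hash of the command text itself.
  static String fromHash(String sinkName, String commandText) {
    try {
      final MessageDigest digest = MessageDigest.getInstance("SHA-256");
      final byte[] hash = digest.digest(commandText.getBytes(StandardCharsets.UTF_8));
      final StringBuilder sb = new StringBuilder("CSAS_").append(sinkName).append('_');
      for (int i = 0; i < 8; i++) {        // 8 bytes is enough for illustration
        sb.append(String.format("%02x", hash[i]));
      }
      return sb.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);  // SHA-256 is always available
    }
  }

  private QueryIds() {
  }
}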

In the short term, an offline discussion with @big-andy-coates leads me to believe that injecting a special KafkaTopicClient just for recovery solves most of our issues without introducing too many new edge cases. It aligns well with our model in the validation phase (e.g. tryExecute), so we don't need to introduce much new code for it.
