-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: update ksql restore command to skip incompatible commands if flag set #6524
feat: update ksql restore command to skip incompatible commands if flag set #6524
Conversation
@@ -90,7 +90,6 @@ public void start() { | |||
log.warn("Backup is out of sync with the current command topic. " | |||
+ "Backups will not work until the previous command topic is " | |||
+ "restored or all backup files are deleted.", e); | |||
return records; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be returning the record even if it isn't backed up. The CommandRunner has logic to detect when the command topic and backup are out of sync/the command is incompatible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check detects the case where the backup and the command topic are inconsistent (have different contents). in that case we shouldn't return the command here, otherwise the command runner will try to execute it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should still return the record in some cases. For example, the command topic backup will fail to back up incompatible commands (it tries to deserialize the command before backing it up) Currently the throwOnInvalidRecords
in the CommandTopicBackupImpl
doesn't count as corruption so the incompatible commands don't actually cause the server to enter degraded state if backups are enabled. I think these incompatible commands should still be returned so the CommandRunner can attempt to deserialize it and enter degraded state properly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rodesai I changed it so that we still return the command in the case of a KsqlException since this is thrown by the backup when there's an issue deserializing either the commandId or command. The command isn't returned if it's a KsqlServerException since that means the corruption check has triggered.
64a2ccc
to
6c54d4a
Compare
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/RestoreOptions.java
Show resolved
Hide resolved
throwOnInvalidRecords(records); | ||
if (options.isSkipIncompatibleCommands()) { | ||
records = removeIncompatibleCommands(records); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call throwOnInvalidRecords
when skip is false? That way the restore will fail if a command is invalid, thus avoiding re-creating the command topic and writing partial commands.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I changed it so that we always check that the commands are valid. The only different with the skip option is that if it's set to true, instead of throwing an exception and exiting the program when failing to deserialize the command due to a SerializationException/IncomaptibleKsqlCommandVersionException (which indicates an incompatible command) the method continues by skipping the command.
...-app/src/test/java/io/confluent/ksql/api/integration/RestoreCommandTopicIntegrationTest.java
Outdated
Show resolved
Hide resolved
95fa2c8
to
ce2a9b5
Compare
@@ -117,7 +116,6 @@ public void start() { | |||
log.warn("Backup is out of sync with the current command topic. " | |||
+ "Backups will not work until the previous command topic is " | |||
+ "restored or all backup files are deleted.", e); | |||
return restoreCommands; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make the above exception more specific. We shouldn't just ignore any exception. Ditto for the other place this is used. I know this isn't part of your patch, but might as well do this now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to only catch a KsqlException or KsqlServerException
@@ -90,7 +90,6 @@ public void start() { | |||
log.warn("Backup is out of sync with the current command topic. " | |||
+ "Backups will not work until the previous command topic is " | |||
+ "restored or all backup files are deleted.", e); | |||
return records; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check detects the case where the backup and the command topic are inconsistent (have different contents). in that case we shouldn't return the command here, otherwise the command runner will try to execute it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgmt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR still needs some work actually, we need to check if the commands that are skipped have queries in them, and then clean up internal topics and state stores for them.
5205813
to
ab3b497
Compare
29202f2
to
507fe96
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple more nits inline around naming/exception types
} catch (final Exception e) { | ||
} catch (final KsqlException e) { | ||
log.warn("Error encountered while backing up command.", e); | ||
} catch (final KsqlServerException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should define a new exception type for this case. this is a very specific error condition, and it should have its own exception class.
@@ -86,7 +89,9 @@ public void start() { | |||
for (ConsumerRecord<byte[], byte[]> record : iterable) { | |||
try { | |||
backupRecord(record); | |||
} catch (final Exception e) { | |||
} catch (final KsqlException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again - this should be a very specific exception indicating a deserialization error. Also, why are we even trying to deserialize the command from the backup logic (cc @spena)? It doesn't look like we're using any of the fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok after talking it over with @spena , we don't need this deserialization logic in the backup impl, it should just backup whatever is found in the command topic, deserialization should be handled after it's backed up. So I'll get rid of this catch block and we'll only have the KsqlServerException one.
...db-rest-app/src/main/java/io/confluent/ksql/rest/server/restore/KsqlRestoreCommandTopic.java
Outdated
Show resolved
Hide resolved
75a4ec6
to
d0a5d16
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Description
#6305
Added a
-s
flag for skipping commands when restoring. If the flag isn't present, the restore command program will exit if it encounters errors deserializing commands from the command topic backup file. If it is present and true, it'll skip over the incompatible command instead. The program then prints out the number of skipped commands and tries to clean up any left over KafkaStreams internal topics and state stores if the skipped command was a query.Testing done
Added integration test
Local manual test
Reviewer checklist