diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java index 3a385f0cc6d7..9ed71fcc5c01 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java @@ -132,6 +132,14 @@ public void writeRecord(final ConsumerRecord record) { "Failed to write record due to out of sync command topic and backup file: " + record); } + if (record.key() == null || record.value() == null) { + LOG.warn(String.format("Can't backup a command topic record with a null key/value:" + + " key=%s, value=%s, partition=%d, offset=%d", + record.key(), record.value(), record.partition(), record.offset())); + + return; + } + if (isRestoring()) { if (isRecordInLatestReplay(record)) { // Ignore backup because record was already replayed @@ -148,7 +156,7 @@ public void writeRecord(final ConsumerRecord record) { try { replayFile.write(record); - } catch (final IOException e) { + } catch (final Exception e) { LOG.warn("Failed to write to file {}. The command topic backup is not complete. " + "Make sure the file exists and has permissions to write. KSQL must be restarted " + "afterwards to complete the backup process. Error = {}", diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java index e8decd076aeb..1f766016d391 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -162,6 +162,40 @@ public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException { assertThat(Files.exists(dir), is(true)); } + @Test + public void shouldNotFailIfRecordKeyIsNull() throws IOException { + // Given + commandTopicBackup.initialize(); + final ConsumerRecord record = + (ConsumerRecord) mock(ConsumerRecord.class); + when(record.key()).thenReturn(null); + when(record.value()).thenReturn(new byte[]{}); + + // When + commandTopicBackup.writeRecord(record); + + // Then + final List> commands = commandTopicBackup.getReplayFile().readRecords(); + assertThat(commands.size(), is(0)); + } + + @Test + public void shouldNotFailIfRecordValueIsNull() throws IOException { + // Given + commandTopicBackup.initialize(); + final ConsumerRecord record = + (ConsumerRecord) mock(ConsumerRecord.class); + when(record.key()).thenReturn(new byte[]{}); + when(record.value()).thenReturn(null); + + // When + commandTopicBackup.writeRecord(record); + + // Then + final List> commands = commandTopicBackup.getReplayFile().readRecords(); + assertThat(commands.size(), is(0)); + } + @Test public void shouldWriteCommandToBackupToReplayFile() throws IOException { // Given