Skip to content

Commit

Permalink
fix: fix NPE when backing a record that has null key/values (#7268)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Mar 23, 2021
1 parent 0a5a689 commit 0cbd4e8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
"Failed to write record due to out of sync command topic and backup file: " + record);
}

if (record.key() == null || record.value() == null) {
LOG.warn(String.format("Can't backup a command topic record with a null key/value:"
+ " key=%s, value=%s, partition=%d, offset=%d",
record.key(), record.value(), record.partition(), record.offset()));

return;
}

if (isRestoring()) {
if (isRecordInLatestReplay(record)) {
// Ignore backup because record was already replayed
Expand All @@ -148,7 +156,7 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {

try {
replayFile.write(record);
} catch (final IOException e) {
} catch (final Exception e) {
LOG.warn("Failed to write to file {}. The command topic backup is not complete. "
+ "Make sure the file exists and has permissions to write. KSQL must be restarted "
+ "afterwards to complete the backup process. Error = {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,40 @@ public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException {
assertThat(Files.exists(dir), is(true));
}

@Test
public void shouldNotFailIfRecordKeyIsNull() throws IOException {
// Given
commandTopicBackup.initialize();
final ConsumerRecord<byte[], byte[]> record =
(ConsumerRecord<byte[], byte[]>) mock(ConsumerRecord.class);
when(record.key()).thenReturn(null);
when(record.value()).thenReturn(new byte[]{});

// When
commandTopicBackup.writeRecord(record);

// Then
final List<Pair<byte[], byte[]>> commands = commandTopicBackup.getReplayFile().readRecords();
assertThat(commands.size(), is(0));
}

@Test
public void shouldNotFailIfRecordValueIsNull() throws IOException {
// Given
commandTopicBackup.initialize();
final ConsumerRecord<byte[], byte[]> record =
(ConsumerRecord<byte[], byte[]>) mock(ConsumerRecord.class);
when(record.key()).thenReturn(new byte[]{});
when(record.value()).thenReturn(null);

// When
commandTopicBackup.writeRecord(record);

// Then
final List<Pair<byte[], byte[]>> commands = commandTopicBackup.getReplayFile().readRecords();
assertThat(commands.size(), is(0));
}

@Test
public void shouldWriteCommandToBackupToReplayFile() throws IOException {
// Given
Expand Down

0 comments on commit 0cbd4e8

Please sign in to comment.