Skip to content

Commit

Permalink
add specific exception for corruption, also fixed a typo
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Nov 6, 2020
1 parent 7e7f4ae commit d0a5d16
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import com.google.common.collect.Lists;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -89,7 +88,7 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti
for (ConsumerRecord<byte[], byte[]> record : iterable) {
try {
backupRecord(record);
} catch (final KsqlServerException e) {
} catch (final CommandTopicCorruptionException e) {
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.", e);
Expand All @@ -116,7 +115,7 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
for (final ConsumerRecord<byte[], byte[]> record : records) {
try {
backupRecord(record);
} catch (final KsqlServerException e) {
} catch (final CommandTopicCorruptionException e) {
log.warn("Backup is out of sync with the current command topic. "
+ "Backups will not work until the previous command topic is "
+ "restored or all backup files are deleted.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@
package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermissions;
Expand Down Expand Up @@ -132,7 +128,7 @@ private boolean isRecordInLatestReplay(final ConsumerRecord<byte[], byte[]> reco
@Override
public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
if (corruptionDetected) {
throw new KsqlServerException(
throw new CommandTopicCorruptionException(
"Failed to write record due to out of sync command topic and backup file: " + record);
}

Expand All @@ -142,7 +138,7 @@ public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
return;
} else {
corruptionDetected = true;
throw new KsqlServerException(
throw new CommandTopicCorruptionException(
"Failed to write record due to out of sync command topic and backup file: " + record);
}
} else if (latestReplay.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -97,7 +97,7 @@ public Command(
this.version = requireNonNull(version, "version");

if (expectedVersion < version.orElse(0)) {
throw new IncomaptibleKsqlCommandVersionException(
throw new IncompatibleKsqlCommandVersionException(
"Received a command from an incompatible command topic version. "
+ "Expected version less than or equal to " + expectedVersion
+ " but got " + version.orElse(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -378,7 +378,7 @@ private List<QueuedCommand> checkForIncompatibleCommands(final List<QueuedComman
incompatibleCommandChecker.accept(command);
compatibleCommands.add(command);
}
} catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) {
} catch (final SerializationException | IncompatibleKsqlCommandVersionException e) {
LOG.info("Incompatible command record detected when processing command topic", e);
incompatibleCommandDetected = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.util.KsqlServerException;

public class CommandTopicCorruptionException extends KsqlServerException {

public CommandTopicCorruptionException(final String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import io.confluent.ksql.util.KsqlServerException;

public class IncomaptibleKsqlCommandVersionException extends KsqlServerException {
public class IncompatibleKsqlCommandVersionException extends KsqlServerException {

public IncomaptibleKsqlCommandVersionException(final String message) {
public IncompatibleKsqlCommandVersionException(final String message) {
super(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
Expand Down Expand Up @@ -132,7 +132,7 @@ n, new String(record.getLeft(), StandardCharsets.UTF_8), e.getMessage()
try {
InternalTopicSerdes.deserializer(Command.class)
.deserialize(null, record.getRight());
} catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) {
} catch (final SerializationException | IncompatibleKsqlCommandVersionException e) {
if (skipIncompatibleCommands) {
incompatibleCommands.add(record.getRight());
numFilteredCommands++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
Expand Down Expand Up @@ -213,7 +214,7 @@ public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups()
try {
commandTopicBackup.writeRecord(command2);
assertThat(true, is(false));
} catch (final KsqlServerException e) {
} catch (final CommandTopicCorruptionException e) {
// This is expected so we do nothing
}
final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile();
Expand All @@ -224,7 +225,7 @@ public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups()
try {
commandTopicBackup.writeRecord(command2);
assertThat(true, is(false));
} catch (final KsqlServerException e) {
} catch (final CommandTopicCorruptionException e) {
// This is expected so we do nothing
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -105,10 +104,10 @@ public void shouldAssignCorrectPartitionToConsumer() {
}

@Test
public void shouldNotGetCommandsWhenKsqlServerExceptionWhenBackingUp() {
public void shouldNotGetCommandsWhenCommandTopicCorruptionWhenBackingUp() {
// Given:
when(commandConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
doNothing().doThrow(new KsqlServerException("error")).when(commandTopicBackup).writeRecord(any());
doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any());

// When:
final Iterable<ConsumerRecord<byte[], byte[]>> newCommands = commandTopic
Expand All @@ -122,7 +121,7 @@ public void shouldNotGetCommandsWhenKsqlServerExceptionWhenBackingUp() {
}

@Test
public void shouldNotGetCommandsWhenKsqlServerExceptionIhBackupInRestore() {
public void shouldNotGetCommandsWhenCommandTopicCorruptionIhBackupInRestore() {
// Given:
when(commandConsumer.poll(any(Duration.class)))
.thenReturn(someConsumerRecords(
Expand All @@ -131,7 +130,7 @@ public void shouldNotGetCommandsWhenKsqlServerExceptionIhBackupInRestore() {
.thenReturn(someConsumerRecords(
record3))
.thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
doNothing().doThrow(new KsqlServerException("error")).when(commandTopicBackup).writeRecord(any());
doNothing().doThrow(new CommandTopicCorruptionException("error")).when(commandTopicBackup).writeRecord(any());

// When:
final List<QueuedCommand> queuedCommandList = commandTopic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -276,7 +276,7 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInFetch(
public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);
doThrow(new IncompatibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.processPriorCommands();
Expand All @@ -299,7 +299,7 @@ public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInRestore() {
public void shouldProcessPartialListOfCommandsOnIncompatibleCommandInFetch() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);
doThrow(new IncompatibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;
import java.util.Optional;

import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import org.junit.Test;

public class CommandTest {
Expand Down Expand Up @@ -66,7 +66,7 @@ public void shouldThrowExceptionWhenCommandVersionHigher() {
ValueInstantiationException.class,
() -> mapper.readValue(commandStr, Command.class)
);
assertTrue(thrown.getCause() instanceof IncomaptibleKsqlCommandVersionException);
assertTrue(thrown.getCause() instanceof IncompatibleKsqlCommandVersionException);
}

@Test
Expand Down

0 comments on commit d0a5d16

Please sign in to comment.