Skip to content

Commit

Permalink
Handle errors while producing backup restoration messages
Browse files Browse the repository at this point in the history
  • Loading branch information
giuseppelillo committed Feb 14, 2023
1 parent c3d8120 commit a742d84
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from kafka.admin import KafkaAdminClient
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError, KafkaError

from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
from karapace.config import Config, read_config
Expand Down Expand Up @@ -197,6 +199,7 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
self.topic_name = topic_option or self.config["topic_name"]
self.admin_client = None
self.timeout_ms = 1000
self.timeout_kafka_producer = 5

# Schema key formatter
self.key_formatter = None
Expand Down Expand Up @@ -281,8 +284,15 @@ def restore_backup(self) -> None:
def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
key = self.encode_key(item[0])
value = encode_value(item[1])
producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
LOG.debug("Sent kafka msg key: %r, value: %r", key, value)
LOG.debug("Trying to send kafka msg key: %r, value: %r", key, value)
try:
msg = producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
producer.flush(timeout=self.timeout_kafka_producer)
metadata = msg.get(timeout=self.timeout_kafka_producer)
except KafkaError as ex:
raise BackupError("Error while producing restored message") from ex
else:
LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, metadata.offset)

def _restore_backup_version_1_single_array(self, producer: KafkaProducer, fp: IO) -> None:
raw_msg = fp.read()
Expand Down

0 comments on commit a742d84

Please sign in to comment.