Skip to content

Commit

Permalink
Handle errors while producing backup restoration messages
Browse files Browse the repository at this point in the history
  • Loading branch information
giuseppelillo committed Feb 14, 2023
1 parent c3d8120 commit a543250
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from enum import Enum
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.errors import NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import KafkaError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
Expand Down Expand Up @@ -197,6 +197,7 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
self.topic_name = topic_option or self.config["topic_name"]
self.admin_client = None
self.timeout_ms = 1000
self.timeout_kafka_producer = 5

# Schema key formatter
self.key_formatter = None
Expand Down Expand Up @@ -281,8 +282,15 @@ def restore_backup(self) -> None:
def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
key = self.encode_key(item[0])
value = encode_value(item[1])
producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
LOG.debug("Sent kafka msg key: %r, value: %r", key, value)
LOG.debug("Trying to send kafka msg key: %r, value: %r", key, value)
try:
msg = producer.send(self.topic_name, key=key, value=value, partition=PARTITION_ZERO)
producer.flush(timeout=self.timeout_kafka_producer)
metadata = msg.get(timeout=self.timeout_kafka_producer)
except KafkaError as ex:
raise BackupError("Error while producing restored message") from ex
else:
LOG.debug("Sent kafka msg key: %r, value: %r, offset: %r", key, value, metadata.offset)

def _restore_backup_version_1_single_array(self, producer: KafkaProducer, fp: IO) -> None:
raw_msg = fp.read()
Expand Down

0 comments on commit a543250

Please sign in to comment.