-
Notifications
You must be signed in to change notification settings - Fork 69
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The consumption logic currently counts how many records it received in a single batch returned from poll, and when the batch is empty it concludes that the backup has finished successfully. However, there are many reasons why a batch returned by poll can be empty, especially with timeouts applied to it. A consequence of this is that a backup created at $t_1$ may contain more records than a backup created at $t_2$ (without any external changes to the topic content, e.g. compaction). To fix this we have to use offset watermarks. With them we can determine whether we are done or not. The patch now exposes the poll timeout, so that users can increase it in case they encounter issues, and it uses a longer default poll timeout to ensure that users are not going to see errors right away (increased from 1 second to 1 minute).
- Loading branch information
1 parent
8624bc6
commit 6425450
Showing
7 changed files
with
201 additions
and
41 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
""" | ||
Copyright (c) 2023 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
from kafka.structs import TopicPartition | ||
|
||
|
||
class BackupError(Exception):
    """Base class for all backup errors."""
|
||
|
||
class StaleConsumerError(BackupError, RuntimeError):
    """Raised when the backup consumer does not make any progress and has not reached the last record in the topic.

    The exception message has the form ``topic:partition#current_offset (start_offset,end_offset)``, with all
    offsets rendered using thousands separators. The individual values are available through the read-only
    properties of the instance.
    """

    # The double-underscore names are mangled per class; the values are only exposed through the properties.
    __slots__ = ("__topic_partition", "__start_offset", "__end_offset", "__last_offset")

    def __init__(
        self,
        topic_partition: TopicPartition,
        start_offset: int,
        end_offset: int,
        current_offset: int,
    ) -> None:
        """Create the error from the partition and the offsets observed when consumption stalled.

        :param topic_partition: The topic and partition that went stale.
        :param start_offset: The start offset of the topic and partition.
        :param end_offset: The end offset of the topic and partition.
        :param current_offset: The last offset that was successfully consumed, exposed as ``last_offset``.
        """
        super().__init__(
            f"{topic_partition.topic}:{topic_partition.partition}#{current_offset:,} ({start_offset:,},{end_offset:,})"
        )
        self.__topic_partition = topic_partition
        self.__start_offset = start_offset
        self.__end_offset = end_offset
        self.__last_offset = current_offset

    def __reduce__(self) -> "tuple[type[StaleConsumerError], tuple[TopicPartition, int, int, int]]":
        """Support ``copy`` and ``pickle`` by rebuilding the error from its constructor arguments.

        The inherited implementation would call the class with ``self.args``, which only holds the formatted
        message and therefore does not match the signature of ``__init__``.
        """
        return (
            type(self),
            (self.__topic_partition, self.__start_offset, self.__end_offset, self.__last_offset),
        )

    @property
    def topic_partition(self) -> TopicPartition:
        """Gets the topic and partition that went stale during consumption."""
        return self.__topic_partition

    @property
    def topic(self) -> str:
        """Gets the topic that went stale during consumption."""
        return self.__topic_partition.topic

    @property
    def partition(self) -> int:
        """Gets the partition that went stale during consumption."""
        return self.__topic_partition.partition

    @property
    def start_offset(self) -> int:
        """Gets the start offset of the topic and partition as determined at the start of the backup creation."""
        return self.__start_offset

    @property
    def end_offset(self) -> int:
        """Gets the end offset of the topic and partition as determined at the start of the backup creation.

        This is the offset of the last written record in the topic and partition, not the high watermark.
        """
        return self.__end_offset

    @property
    def last_offset(self) -> int:
        """Gets the last offset of the topic and partition that was successfully consumed."""
        return self.__last_offset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters