Skip to content

Commit

Permalink
fix: overwrite entry_id if entry id is less than start_offset
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Oct 17, 2024
1 parent 613e07a commit 84bf8b1
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl LogStore for KafkaLogStore {
async fn read(
&self,
provider: &Provider,
entry_id: EntryId,
mut entry_id: EntryId,
index: Option<WalIndex>,
) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
let provider = provider
Expand All @@ -225,6 +225,22 @@ impl LogStore for KafkaLogStore {
.client()
.clone();

let start_offset = client
.get_offset(OffsetAt::Earliest)
.await
.context(GetOffsetSnafu {
topic: &provider.topic,
})?;

if entry_id as i64 <= start_offset {
warn!(
"The entry_id: {} is less than start_offset: {}, topic: {}. Overwriting entry_id with start_offset",
entry_id, start_offset, &provider.topic
);

entry_id = start_offset as u64;
}

// Gets the offset of the latest record in the topic. Actually, it's the latest record of the single partition in the topic.
// The read operation terminates when this record is consumed.
// Warning: the `get_offset` returns the end offset of the latest record. For our usage, it should be decremented.
Expand Down

0 comments on commit 84bf8b1

Please sign in to comment.