Skip to content

Commit

Permalink
dekaf: Fix high watermark reporting
Browse files Browse the repository at this point in the history
This was causing an incorrectly low high-watermark to be returned in the first Fetch response.  This is an unexpected behavior and caused the consumer to be incorrectly behind by 1 fetch chunk (the first one). This should fix the problem going forward.
  • Loading branch information
jshearer committed Nov 5, 2024
1 parent 440f0a3 commit 3213777
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,6 @@ impl Session {
unreachable!("Must have already determined data-preview status of session")
}
SessionDataPreviewState::NotDataPreview => {
partition_data = partition_data
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head);

pending.offset = read.offset;
pending.last_write_head = read.last_write_head;
pending.handle = tokio_util::task::AbortOnDropHandle::new(tokio::spawn(
Expand All @@ -660,6 +656,10 @@ impl Session {
std::time::Instant::now() + timeout,
),
));

partition_data = partition_data
.with_high_watermark(pending.last_write_head) // Map to kafka cursor.
.with_last_stable_offset(pending.last_write_head);
}
SessionDataPreviewState::DataPreview(data_preview_states) => {
let data_preview_state = data_preview_states
Expand Down

0 comments on commit 3213777

Please sign in to comment.