From 5f49907197e3b7be4f13cb8711d328fd504f1f8d Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 14 Aug 2024 09:54:45 +0530 Subject: [PATCH] chore: avoid partial acks (#1938) Signed-off-by: Yashash H L Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- rust/monovertex/src/forwarder.rs | 36 +++++++++++++++----------------- rust/monovertex/src/lib.rs | 10 ++++++--- rust/monovertex/src/metrics.rs | 11 ++++++---- 3 files changed, 31 insertions(+), 26 deletions(-) diff --git a/rust/monovertex/src/forwarder.rs b/rust/monovertex/src/forwarder.rs index 014789b7b1..e2dcd6b3d7 100644 --- a/rust/monovertex/src/forwarder.rs +++ b/rust/monovertex/src/forwarder.rs @@ -1,11 +1,3 @@ -use chrono::Utc; -use metrics::counter; -use std::collections::HashMap; -use tokio::task::JoinSet; -use tokio::time::sleep; -use tokio_util::sync::CancellationToken; -use tracing::info; -use tracing::log::warn; use crate::config::config; use crate::error::{Error, Result}; use crate::message::Offset; @@ -16,6 +8,14 @@ use crate::metrics::{ use crate::sink::{proto, SinkClient}; use crate::source::SourceClient; use crate::transformer::TransformerClient; +use chrono::Utc; +use metrics::counter; +use std::collections::HashMap; +use tokio::task::JoinSet; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use tracing::info; +use tracing::log::warn; const MONO_VERTEX_TYPE: &str = "mono_vertex"; @@ -77,6 +77,9 @@ impl Forwarder { let messages = result?; info!("Read batch size: {} and latency - {}ms", messages.len(), start_time.elapsed().as_millis()); + // collect all the offsets as the transformer can drop (via filter) messages + let offsets = messages.iter().map(|msg| msg.offset.clone()).collect::<Vec<_>>(); + messages_count += messages.len() as u64; let bytes_count = messages.iter().map(|msg| msg.value.len() as u64).sum::<u64>(); counter!(FORWARDER_READ_TOTAL, &self.common_labels).increment(messages_count); @@ -122,18 +125,8 @@ impl Forwarder { .map(|result| result.id.clone()) 
.collect(); - let successful_offsets: Vec<Offset> = retry_messages.iter() - .filter(|msg| !failed_ids.contains(&msg.id)) - .map(|msg| msg.offset.clone()) - .collect(); - - - // ack the successful offsets - let n = successful_offsets.len(); - self.source_client.ack_fn(successful_offsets).await?; - counter!(FORWARDER_WRITE_TOTAL, &self.common_labels).increment(n as u64); attempts += 1; - + if failed_ids.is_empty() { break; } else { @@ -161,6 +154,11 @@ impl Forwarder { ))); } + // Acknowledge the messages back to the source + let start_time = tokio::time::Instant::now(); + self.source_client.ack_fn(offsets).await?; + info!("Ack latency - {}ms", start_time.elapsed().as_millis()); + counter!(FORWARDER_ACK_TOTAL, &self.common_labels).increment(messages_count); } } diff --git a/rust/monovertex/src/lib.rs b/rust/monovertex/src/lib.rs index c4d6bf21bf..823ffc6870 100644 --- a/rust/monovertex/src/lib.rs +++ b/rust/monovertex/src/lib.rs @@ -2,7 +2,7 @@ pub(crate) use self::error::Result; use crate::config::config; pub(crate) use crate::error::Error; use crate::forwarder::Forwarder; -use crate::metrics::{start_metrics_https_server, MetricsState, LagReaderBuilder}; +use crate::metrics::{start_metrics_https_server, LagReaderBuilder, MetricsState}; use crate::sink::{SinkClient, SinkConfig}; use crate::source::{SourceClient, SourceConfig}; use crate::transformer::{TransformerClient, TransformerConfig}; @@ -182,8 +182,12 @@ pub async fn init( // start the lag reader to publish lag metrics let mut lag_reader = LagReaderBuilder::new(source_client.clone()) - .lag_checking_interval(Duration::from_secs(config().lag_check_interval_in_secs.into())) - .refresh_interval(Duration::from_secs(config().lag_refresh_interval_in_secs.into())) + .lag_checking_interval(Duration::from_secs( + config().lag_check_interval_in_secs.into(), + )) + .refresh_interval(Duration::from_secs( + config().lag_refresh_interval_in_secs.into(), + )) .build(); lag_reader.start().await; diff --git 
a/rust/monovertex/src/metrics.rs b/rust/monovertex/src/metrics.rs index 69782e2c32..f3d5421dbc 100644 --- a/rust/monovertex/src/metrics.rs +++ b/rust/monovertex/src/metrics.rs @@ -107,7 +107,7 @@ pub(crate) async fn start_metrics_https_server( /// router for metrics and k8s health endpoints fn metrics_router(recorder_handle: PrometheusHandle, metrics_state: MetricsState) -> Router { - Router::new() + Router::new() .route("/metrics", get(move || ready(recorder_handle.render()))) .route("/livez", get(livez)) .route("/readyz", get(readyz)) @@ -197,7 +197,6 @@ pub(crate) struct LagReader { pending_stats: Arc>>, } - /// LagReaderBuilder is used to build a `LagReader` instance. pub(crate) struct LagReaderBuilder { source_client: SourceClient, @@ -227,8 +226,12 @@ impl LagReaderBuilder { pub(crate) fn build(self) -> LagReader { LagReader { source_client: self.source_client, - lag_checking_interval: self.lag_checking_interval.unwrap_or_else(|| Duration::from_secs(3)), - refresh_interval: self.refresh_interval.unwrap_or_else(|| Duration::from_secs(5)), + lag_checking_interval: self + .lag_checking_interval + .unwrap_or_else(|| Duration::from_secs(3)), + refresh_interval: self + .refresh_interval + .unwrap_or_else(|| Duration::from_secs(5)), buildup_handle: None, expose_handle: None, pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))),