chore: avoid partial acks (#1938)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
2 people authored and KeranYang committed Aug 19, 2024
1 parent 7d43b52 commit 5f49907
Showing 3 changed files with 31 additions and 26 deletions.
36 changes: 17 additions & 19 deletions rust/monovertex/src/forwarder.rs
@@ -1,11 +1,3 @@
use chrono::Utc;
use metrics::counter;
use std::collections::HashMap;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::log::warn;
use crate::config::config;
use crate::error::{Error, Result};
use crate::message::Offset;
@@ -16,6 +8,14 @@ use crate::metrics::{
use crate::sink::{proto, SinkClient};
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
use chrono::Utc;
use metrics::counter;
use std::collections::HashMap;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::log::warn;

const MONO_VERTEX_TYPE: &str = "mono_vertex";

@@ -77,6 +77,9 @@ impl Forwarder {
let messages = result?;
info!("Read batch size: {} and latency - {}ms", messages.len(), start_time.elapsed().as_millis());

// collect all the offsets as the transformer can drop (via filter) messages
let offsets = messages.iter().map(|msg| msg.offset.clone()).collect::<Vec<Offset>>();

messages_count += messages.len() as u64;
let bytes_count = messages.iter().map(|msg| msg.value.len() as u64).sum::<u64>();
counter!(FORWARDER_READ_TOTAL, &self.common_labels).increment(messages_count);
@@ -122,18 +125,8 @@ impl Forwarder {
.map(|result| result.id.clone())
.collect();

let successful_offsets: Vec<Offset> = retry_messages.iter()
.filter(|msg| !failed_ids.contains(&msg.id))
.map(|msg| msg.offset.clone())
.collect();


// ack the successful offsets
let n = successful_offsets.len();
self.source_client.ack_fn(successful_offsets).await?;
counter!(FORWARDER_WRITE_TOTAL, &self.common_labels).increment(n as u64);
attempts += 1;

if failed_ids.is_empty() {
break;
} else {
@@ -161,6 +154,11 @@ impl Forwarder {
)));
}

// Acknowledge the messages back to the source
let start_time = tokio::time::Instant::now();
self.source_client.ack_fn(offsets).await?;
info!("Ack latency - {}ms", start_time.elapsed().as_millis());

counter!(FORWARDER_ACK_TOTAL, &self.common_labels).increment(messages_count);
}
}
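The substance of the forwarder.rs change is the ordering above: offsets for the whole batch are captured before the transformer runs (since it may filter messages away), the per-attempt ack of "successful offsets" inside the retry loop is removed, and a single ack covering every offset happens only after the write loop has fully succeeded. Below is a minimal sketch of that ordering, using hypothetical Message, Offset, SourceClient, and SinkClient stand-ins (synchronous for brevity) rather than the crate's real async gRPC clients; the real forwarder does the same thing asynchronously and also times the single ack_fn call.

// Hypothetical stand-ins, not the numaflow types; sync instead of async for brevity.
#[derive(Clone)]
struct Offset(String);

struct Message {
    id: String,
    offset: Offset,
    value: Vec<u8>,
}

struct SourceClient;
struct SinkClient;

impl SourceClient {
    // Acknowledge every offset of the batch in a single call.
    fn ack(&self, offsets: Vec<Offset>) {
        println!("acked {} offsets", offsets.len());
    }
}

impl SinkClient {
    // Toy failure rule: messages with an empty payload fail; return their ids.
    fn write(&self, messages: &[Message]) -> Vec<String> {
        messages
            .iter()
            .filter(|m| m.value.is_empty())
            .map(|m| m.id.clone())
            .collect()
    }
}

fn forward_batch(
    source: &SourceClient,
    sink: &SinkClient,
    messages: Vec<Message>,
) -> Result<(), String> {
    // Capture every offset up front: a transformer may drop messages later,
    // but the source still expects an ack for each offset it handed out.
    let offsets: Vec<Offset> = messages.iter().map(|m| m.offset.clone()).collect();

    // Retry failed writes, but never ack inside this loop (no partial acks).
    let mut to_retry = messages;
    for _attempt in 0..3 {
        let failed_ids = sink.write(&to_retry);
        if failed_ids.is_empty() {
            to_retry.clear();
            break;
        }
        to_retry.retain(|m| failed_ids.contains(&m.id));
    }
    if !to_retry.is_empty() {
        return Err(format!("{} messages still failing; nothing acked", to_retry.len()));
    }

    // Only now acknowledge the whole batch back to the source.
    source.ack(offsets);
    Ok(())
}

fn main() -> Result<(), String> {
    let batch = vec![Message {
        id: "1".to_string(),
        offset: Offset("0-1".to_string()),
        value: b"payload".to_vec(),
    }];
    forward_batch(&SourceClient, &SinkClient, batch)
}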
10 changes: 7 additions & 3 deletions rust/monovertex/src/lib.rs
@@ -2,7 +2,7 @@ pub(crate) use self::error::Result;
use crate::config::config;
pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
use crate::metrics::{start_metrics_https_server, MetricsState, LagReaderBuilder};
use crate::metrics::{start_metrics_https_server, LagReaderBuilder, MetricsState};
use crate::sink::{SinkClient, SinkConfig};
use crate::source::{SourceClient, SourceConfig};
use crate::transformer::{TransformerClient, TransformerConfig};
@@ -182,8 +182,12 @@ pub async fn init(

// start the lag reader to publish lag metrics
let mut lag_reader = LagReaderBuilder::new(source_client.clone())
.lag_checking_interval(Duration::from_secs(config().lag_check_interval_in_secs.into()))
.refresh_interval(Duration::from_secs(config().lag_refresh_interval_in_secs.into()))
.lag_checking_interval(Duration::from_secs(
config().lag_check_interval_in_secs.into(),
))
.refresh_interval(Duration::from_secs(
config().lag_refresh_interval_in_secs.into(),
))
.build();
lag_reader.start().await;

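The reflowed lib.rs calls above also show why the interval fields go through .into(): Duration::from_secs takes a u64, while the config presumably stores the intervals as a narrower integer, so .into() widens them losslessly. A tiny illustration with a hypothetical Config struct (only the field names come from the diff; the u16 type is an assumption):

use std::time::Duration;

// Hypothetical config shape; only the field names are borrowed from the diff.
struct Config {
    lag_check_interval_in_secs: u16,
    lag_refresh_interval_in_secs: u16,
}

fn main() {
    let config = Config {
        lag_check_interval_in_secs: 3,
        lag_refresh_interval_in_secs: 5,
    };
    // .into() widens u16 -> u64, which is what Duration::from_secs expects.
    let check = Duration::from_secs(config.lag_check_interval_in_secs.into());
    let refresh = Duration::from_secs(config.lag_refresh_interval_in_secs.into());
    println!("check every {check:?}, refresh every {refresh:?}");
}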
11 changes: 7 additions & 4 deletions rust/monovertex/src/metrics.rs
@@ -107,7 +107,7 @@ pub(crate) async fn start_metrics_https_server(

/// router for metrics and k8s health endpoints
fn metrics_router(recorder_handle: PrometheusHandle, metrics_state: MetricsState) -> Router {
Router::new()
Router::new()
.route("/metrics", get(move || ready(recorder_handle.render())))
.route("/livez", get(livez))
.route("/readyz", get(readyz))
@@ -197,7 +197,6 @@ pub(crate) struct LagReader {
pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
}


/// LagReaderBuilder is used to build a `LagReader` instance.
pub(crate) struct LagReaderBuilder {
source_client: SourceClient,
@@ -227,8 +226,12 @@ impl LagReaderBuilder {
pub(crate) fn build(self) -> LagReader {
LagReader {
source_client: self.source_client,
lag_checking_interval: self.lag_checking_interval.unwrap_or_else(|| Duration::from_secs(3)),
refresh_interval: self.refresh_interval.unwrap_or_else(|| Duration::from_secs(5)),
lag_checking_interval: self
.lag_checking_interval
.unwrap_or_else(|| Duration::from_secs(3)),
refresh_interval: self
.refresh_interval
.unwrap_or_else(|| Duration::from_secs(5)),
buildup_handle: None,
expose_handle: None,
pending_stats: Arc::new(Mutex::new(Vec::with_capacity(MAX_PENDING_STATS))),
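The LagReaderBuilder hunk above is mostly a rustfmt reflow, but it highlights the Option-backed builder pattern in use: unset intervals resolve to fixed defaults (3s and 5s) in build(). A self-contained sketch of that pattern with simplified stand-in types (not the real LagReader, which also carries a source client, task handles, and pending stats):

use std::time::Duration;

// Simplified stand-in: optional knobs live as Option<Duration> on the builder
// and are resolved to defaults when build() is called.
struct LagSettings {
    lag_checking_interval: Duration,
    refresh_interval: Duration,
}

#[derive(Default)]
struct LagSettingsBuilder {
    lag_checking_interval: Option<Duration>,
    refresh_interval: Option<Duration>,
}

impl LagSettingsBuilder {
    fn lag_checking_interval(mut self, interval: Duration) -> Self {
        self.lag_checking_interval = Some(interval);
        self
    }

    fn refresh_interval(mut self, interval: Duration) -> Self {
        self.refresh_interval = Some(interval);
        self
    }

    fn build(self) -> LagSettings {
        LagSettings {
            // Unset fields fall back to the same defaults the diff uses: 3s and 5s.
            lag_checking_interval: self
                .lag_checking_interval
                .unwrap_or_else(|| Duration::from_secs(3)),
            refresh_interval: self
                .refresh_interval
                .unwrap_or_else(|| Duration::from_secs(5)),
        }
    }
}

fn main() {
    // Override one knob, take the default for the other.
    let settings = LagSettingsBuilder::default()
        .refresh_interval(Duration::from_secs(10))
        .build();
    assert_eq!(settings.lag_checking_interval, Duration::from_secs(3));
    assert_eq!(settings.refresh_interval, Duration::from_secs(10));
}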
