Skip to content

Commit

Permalink
lag reader
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Aug 9, 2024
1 parent bd68cd9 commit 19fede0
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 135 deletions.
47 changes: 20 additions & 27 deletions serving/source-sink/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use chrono::Utc;
use metrics::counter;
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tracing::{info, trace};

use crate::error::{Error, Result};
use crate::metrics::{
FORWARDER_ACK_LATENCY, FORWARDER_ACK_TOTAL, FORWARDER_LATENCY, FORWARDER_READ_BYTES_TOTAL,
FORWARDER_READ_LATENCY, FORWARDER_READ_TOTAL, FORWARDER_TRANSFORMER_LATENCY,
FORWARDER_WRITE_LATENCY, FORWARDER_WRITE_TOTAL, PARTITION_LABEL, PIPELINE_LABEL, REPLICA_LABEL,
VERTEX_LABEL, VERTEX_TYPE_LABEL,
FORWARDER_ACK_TOTAL, FORWARDER_READ_BYTES_TOTAL, FORWARDER_READ_TOTAL, FORWARDER_WRITE_TOTAL,
PARTITION_LABEL, PIPELINE_LABEL, REPLICA_LABEL, VERTEX_LABEL, VERTEX_TYPE_LABEL,
};
use crate::sink::SinkClient;
use crate::source::SourceClient;
use crate::transformer::TransformerClient;
use chrono::Utc;
use metrics::{counter, histogram};
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tracing::{info, trace};

const SOURCER_SINKER_VERTEX_TYPE: &str = "sourcer-sinker";

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and the acknowledging the messages
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
/// back to the source.
pub(crate) struct Forwarder {
source_client: SourceClient,
Expand Down Expand Up @@ -66,12 +65,13 @@ impl Forwarder {

/// run starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed.
/// this means that, in the happy path scenario a block is always completely processed.
/// this function will return on any error and will cause end up in a non-0 exit code.
/// this function will return on any error and will cause end up in a non-0 exit code.
pub(crate) async fn run(&mut self) -> Result<()> {
let mut messages_count: u64 = 0;
let mut last_forwarded_at = std::time::Instant::now();
loop {
let start_time = std::time::Instant::now();
// TODO: emit latency metrics, metrics-rs histograms has memory leak issues.
let start_time = tokio::time::Instant::now();
// two arms, either shutdown or forward-a-chunk
tokio::select! {
_ = &mut self.shutdown_rx => {
Expand All @@ -81,10 +81,7 @@ impl Forwarder {
result = self.source_client.read_fn(self.batch_size, self.timeout_in_ms) => {
// Read messages from the source
let messages = result?;
info!("Read batch size: {}", messages.len());

histogram!(FORWARDER_READ_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);
info!("Read batch size: {} and latency - {}ms", messages.len(), start_time.elapsed().as_millis());

messages_count += messages.len() as u64;
let bytes_count = messages.iter().map(|msg| msg.value.len() as u64).sum::<u64>();
Expand All @@ -95,7 +92,7 @@ impl Forwarder {
let offsets = messages.iter().map(|message| message.offset.clone()).collect();
// Apply transformation if transformer is present
let transformed_messages = if let Some(transformer_client) = &self.transformer_client {
let start_time = std::time::Instant::now();
let start_time = tokio::time::Instant::now();
let mut jh = JoinSet::new();
for message in messages {
let mut transformer_client = transformer_client.clone();
Expand All @@ -108,31 +105,29 @@ impl Forwarder {
let result = result?;
results.extend(result);
}
histogram!(FORWARDER_TRANSFORMER_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);
info!("Transformed latency - {}ms", start_time.elapsed().as_millis());
results
} else {
messages
};

let sink_write_start_time = std::time::Instant::now();
// Write messages to the sink
// TODO: should we retry writing? what if the error is transient?
// we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit.
// we need to confirm this via FMEA tests.
let start_time = tokio::time::Instant::now();
self.sink_client.sink_fn(transformed_messages).await?;
histogram!(FORWARDER_WRITE_LATENCY, &self.common_labels)
.record(sink_write_start_time.elapsed().as_millis() as f64);
info!("Sink latency - {}ms", start_time.elapsed().as_millis());
counter!(FORWARDER_WRITE_TOTAL, &self.common_labels).increment(messages_count);

// Acknowledge the messages
// TODO: should we retry acking? what if the error is transient?
// we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit.
// we need to confirm this via FMEA tests.
let ack_start_time = std::time::Instant::now();
let start_time = tokio::time::Instant::now();
self.source_client.ack_fn(offsets).await?;
histogram!(FORWARDER_ACK_LATENCY, &self.common_labels)
.record(ack_start_time.elapsed().as_millis() as f64);
info!("Ack latency - {}ms", start_time.elapsed().as_millis());

counter!(FORWARDER_ACK_TOTAL, &self.common_labels).increment(messages_count);
trace!("Forwarded {} messages", messages_count);
}
Expand All @@ -147,8 +142,6 @@ impl Forwarder {
messages_count = 0;
last_forwarded_at = std::time::Instant::now();
}
histogram!(FORWARDER_LATENCY, &self.common_labels)
.record(start_time.elapsed().as_millis() as f64);
}
Ok(())
}
Expand Down
16 changes: 12 additions & 4 deletions serving/source-sink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::time::Duration;
use std::{env, fs};

use tokio::signal;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::info;

pub(crate) use crate::error::Error;
use crate::forwarder::Forwarder;
use crate::sink::{SinkClient, SinkConfig};
use crate::source::{SourceClient, SourceConfig};
use crate::transformer::{TransformerClient, TransformerConfig};

pub(crate) use self::error::Result;
pub(crate) use crate::error::Error;

/// SourcerSinker orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received:
Expand Down Expand Up @@ -54,6 +55,10 @@ pub async fn run_forwarder(
wait_for_server_info(&source_config.server_info_file).await?;
let mut source_client = SourceClient::connect(source_config).await?;

// start the lag reader to publish lag metrics
let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None);
lag_reader.start().await;

wait_for_server_info(&sink_config.server_info_file).await?;
let mut sink_client = SinkClient::connect(sink_config).await?;

Expand Down Expand Up @@ -114,6 +119,7 @@ pub async fn run_forwarder(
let _ = tokio::try_join!(forwarder_handle, shutdown_handle)
.map_err(|e| Error::ForwarderError(format!("{:?}", e)))?;

lag_reader.shutdown().await;
info!("Forwarder stopped gracefully");
Ok(())
}
Expand Down Expand Up @@ -202,13 +208,15 @@ async fn shutdown_signal(shutdown_rx: Option<oneshot::Receiver<()>>) {

#[cfg(test)]
mod tests {
use crate::sink::SinkConfig;
use crate::source::SourceConfig;
use std::env;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source};
use std::env;
use tokio::sync::mpsc::Sender;

use crate::sink::SinkConfig;
use crate::source::SourceConfig;

struct SimpleSource;
#[tonic::async_trait]
impl source::Sourcer for SimpleSource {
Expand Down
12 changes: 7 additions & 5 deletions serving/source-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use log::info;
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
use sourcer_sinker::{metrics::start_metrics_server, run_forwarder};
use std::env;
use std::net::SocketAddr;

use log::info;
use tracing::error;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;

use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;
use sourcer_sinker::transformer::TransformerConfig;
use sourcer_sinker::{metrics::start_metrics_server, run_forwarder};

#[tokio::main]
async fn main() {
let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| "info".to_string());
Expand Down
7 changes: 4 additions & 3 deletions serving/source-sink/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::HashMap;

use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::Engine;
use chrono::{DateTime, Utc};

use crate::error::Error;
use crate::shared::{prost_timestamp_from_utc, utc_from_timestamp};
use crate::sink::proto;
use crate::source::proto::read_response;
use crate::transformer::proto::SourceTransformRequest;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::Engine;
use chrono::{DateTime, Utc};

/// A message that is sent from the source to the sink.
#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 19fede0

Please sign in to comment.