Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Nov 4, 2024
1 parent 426141a commit f87e269
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 61 deletions.
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ mod tests {
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-out-0".into(), 0)],
batch_size: 500,
read_timeout: Duration::from_secs(1),
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
Expand Down
8 changes: 0 additions & 8 deletions rust/numaflow-core/src/config/pipeline/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ use std::fmt;
use std::time::Duration;

const DEFAULT_PARTITION_IDX: u16 = 0;
const DEFAULT_BATCH_SIZE: usize = 500;
const DEFAULT_PARTITIONS: u16 = 1;
const DEFAULT_MAX_LENGTH: usize = 30000;
const DEFAULT_USAGE_LIMIT: f64 = 0.8;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1;
const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess;
const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10;
const DEFAULT_WIP_ACK_INTERVAL_MILLIS: u64 = 1000;
const DEFAULT_READ_TIMEOUT_MILLIS: u64 = 1000;

pub(crate) mod jetstream {
const DEFAULT_URL: &str = "localhost:4222";
Expand Down Expand Up @@ -78,8 +76,6 @@ impl fmt::Display for BufferFullStrategy {
pub(crate) struct BufferReaderConfig {
pub(crate) partitions: u16,
pub(crate) streams: Vec<(String, u16)>,
pub(crate) batch_size: usize,
pub(crate) read_timeout: Duration,
pub(crate) wip_ack_interval: Duration,
}

Expand All @@ -88,9 +84,7 @@ impl Default for BufferReaderConfig {
BufferReaderConfig {
partitions: DEFAULT_PARTITIONS,
streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)],
batch_size: DEFAULT_BATCH_SIZE,
wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS),
read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS),
}
}
}
Expand Down Expand Up @@ -145,9 +139,7 @@ mod tests {
let expected = BufferReaderConfig {
partitions: DEFAULT_PARTITIONS,
streams: vec![("default-0".to_string(), DEFAULT_PARTITION_IDX)],
batch_size: DEFAULT_BATCH_SIZE,
wip_ack_interval: Duration::from_millis(DEFAULT_WIP_ACK_INTERVAL_MILLIS),
read_timeout: Duration::from_millis(DEFAULT_READ_TIMEOUT_MILLIS),
};
let config = BufferReaderConfig::default();
assert_eq!(config, expected);
Expand Down
32 changes: 17 additions & 15 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;

use async_nats::jetstream;
use async_nats::jetstream::Context;
use async_nats::{jetstream, ConnectOptions};
use futures::future::try_join_all;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use std::collections::HashMap;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

Expand Down Expand Up @@ -289,17 +289,21 @@ async fn create_transformer(

/// Creates a jetstream context based on the provided configuration
async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Result<Context> {
let js_client = match (config.user, config.password) {
(Some(user), Some(password)) => {
async_nats::connect_with_options(
config.url,
async_nats::ConnectOptions::with_user_and_password(user, password),
)
.await
}
_ => async_nats::connect(config.url).await,
let mut opts = ConnectOptions::new()
.max_reconnects(None) // -1 for unlimited reconnects
.ping_interval(Duration::from_secs(3))
.max_reconnects(None)
.ping_interval(Duration::from_secs(3))
.retry_on_initial_connect();

if let (Some(user), Some(password)) = (config.user, config.password) {
opts = opts.user_and_password(user, password);
}
.map_err(|e| error::Error::Connection(e.to_string()))?;

let js_client = async_nats::connect_with_options(&config.url, opts)
.await
.map_err(|e| error::Error::Connection(e.to_string()))?;

Ok(jetstream::new(js_client))
}

Expand Down Expand Up @@ -562,8 +566,6 @@ mod tests {
.enumerate()
.map(|(i, key)| (key.to_string(), i as u16))
.collect(),
batch_size: 500,
read_timeout: Duration::from_secs(1),
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
Expand Down
6 changes: 6 additions & 0 deletions rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl Forwarder {

/// Writes messages to the jetstream, it writes to all the downstream buffers.
async fn write_to_jetstream(&mut self, messages: Vec<Message>) -> Result<(), Error> {
let start_time = tokio::time::Instant::now();
if messages.is_empty() {
return Ok(());
}
Expand All @@ -186,6 +187,11 @@ impl Forwarder {
.await
.map_err(|e| Error::Forwarder(format!("Failed to write to jetstream {:?}", e)))??;
}
debug!(
"Wrote {} messages to jetstream in {}ms",
messages.len(),
start_time.elapsed().as_millis()
);
Ok(())
}
}
80 changes: 46 additions & 34 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::time::Duration;
use async_nats::jetstream::{
consumer::PullConsumer, AckKind, Context, Message as JetstreamMessage,
};
use log::info;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Instant};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing::{debug, error, warn};

use crate::config::pipeline::isb::BufferReaderConfig;
use crate::config::pipeline::PipelineConfig;
Expand Down Expand Up @@ -72,7 +73,7 @@ impl JetstreamReader {
cancel_token: CancellationToken,
pipeline_config: &PipelineConfig,
) -> Result<(Receiver<ReadMessage>, JoinHandle<Result<()>>)> {
let (messages_tx, messages_rx) = mpsc::channel(2 * self.config.batch_size);
let (messages_tx, messages_rx) = mpsc::channel(2 * pipeline_config.batch_size);

let handle: JoinHandle<Result<()>> = tokio::spawn({
let this = self.clone();
Expand Down Expand Up @@ -104,41 +105,44 @@ impl JetstreamReader {
.messages()
.await
.unwrap()
.chunks_timeout(this.config.batch_size, this.config.read_timeout);
.chunks_timeout(pipeline_config.batch_size, pipeline_config.read_timeout);

tokio::pin!(chunk_stream);

// The .next() call will not return if there is no data even if read_timeout is
// reached.
let mut total_messages = 0;
let mut chunk_time = Instant::now();
let mut start_time = Instant::now();
while let Some(messages) = chunk_stream.next().await {
debug!(
"Received {} messages from Jetstream with latency={:?}",
messages.len(),
chunk_time.elapsed()
);
total_messages += messages.len();
for message in messages {
let jetstream_message = match message {
Ok(message) => message,
Err(e) => {
error!(?e, "Failed to fetch messages from the Jetstream");
continue;
}
};

let msg_info = match jetstream_message.info() {
Ok(info) => info,
Err(e) => {
error!(?e, "Failed to get message info from Jetstream");
continue;
}
};
let jetstream_message = message.map_err(|e| {
Error::ISB(format!(
"Error while fetching message from Jetstream: {:?}",
e
))
})?;

let msg_info = jetstream_message.info().map_err(|e| {
Error::ISB(format!(
"Error while fetching message info from Jetstream: {:?}",
e
))
})?;

let mut message: Message =
match jetstream_message.payload.clone().try_into() {
Ok(message) => message,
Err(e) => {
error!(
?e,
"Failed to parse message payload received from Jetstream"
);
continue;
}
};
jetstream_message.payload.clone().try_into().map_err(|e| {
Error::ISB(format!(
"Error while converting Jetstream message to Message: {:?}",
e
))
})?;

message.offset = Some(Offset::Int(IntOffset::new(
msg_info.stream_sequence,
Expand All @@ -158,21 +162,31 @@ impl JetstreamReader {
ack: ack_tx,
};

if messages_tx.send(read_message).await.is_err() {
error!("Failed to send message to the channel");
return Ok(());
}
messages_tx.send(read_message).await.map_err(|e| {
Error::ISB(format!("Error while sending message to channel: {:?}", e))
})?;

forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc();

if start_time.elapsed() >= Duration::from_millis(1000) {
info!(
"Total messages read from Jetstream in {}ms: {}",
start_time.elapsed().as_millis(),
total_messages
);
start_time = Instant::now();
total_messages = 0;
}
}
if cancel_token.is_cancelled() {
warn!("Cancellation token is cancelled. Exiting JetstreamReader");
break;
}
chunk_time = Instant::now();
}
Ok(())
}
Expand Down Expand Up @@ -279,8 +293,6 @@ mod tests {
let buf_reader_config = BufferReaderConfig {
partitions: 0,
streams: vec![],
batch_size: 2,
read_timeout: Duration::from_millis(1000),
wip_ack_interval: Duration::from_millis(5),
};
let js_reader = JetstreamReader::new(
Expand Down
5 changes: 3 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};

use crate::config::pipeline::isb::BufferWriterConfig;
use crate::error::Error;
Expand Down Expand Up @@ -207,7 +207,7 @@ impl JetstreamWriter {
/// an error it means it is fatal non-retryable error.
pub(super) async fn blocking_write(&self, payload: Vec<u8>) -> Result<PublishAck> {
let js_ctx = self.js_ctx.clone();

let start_time = tokio::time::Instant::now();
loop {
match js_ctx
.publish(self.stream_name.clone(), Bytes::from(payload.clone()))
Expand All @@ -221,6 +221,7 @@ impl JetstreamWriter {
// same as the previous message offset
warn!("Duplicate message detected, ignoring {:?}", ack);
}
debug!("Blocking write successful in {:?}", start_time.elapsed());
return Ok(ack);
}
Err(e) => {
Expand Down

0 comments on commit f87e269

Please sign in to comment.