diff --git a/rust/numaflow-core/src/config.rs b/rust/numaflow-core/src/config.rs index 15cf371c6..feaaa4146 100644 --- a/rust/numaflow-core/src/config.rs +++ b/rust/numaflow-core/src/config.rs @@ -43,7 +43,7 @@ pub mod jetstream { const DEFAULT_USAGE_LIMIT: f64 = 0.8; const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1; const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess; - const DEFAULT_RETRY_TIMEOUT_MILLIS: u64 = 10; + const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10; #[derive(Debug, Clone)] pub(crate) struct StreamWriterConfig { @@ -53,7 +53,7 @@ pub mod jetstream { pub refresh_interval: Duration, pub usage_limit: f64, pub buffer_full_strategy: BufferFullStrategy, - pub retry_timeout: Duration, + pub retry_interval: Duration, } impl Default for StreamWriterConfig { @@ -65,7 +65,7 @@ pub mod jetstream { usage_limit: DEFAULT_USAGE_LIMIT, refresh_interval: Duration::from_secs(DEFAULT_REFRESH_INTERVAL_SECS), buffer_full_strategy: DEFAULT_BUFFER_FULL_STRATEGY, - retry_timeout: Duration::from_millis(DEFAULT_RETRY_TIMEOUT_MILLIS), + retry_interval: Duration::from_millis(DEFAULT_RETRY_INTERVAL_MILLIS), } } } diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index f2af2e873..a4cab12ed 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -21,6 +21,7 @@ use crate::Result; const NUMAFLOW_MONO_VERTEX_NAME: &str = "NUMAFLOW_MONO_VERTEX_NAME"; const NUMAFLOW_VERTEX_NAME: &str = "NUMAFLOW_VERTEX_NAME"; +const NUMAFLOW_REPLICA: &str = "NUMAFLOW_REPLICA"; static VERTEX_NAME: OnceLock = OnceLock::new(); @@ -32,6 +33,18 @@ pub(crate) fn get_vertex_name() -> &'static str { }) } +static VERTEX_REPLICA: OnceLock = OnceLock::new(); + +// fetch the vertex replica information from the environment variable +pub(crate) fn get_vertex_replica() -> &'static u16 { + VERTEX_REPLICA.get_or_init(|| { + env::var(NUMAFLOW_REPLICA) + .unwrap_or_default() + .parse() + .unwrap_or_default() + }) +} + /// A message that is sent from the source to the sink. #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct Message { @@ -39,8 +52,10 @@ pub(crate) struct Message { pub(crate) keys: Vec, /// actual payload of the message pub(crate) value: Vec, - /// offset of the message - pub(crate) offset: Offset, + /// offset of the message, it is optional because offset is only + /// available when we read the message, and we don't persist the + /// offset in the ISB. + pub(crate) offset: Option, /// event time of the message pub(crate) event_time: DateTime, /// id of the message @@ -51,16 +66,81 @@ pub(crate) struct Message { /// Offset of the message which will be used to acknowledge the message. #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct Offset { - /// unique identifier of the message - pub(crate) offset: String, - /// partition id of the message - pub(crate) partition_id: i32, +pub(crate) enum Offset { + Int(IntOffset), + String(StringOffset), } impl fmt::Display for Offset { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}-{}", self.offset, self.partition_id) + match self { + Offset::Int(offset) => write!(f, "{}", offset), + Offset::String(offset) => write!(f, "{}", offset), + } + } +} + +/// IntOffset is integer based offset enum type. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct IntOffset { + offset: u64, + partition_idx: u16, +} + +impl IntOffset { + pub fn new(seq: u64, partition_idx: u16) -> Self { + Self { + offset: seq, + partition_idx, + } + } +} + +impl IntOffset { + fn sequence(&self) -> Result { + Ok(self.offset) + } + + fn partition_idx(&self) -> u16 { + self.partition_idx + } +} + +impl fmt::Display for IntOffset { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}-{}", self.offset, self.partition_idx) + } +} + +/// StringOffset is string based offset enum type. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StringOffset { + offset: String, + partition_idx: u16, +} + +impl StringOffset { + pub fn new(seq: String, partition_idx: u16) -> Self { + Self { + offset: seq, + partition_idx, + } + } +} + +impl StringOffset { + fn sequence(&self) -> Result { + Ok(self.offset.parse().unwrap()) + } + + fn partition_idx(&self) -> u16 { + self.partition_idx + } +} + +impl fmt::Display for StringOffset { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}-{}", self.offset, self.partition_idx) } } @@ -81,24 +161,49 @@ impl MessageID { } } +impl From for MessageID { + fn from(id: numaflow_pb::objects::isb::MessageId) -> Self { + Self { + vertex_name: id.vertex_name, + offset: id.offset, + index: id.index, + } + } +} + +impl From for numaflow_pb::objects::isb::MessageId { + fn from(id: MessageID) -> Self { + Self { + vertex_name: id.vertex_name, + offset: id.offset, + index: id.index, + } + } +} + impl fmt::Display for MessageID { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}-{}-{}", self.vertex_name, self.offset, self.index) } } -impl From for AckRequest { - fn from(offset: Offset) -> Self { - Self { - request: Some(numaflow_pb::clients::source::ack_request::Request { - offset: Some(numaflow_pb::clients::source::Offset { - offset: BASE64_STANDARD - .decode(offset.offset) - .expect("we control the encoding, so this should never fail"), - partition_id: offset.partition_id, +impl TryFrom for AckRequest { + type Error = Error; + + fn try_from(value: Offset) -> std::result::Result { + match value { + Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())), + Offset::String(o) => Ok(Self { + request: Some(numaflow_pb::clients::source::ack_request::Request { + offset: Some(numaflow_pb::clients::source::Offset { + offset: BASE64_STANDARD + .decode(o.offset) + .expect("we control the encoding, so this should never fail"), + partition_id: o.partition_idx as i32, + }), }), + handshake: None, }), - handshake: None, } } } @@ -114,11 +219,7 @@ impl TryFrom for Vec { is_late: false, // Set this according to your logic }), kind: numaflow_pb::objects::isb::MessageKind::Data as i32, - id: Some(numaflow_pb::objects::isb::MessageId { - vertex_name: get_vertex_name().to_string(), - offset: message.offset.to_string(), - index: 0, - }), + id: Some(message.id.into()), keys: message.keys.clone(), headers: message.headers.clone(), }), @@ -156,16 +257,9 @@ impl TryFrom> for Message { Ok(Message { keys: header.keys, value: body.payload, - offset: Offset { - offset: id.offset.clone(), - partition_id: 0, // Set this according to your logic - }, + offset: None, event_time: utc_from_timestamp(message_info.event_time), - id: MessageID { - vertex_name: id.vertex_name, - offset: id.offset, - index: id.index, - }, + id: id.into(), headers: header.headers, }) } @@ -196,21 +290,21 @@ impl TryFrom for Message { fn try_from(result: 
read_response::Result) -> Result { let source_offset = match result.offset { - Some(o) => Offset { + Some(o) => Offset::String(StringOffset { offset: BASE64_STANDARD.encode(o.offset), - partition_id: o.partition_id, - }, + partition_idx: o.partition_id as u16, + }), None => return Err(Error::Source("Offset not found".to_string())), }; Ok(Message { keys: result.keys, value: result.payload, - offset: source_offset.clone(), + offset: Some(source_offset.clone()), event_time: utc_from_timestamp(result.event_time), id: MessageID { vertex_name: get_vertex_name().to_string(), - offset: source_offset.offset, + offset: source_offset.to_string(), index: 0, }, headers: result.headers, @@ -311,10 +405,10 @@ mod tests { #[test] fn test_offset_display() { - let offset = Offset { + let offset = Offset::String(StringOffset { offset: "123".to_string(), - partition_id: 1, - }; + partition_idx: 1, + }); assert_eq!(format!("{}", offset), "123-1"); } @@ -330,11 +424,11 @@ mod tests { #[test] fn test_offset_to_ack_request() { - let offset = Offset { + let offset = Offset::String(StringOffset { offset: BASE64_STANDARD.encode("123"), - partition_id: 1, - }; - let ack_request: AckRequest = offset.into(); + partition_idx: 1, + }); + let ack_request: AckRequest = offset.try_into().unwrap(); assert_eq!(ack_request.request.unwrap().offset.unwrap().partition_id, 1); } @@ -343,10 +437,10 @@ mod tests { let message = Message { keys: vec!["key1".to_string()], value: vec![1, 2, 3], - offset: Offset { + offset: Some(Offset::String(StringOffset { offset: "123".to_string(), - partition_id: 0, - }, + partition_idx: 0, + })), event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), id: MessageID { vertex_name: "vertex".to_string(), @@ -366,11 +460,7 @@ mod tests { is_late: false, }), kind: numaflow_pb::objects::isb::MessageKind::Data as i32, - id: Some(MessageId { - vertex_name: get_vertex_name().to_string(), - offset: message.offset.to_string(), - index: 0, - }), + id: Some(message.id.into()), keys: message.keys.clone(), headers: message.headers.clone(), }), @@ -415,7 +505,6 @@ mod tests { let message = result.unwrap(); assert_eq!(message.keys, vec!["key1".to_string()]); assert_eq!(message.value, vec![1, 2, 3]); - assert_eq!(message.offset.offset, "123"); assert_eq!( message.event_time, Utc.timestamp_opt(1627846261, 0).unwrap() @@ -427,10 +516,10 @@ mod tests { let message = Message { keys: vec!["key1".to_string()], value: vec![1, 2, 3], - offset: Offset { + offset: Some(Offset::String(StringOffset { offset: "123".to_string(), - partition_id: 0, - }, + partition_idx: 0, + })), event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), id: MessageID { vertex_name: "vertex".to_string(), @@ -476,10 +565,10 @@ mod tests { let message = Message { keys: vec!["key1".to_string()], value: vec![1, 2, 3], - offset: Offset { + offset: Some(Offset::String(StringOffset { offset: "123".to_string(), - partition_id: 0, - }, + partition_idx: 0, + })), event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), id: MessageID { vertex_name: "vertex".to_string(), @@ -523,4 +612,26 @@ mod tests { assert_eq!(response.id, "123"); assert_eq!(response.status, ResponseStatusFromSink::Success); } + + #[test] + fn test_message_id_from_proto() { + let proto_id = MessageId { + vertex_name: "vertex".to_string(), + offset: "123".to_string(), + index: 0, + }; + let message_id: MessageID = proto_id.into(); + assert_eq!(message_id.vertex_name, "vertex"); + assert_eq!(message_id.offset, "123"); + assert_eq!(message_id.index, 0); + } + + #[test] + fn 
test_message_id_to_proto() { + let message_id = MessageID::new("vertex".to_string(), "123".to_string(), 0); + let proto_id: MessageId = message_id.into(); + assert_eq!(proto_id.vertex_name, "vertex"); + assert_eq!(proto_id.offset, "123"); + assert_eq!(proto_id.index, 0); + } } diff --git a/rust/numaflow-core/src/monovertex/forwarder.rs b/rust/numaflow-core/src/monovertex/forwarder.rs index 45326231b..bd3253fea 100644 --- a/rust/numaflow-core/src/monovertex/forwarder.rs +++ b/rust/numaflow-core/src/monovertex/forwarder.rs @@ -146,14 +146,18 @@ impl Forwarder { .get_or_create(&self.common_labels) .inc_by(msg_count); - let (offsets, bytes_count): (Vec, u64) = messages.iter().fold( + let (offsets, bytes_count): (Vec, u64) = messages.iter().try_fold( (Vec::with_capacity(messages.len()), 0), |(mut offsets, mut bytes_count), msg| { - offsets.push(msg.offset.clone()); - bytes_count += msg.value.len() as u64; - (offsets, bytes_count) + if let Some(offset) = &msg.offset { + offsets.push(offset.clone()); + bytes_count += msg.value.len() as u64; + Ok((offsets, bytes_count)) + } else { + Err(Error::Forwarder("Message offset is missing".to_string())) + } }, - ); + )?; forward_metrics() .read_bytes_total @@ -448,7 +452,7 @@ impl Forwarder { // and keep only the failed messages to send again // construct the error map for the failed messages messages_to_send.retain(|msg| { - if let Some(result) = result_map.get(&msg.offset.to_string()) { + if let Some(result) = result_map.get(&msg.id.to_string()) { return match result { ResponseStatusFromSink::Success => false, ResponseStatusFromSink::Failed(err_msg) => { diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream.rs b/rust/numaflow-core/src/pipeline/isb/jetstream.rs index 84092ad10..5e7890a97 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream.rs @@ -5,7 +5,7 @@ use tokio_util::sync::CancellationToken; use crate::config::jetstream::StreamWriterConfig; use crate::error::Error; -use crate::message::Message; +use crate::message::{Message, Offset}; use crate::pipeline::isb::jetstream::writer::JetstreamWriter; use crate::Result; @@ -24,11 +24,11 @@ struct ActorMessage { /// once the message has been successfully written, we can let the sender know. /// This can be used to trigger Acknowledgement of the message from the Reader. 
// FIXME: concrete type and better name - callee_tx: oneshot::Sender>, + callee_tx: oneshot::Sender>, } impl ActorMessage { - fn new(message: Message, callee_tx: oneshot::Sender>) -> Self { + fn new(message: Message, callee_tx: oneshot::Sender>) -> Self { Self { message, callee_tx } } } @@ -92,7 +92,10 @@ impl WriterHandle { Self { sender } } - pub(crate) async fn write(&self, message: Message) -> Result>> { + pub(crate) async fn write( + &self, + message: Message, + ) -> Result>> { let (sender, receiver) = oneshot::channel(); let msg = ActorMessage::new(message, sender); self.sender @@ -116,7 +119,7 @@ mod tests { use tokio::time::Instant; use super::*; - use crate::message::{Message, MessageID, Offset}; + use crate::message::{Message, MessageID}; #[cfg(feature = "nats-tests")] #[tokio::test] @@ -152,10 +155,7 @@ mod tests { let message = Message { keys: vec![format!("key_{}", i)], value: format!("message {}", i).as_bytes().to_vec(), - offset: Offset { - offset: format!("offset_{}", i), - partition_id: i, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -214,10 +214,7 @@ mod tests { let message = Message { keys: vec![format!("key_{}", i)], value: format!("message {}", i).as_bytes().to_vec(), - offset: Offset { - offset: format!("offset_{}", i), - partition_id: 0, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -234,10 +231,7 @@ mod tests { let message = Message { keys: vec!["key_101".to_string()], value: vec![0; 1024], - offset: Offset { - offset: "offset_101".to_string(), - partition_id: 0, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -305,10 +299,7 @@ mod tests { let message = Message { keys: vec![format!("key_{}", i)], value: format!("message {}", i).as_bytes().to_vec(), - offset: Offset { - offset: format!("offset_{}", i), - partition_id: i, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "".to_string(), diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs index 178c88708..0e3324ac4 100644 --- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs +++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs @@ -17,6 +17,7 @@ use tracing::{debug, warn}; use crate::config::jetstream::StreamWriterConfig; use crate::error::Error; +use crate::message::{IntOffset, Offset}; use crate::Result; #[derive(Clone, Debug)] @@ -148,7 +149,7 @@ impl JetstreamWriter { /// Writes the message to the JetStream ISB and returns a future which can be /// awaited to get the PublishAck. It will do infinite retries until the message /// gets published successfully. If it returns an error it means it is fatal error - pub(super) async fn write(&self, payload: Vec, callee_tx: oneshot::Sender>) { + pub(super) async fn write(&self, payload: Vec, callee_tx: oneshot::Sender>) { let js_ctx = self.js_ctx.clone(); // loop till we get a PAF, there could be other reasons why PAFs cannot be created. 
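// A minimal caller-side sketch of the oneshot handoff above, assuming a
// `WriterHandle` named `handle` and an already-built `Message` (both crate-internal
// types from this diff; `write_and_wait` is only an illustrative name). The receiver
// resolves to an `Offset::Int` (JetStream sequence plus partition index) once the
// PAF has been resolved, which is what lets the reader acknowledge upstream.
async fn write_and_wait(handle: &WriterHandle, message: Message) -> Result<Offset> {
    // hand the message to the writer actor; this only fails if the channel is closed
    let offset_rx = handle.write(message).await?;
    // wait for the publish ack (or the blocking-write fallback) to produce an offset
    offset_rx
        .await
        .expect("writer actor dropped the result sender")
}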
@@ -158,6 +159,7 @@ impl JetstreamWriter { true => { // FIXME: add metrics debug!(%self.config.name, "buffer is full"); + // FIXME: consider buffer-full strategy } false => match js_ctx .publish(self.config.name.clone(), Bytes::from(payload.clone())) @@ -180,9 +182,8 @@ impl JetstreamWriter { return; } - // FIXME: make it configurable // sleep to avoid busy looping - sleep(Duration::from_millis(10)).await; + sleep(self.config.retry_interval).await; }; // send the paf and callee_tx over @@ -224,7 +225,7 @@ impl JetstreamWriter { }, Err(e) => { error!(?e, "publishing failed, retrying"); - sleep(Duration::from_millis(10)).await; + sleep(self.config.retry_interval).await; } } if self.cancel_token.is_cancelled() { @@ -241,7 +242,7 @@ impl JetstreamWriter { pub(super) struct ResolveAndPublishResult { paf: PublishAckFuture, payload: Vec, - callee_tx: oneshot::Sender>, + callee_tx: oneshot::Sender>, } /// Resolves the PAF from the write call, if not successful it will do a blocking write so that @@ -265,11 +266,23 @@ impl PafResolverActor { /// not successfully resolve, it will do blocking write till write to JetStream succeeds. async fn successfully_resolve_paf(&mut self, result: ResolveAndPublishResult) { match result.paf.await { - Ok(ack) => result.callee_tx.send(Ok(ack.sequence)).unwrap(), + Ok(ack) => result + .callee_tx + .send(Ok(Offset::Int(IntOffset::new( + ack.sequence, + self.js_writer.config.partition_idx, + )))) + .unwrap(), Err(e) => { error!(?e, "Failed to resolve the future, trying blocking write"); match self.js_writer.blocking_write(result.payload.clone()).await { - Ok(ack) => result.callee_tx.send(Ok(ack.sequence)).unwrap(), + Ok(ack) => result + .callee_tx + .send(Ok(Offset::Int(IntOffset::new( + ack.sequence, + self.js_writer.config.partition_idx, + )))) + .unwrap(), Err(e) => result.callee_tx.send(Err(e)).unwrap(), } } @@ -324,10 +337,7 @@ mod tests { let message = Message { keys: vec!["key_0".to_string()], value: "message 0".as_bytes().to_vec(), - offset: Offset { - offset: "offset_0".to_string(), - partition_id: 0, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -337,7 +347,7 @@ mod tests { headers: HashMap::new(), }; - let (success_tx, success_rx) = oneshot::channel::>(); + let (success_tx, success_rx) = oneshot::channel::>(); writer.write(message.try_into().unwrap(), success_tx).await; assert!(success_rx.await.is_ok()); @@ -373,10 +383,7 @@ mod tests { let message = Message { keys: vec!["key_0".to_string()], value: "message 0".as_bytes().to_vec(), - offset: Offset { - offset: "offset_0".to_string(), - partition_id: 1, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -428,10 +435,7 @@ mod tests { let message = Message { keys: vec![format!("key_{}", i)], value: format!("message {}", i).as_bytes().to_vec(), - offset: Offset { - offset: format!("offset_{}", i), - partition_id: i, - }, + offset: None, event_time: Utc::now(), id: MessageID { vertex_name: "vertex".to_string(), @@ -440,7 +444,7 @@ mod tests { }, headers: HashMap::new(), }; - let (success_tx, success_rx) = oneshot::channel::>(); + let (success_tx, success_rx) = oneshot::channel::>(); writer.write(message.try_into().unwrap(), success_tx).await; result_receivers.push(success_rx); } @@ -450,10 +454,7 @@ mod tests { let message = Message { keys: vec!["key_11".to_string()], value: vec![0; 1025], - offset: Offset { - offset: "offset_11".to_string(), - partition_id: 11, - }, + offset: None, event_time: Utc::now(), id: 
MessageID { vertex_name: "vertex".to_string(), @@ -462,7 +463,7 @@ mod tests { }, headers: HashMap::new(), }; - let (success_tx, success_rx) = oneshot::channel::>(); + let (success_tx, success_rx) = oneshot::channel::>(); writer.write(message.try_into().unwrap(), success_tx).await; result_receivers.push(success_rx); diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index fae703257..8d2d22780 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -174,10 +174,7 @@ mod tests { Message { keys: vec![], value: b"Hello, World!".to_vec(), - offset: Offset { - offset: "1".to_string(), - partition_id: 0, - }, + offset: None, event_time: Utc::now(), headers: Default::default(), id: MessageID { @@ -189,10 +186,7 @@ mod tests { Message { keys: vec![], value: b"Hello, World!".to_vec(), - offset: Offset { - offset: "2".to_string(), - partition_id: 0, - }, + offset: None, event_time: Utc::now(), headers: Default::default(), id: MessageID { diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 23a4f3092..61ee3d346 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -3,7 +3,9 @@ use std::time::Duration; use bytes::Bytes; use futures::StreamExt; -use crate::message::{Message, MessageID, Offset}; +use crate::message::{ + get_vertex_name, get_vertex_replica, Message, MessageID, Offset, StringOffset, +}; use crate::reader; use crate::source; @@ -208,18 +210,18 @@ impl source::SourceReader for GeneratorRead { .unwrap_or_default() .to_string(); + let offset = + Offset::String(StringOffset::new(id.clone(), *get_vertex_replica())); + Message { keys: vec![], value: msg.clone().to_vec(), // FIXME: better offset? 
- offset: Offset { - offset: id.clone(), - partition_id: 0, - }, + offset: Some(offset.clone()), event_time: Default::default(), id: MessageID { - vertex_name: Default::default(), - offset: id, + vertex_name: get_vertex_name().to_string(), + offset: offset.to_string(), index: Default::default(), }, headers: Default::default(), @@ -315,14 +317,8 @@ mod tests { // Create a vector of offsets to acknowledge let offsets = vec![ - Offset { - offset: "offset1".to_string(), - partition_id: 0, - }, - Offset { - offset: "offset2".to_string(), - partition_id: 1, - }, + Offset::String(StringOffset::new("offset1".to_string(), 0)), + Offset::String(StringOffset::new("offset2".to_string(), 0)), ]; // Call the ack method and check the result diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index df66cf4d8..69be3d9a3 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -186,7 +186,7 @@ impl SourceAcker for UserDefinedSourceAck { // send n ack requests for offset in offsets { - let request = offset.into(); + let request = offset.try_into()?; self.ack_tx .send(request) .await @@ -331,7 +331,7 @@ mod tests { assert_eq!(messages.len(), 5); let response = src_ack - .ack(messages.iter().map(|m| m.offset.clone()).collect()) + .ack(messages.iter().map(|m| m.offset.clone().unwrap()).collect()) .await; assert!(response.is_ok()); diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index bb5711507..035addcbb 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -91,7 +91,10 @@ impl SourceTransformer { tracker.insert( message.id.to_string(), MessageInfo { - offset: message.offset.clone(), + offset: message + .offset + .clone() + .ok_or(Error::Transformer("Message offset is missing".to_string()))?, headers: message.headers.clone(), }, ); @@ -173,7 +176,7 @@ impl SourceTransformer { }, keys: result.keys, value: result.value, - offset: msg_info.offset.clone(), + offset: None, event_time: utc_from_timestamp(result.event_time), headers: msg_info.headers.clone(), }; @@ -235,7 +238,7 @@ mod tests { use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; use tempfile::TempDir; - use crate::message::MessageID; + use crate::message::{MessageID, StringOffset}; use crate::shared::utils::create_rpc_channel; use crate::transformer::user_defined::SourceTransformHandle; @@ -283,10 +286,10 @@ mod tests { let message = crate::message::Message { keys: vec!["first".into()], value: "hello".into(), - offset: crate::message::Offset { - partition_id: 0, - offset: "0".into(), - }, + offset: Some(crate::message::Offset::String(StringOffset::new( + "0".to_string(), + 0, + ))), event_time: chrono::Utc::now(), id: MessageID { vertex_name: "vertex_name".to_string(), @@ -362,10 +365,10 @@ mod tests { let message = crate::message::Message { keys: vec!["second".into()], value: "hello".into(), - offset: crate::message::Offset { - partition_id: 0, - offset: "0".into(), - }, + offset: Some(crate::message::Offset::String(StringOffset::new( + "0".to_string(), + 0, + ))), event_time: chrono::Utc::now(), id: MessageID { vertex_name: "vertex_name".to_string(),
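Taken together, the offset changes above separate source-side string offsets from ISB-side integer offsets, and acking now goes through a fallible conversion. Below is a minimal sketch of that round trip, assuming the crate-internal types introduced in this diff (`Offset`, `StringOffset`, `get_vertex_replica`, and the `TryFrom<Offset> for AckRequest` impl); `ack_request_roundtrip` is only an illustrative name.

fn ack_request_roundtrip() -> Result<()> {
    // a source read produces a base64-encoded string offset tagged with the replica index
    let offset = Offset::String(StringOffset::new(
        BASE64_STANDARD.encode("123"),
        *get_vertex_replica(),
    ));
    // acking converts it back into the source AckRequest; an Offset::Int produced by the
    // JetStream writer would be rejected here with Error::Source
    let ack: AckRequest = offset.try_into()?;
    assert!(ack.request.is_some());
    Ok(())
}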