Skip to content

Commit

Permalink
chore: use enums for offset (#2169)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Oct 17, 2024
1 parent 9bd7e1b commit e8017cd
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 151 deletions.
6 changes: 3 additions & 3 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod jetstream {
const DEFAULT_USAGE_LIMIT: f64 = 0.8;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 1;
const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::RetryUntilSuccess;
const DEFAULT_RETRY_TIMEOUT_MILLIS: u64 = 10;
const DEFAULT_RETRY_INTERVAL_MILLIS: u64 = 10;

#[derive(Debug, Clone)]
pub(crate) struct StreamWriterConfig {
Expand All @@ -53,7 +53,7 @@ pub mod jetstream {
pub refresh_interval: Duration,
pub usage_limit: f64,
pub buffer_full_strategy: BufferFullStrategy,
pub retry_timeout: Duration,
pub retry_interval: Duration,
}

impl Default for StreamWriterConfig {
Expand All @@ -65,7 +65,7 @@ pub mod jetstream {
usage_limit: DEFAULT_USAGE_LIMIT,
refresh_interval: Duration::from_secs(DEFAULT_REFRESH_INTERVAL_SECS),
buffer_full_strategy: DEFAULT_BUFFER_FULL_STRATEGY,
retry_timeout: Duration::from_millis(DEFAULT_RETRY_TIMEOUT_MILLIS),
retry_interval: Duration::from_millis(DEFAULT_RETRY_INTERVAL_MILLIS),
}
}
}
Expand Down
229 changes: 170 additions & 59 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::Result;

const NUMAFLOW_MONO_VERTEX_NAME: &str = "NUMAFLOW_MONO_VERTEX_NAME";
const NUMAFLOW_VERTEX_NAME: &str = "NUMAFLOW_VERTEX_NAME";
const NUMAFLOW_REPLICA: &str = "NUMAFLOW_REPLICA";

static VERTEX_NAME: OnceLock<String> = OnceLock::new();

Expand All @@ -32,15 +33,29 @@ pub(crate) fn get_vertex_name() -> &'static str {
})
}

static VERTEX_REPLICA: OnceLock<u16> = OnceLock::new();

// fetch the vertex replica information from the environment variable
pub(crate) fn get_vertex_replica() -> &'static u16 {
VERTEX_REPLICA.get_or_init(|| {
env::var(NUMAFLOW_REPLICA)
.unwrap_or_default()
.parse()
.unwrap_or_default()
})
}

/// A message that is sent from the source to the sink.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Message {
/// keys of the message
pub(crate) keys: Vec<String>,
/// actual payload of the message
pub(crate) value: Vec<u8>,
/// offset of the message
pub(crate) offset: Offset,
/// offset of the message, it is optional because offset is only
/// available when we read the message, and we don't persist the
/// offset in the ISB.
pub(crate) offset: Option<Offset>,
/// event time of the message
pub(crate) event_time: DateTime<Utc>,
/// id of the message
Expand All @@ -51,16 +66,81 @@ pub(crate) struct Message {

/// Offset of the message which will be used to acknowledge the message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Offset {
/// unique identifier of the message
pub(crate) offset: String,
/// partition id of the message
pub(crate) partition_id: i32,
pub(crate) enum Offset {
Int(IntOffset),
String(StringOffset),
}

impl fmt::Display for Offset {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}", self.offset, self.partition_id)
match self {
Offset::Int(offset) => write!(f, "{}", offset),
Offset::String(offset) => write!(f, "{}", offset),
}
}
}

/// IntOffset is integer based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntOffset {
offset: u64,
partition_idx: u16,
}

impl IntOffset {
pub fn new(seq: u64, partition_idx: u16) -> Self {
Self {
offset: seq,
partition_idx,
}
}
}

impl IntOffset {
fn sequence(&self) -> Result<u64> {
Ok(self.offset)
}

fn partition_idx(&self) -> u16 {
self.partition_idx
}
}

impl fmt::Display for IntOffset {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}", self.offset, self.partition_idx)
}
}

/// StringOffset is string based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StringOffset {
offset: String,
partition_idx: u16,
}

impl StringOffset {
pub fn new(seq: String, partition_idx: u16) -> Self {
Self {
offset: seq,
partition_idx,
}
}
}

impl StringOffset {
fn sequence(&self) -> Result<u64> {
Ok(self.offset.parse().unwrap())
}

fn partition_idx(&self) -> u16 {
self.partition_idx
}
}

impl fmt::Display for StringOffset {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}", self.offset, self.partition_idx)
}
}

Expand All @@ -81,24 +161,49 @@ impl MessageID {
}
}

impl From<numaflow_pb::objects::isb::MessageId> for MessageID {
fn from(id: numaflow_pb::objects::isb::MessageId) -> Self {
Self {
vertex_name: id.vertex_name,
offset: id.offset,
index: id.index,
}
}
}

impl From<MessageID> for numaflow_pb::objects::isb::MessageId {
fn from(id: MessageID) -> Self {
Self {
vertex_name: id.vertex_name,
offset: id.offset,
index: id.index,
}
}
}

impl fmt::Display for MessageID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}-{}", self.vertex_name, self.offset, self.index)
}
}

impl From<Offset> for AckRequest {
fn from(offset: Offset) -> Self {
Self {
request: Some(numaflow_pb::clients::source::ack_request::Request {
offset: Some(numaflow_pb::clients::source::Offset {
offset: BASE64_STANDARD
.decode(offset.offset)
.expect("we control the encoding, so this should never fail"),
partition_id: offset.partition_id,
impl TryFrom<Offset> for AckRequest {
type Error = Error;

fn try_from(value: Offset) -> std::result::Result<Self, Self::Error> {
match value {
Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())),
Offset::String(o) => Ok(Self {
request: Some(numaflow_pb::clients::source::ack_request::Request {
offset: Some(numaflow_pb::clients::source::Offset {
offset: BASE64_STANDARD
.decode(o.offset)
.expect("we control the encoding, so this should never fail"),
partition_id: o.partition_idx as i32,
}),
}),
handshake: None,
}),
handshake: None,
}
}
}
Expand All @@ -114,11 +219,7 @@ impl TryFrom<Message> for Vec<u8> {
is_late: false, // Set this according to your logic
}),
kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
id: Some(numaflow_pb::objects::isb::MessageId {
vertex_name: get_vertex_name().to_string(),
offset: message.offset.to_string(),
index: 0,
}),
id: Some(message.id.into()),
keys: message.keys.clone(),
headers: message.headers.clone(),
}),
Expand Down Expand Up @@ -156,16 +257,9 @@ impl TryFrom<Vec<u8>> for Message {
Ok(Message {
keys: header.keys,
value: body.payload,
offset: Offset {
offset: id.offset.clone(),
partition_id: 0, // Set this according to your logic
},
offset: None,
event_time: utc_from_timestamp(message_info.event_time),
id: MessageID {
vertex_name: id.vertex_name,
offset: id.offset,
index: id.index,
},
id: id.into(),
headers: header.headers,
})
}
Expand Down Expand Up @@ -196,21 +290,21 @@ impl TryFrom<read_response::Result> for Message {

fn try_from(result: read_response::Result) -> Result<Self> {
let source_offset = match result.offset {
Some(o) => Offset {
Some(o) => Offset::String(StringOffset {
offset: BASE64_STANDARD.encode(o.offset),
partition_id: o.partition_id,
},
partition_idx: o.partition_id as u16,
}),
None => return Err(Error::Source("Offset not found".to_string())),
};

Ok(Message {
keys: result.keys,
value: result.payload,
offset: source_offset.clone(),
offset: Some(source_offset.clone()),
event_time: utc_from_timestamp(result.event_time),
id: MessageID {
vertex_name: get_vertex_name().to_string(),
offset: source_offset.offset,
offset: source_offset.to_string(),
index: 0,
},
headers: result.headers,
Expand Down Expand Up @@ -311,10 +405,10 @@ mod tests {

#[test]
fn test_offset_display() {
let offset = Offset {
let offset = Offset::String(StringOffset {
offset: "123".to_string(),
partition_id: 1,
};
partition_idx: 1,
});
assert_eq!(format!("{}", offset), "123-1");
}

Expand All @@ -330,11 +424,11 @@ mod tests {

#[test]
fn test_offset_to_ack_request() {
let offset = Offset {
let offset = Offset::String(StringOffset {
offset: BASE64_STANDARD.encode("123"),
partition_id: 1,
};
let ack_request: AckRequest = offset.into();
partition_idx: 1,
});
let ack_request: AckRequest = offset.try_into().unwrap();
assert_eq!(ack_request.request.unwrap().offset.unwrap().partition_id, 1);
}

Expand All @@ -343,10 +437,10 @@ mod tests {
let message = Message {
keys: vec!["key1".to_string()],
value: vec![1, 2, 3],
offset: Offset {
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
partition_id: 0,
},
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
Expand All @@ -366,11 +460,7 @@ mod tests {
is_late: false,
}),
kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
id: Some(MessageId {
vertex_name: get_vertex_name().to_string(),
offset: message.offset.to_string(),
index: 0,
}),
id: Some(message.id.into()),
keys: message.keys.clone(),
headers: message.headers.clone(),
}),
Expand Down Expand Up @@ -415,7 +505,6 @@ mod tests {
let message = result.unwrap();
assert_eq!(message.keys, vec!["key1".to_string()]);
assert_eq!(message.value, vec![1, 2, 3]);
assert_eq!(message.offset.offset, "123");
assert_eq!(
message.event_time,
Utc.timestamp_opt(1627846261, 0).unwrap()
Expand All @@ -427,10 +516,10 @@ mod tests {
let message = Message {
keys: vec!["key1".to_string()],
value: vec![1, 2, 3],
offset: Offset {
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
partition_id: 0,
},
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
Expand Down Expand Up @@ -476,10 +565,10 @@ mod tests {
let message = Message {
keys: vec!["key1".to_string()],
value: vec![1, 2, 3],
offset: Offset {
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
partition_id: 0,
},
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
Expand Down Expand Up @@ -523,4 +612,26 @@ mod tests {
assert_eq!(response.id, "123");
assert_eq!(response.status, ResponseStatusFromSink::Success);
}

#[test]
fn test_message_id_from_proto() {
let proto_id = MessageId {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
index: 0,
};
let message_id: MessageID = proto_id.into();
assert_eq!(message_id.vertex_name, "vertex");
assert_eq!(message_id.offset, "123");
assert_eq!(message_id.index, 0);
}

#[test]
fn test_message_id_to_proto() {
let message_id = MessageID::new("vertex".to_string(), "123".to_string(), 0);
let proto_id: MessageId = message_id.into();
assert_eq!(proto_id.vertex_name, "vertex");
assert_eq!(proto_id.offset, "123");
assert_eq!(proto_id.index, 0);
}
}
Loading

0 comments on commit e8017cd

Please sign in to comment.