diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index d89bf52d..5244ffe1 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -15,10 +15,15 @@ use crate::{ utils::{retry, Utils}, }; -async fn send_pubsub_msg(publisher: &Publisher, event: &Event) -> Result<(), crate::Error> { +async fn send_pubsub_msg( + publisher: &Publisher, + event: &Event, + ordering_key: &str, +) -> Result<(), crate::Error> { let body = json!(event).to_string(); let msg = PubsubMessage { data: body.into(), + ordering_key: ordering_key.into(), ..Default::default() }; @@ -37,6 +42,7 @@ pub fn writer_loop( topic_name: &str, error_policy: &ErrorPolicy, retry_policy: &retry::Policy, + ordering_key: &str, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -52,7 +58,7 @@ pub fn writer_loop( for event in input.iter() { let result = retry::retry_operation( - || rt.block_on(send_pubsub_msg(&publisher, &event)), + || rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key)), retry_policy, ); diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index b27e562a..74abdc80 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -13,6 +13,7 @@ pub struct Config { pub topic: String, pub error_policy: Option, pub retry_policy: Option, + pub ordering_key: Option, #[warn(deprecated)] pub credentials: Option, @@ -30,12 +31,24 @@ impl SinkProvider for WithUtils { .unwrap_or(ErrorPolicy::Exit); let retry_policy = self.inner.retry_policy.unwrap_or_default(); + let ordering_key = self + .inner + .ordering_key + .to_owned() + .unwrap_or_default(); let utils = self.utils.clone(); let handle = std::thread::spawn(move || { - writer_loop(input, &topic_name, &error_policy, &retry_policy, utils) - .expect("writer loop failed"); + writer_loop( + input, + &topic_name, + &error_policy, + &retry_policy, + &ordering_key, + utils, + ) + .expect("writer loop failed"); }); Ok(handle)