Skip to content

Commit

Permalink
add support for pubsub emulator
Browse files Browse the repository at this point in the history
  • Loading branch information
joacohoyos committed Jul 26, 2024
1 parent d940685 commit d348e07
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ openssl = { version = "0.10", optional = true, features = ["vendored"] }
redis = { version = "0.21.6", optional = true, features = ["tokio-comp"] }

# features: gcp

google-cloud-gax = {version ="0.17.0", optional = true }
google-cloud-pubsub = { version = "0.12.0", optional = true }
google-cloud-googleapis = { version = "0.7.0", optional = true }

Expand All @@ -78,5 +80,5 @@ elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"]
rabbitmqsink = ["lapin", "tokio"]
15 changes: 14 additions & 1 deletion src/sinks/gcp_pubsub/run.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::{
client::{Client, ClientConfig},
Expand Down Expand Up @@ -48,6 +49,9 @@ pub fn writer_loop(
retry_policy: &retry::Policy,
ordering_key: &str,
attributes: &GenericKV,
emulator: bool,
emulator_endpoint: &Option<String>,
emulator_project_id: &Option<String>,
utils: Arc<Utils>,
) -> Result<(), crate::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -56,7 +60,16 @@ pub fn writer_loop(
.build()?;

let publisher: Publisher = rt.block_on(async {
let client = Client::new(ClientConfig::default()).await?;
let client_config = if emulator {
ClientConfig {
project_id: Some(emulator_project_id.clone().unwrap_or_default()),
environment: Environment::Emulator(emulator_endpoint.clone().unwrap_or_default()),
..Default::default()
}
} else {
ClientConfig::default()
};
let client = Client::new(client_config).await?;
let topic = client.topic(topic_name);
Result::<_, crate::Error>::Ok(topic.new_publisher(None))
})?;
Expand Down
12 changes: 12 additions & 0 deletions src/sinks/gcp_pubsub/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct Config {
pub retry_policy: Option<retry::Policy>,
pub ordering_key: Option<String>,
pub attributes: Option<GenericKV>,
pub emulator: Option<bool>,
pub emulator_endpoint: Option<String>,
pub emulator_project_id: Option<String>,

#[warn(deprecated)]
pub credentials: Option<String>,
Expand All @@ -24,6 +27,12 @@ pub struct Config {
impl SinkProvider for WithUtils<Config> {
fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
let topic_name = self.inner.topic.to_owned();
let mut use_emulator = self.inner.emulator.unwrap_or(false);
let emulator_endpoint = self.inner.emulator_endpoint.to_owned();
let emulator_project_id = self.inner.emulator_project_id.to_owned();
if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) {
use_emulator = false;
}

let error_policy = self
.inner
Expand All @@ -47,6 +56,9 @@ impl SinkProvider for WithUtils<Config> {
&retry_policy,
&ordering_key,
&attributes,
use_emulator,
&emulator_endpoint,
&emulator_project_id,
utils,
)
.expect("writer loop failed");
Expand Down

0 comments on commit d348e07

Please sign in to comment.