diff --git a/Cargo.lock b/Cargo.lock index 516d742e4d9..d9dd5db50ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3886,7 +3886,6 @@ dependencies = [ "rumqttd", "serde_json", "tedge_actors", - "tedge_api", "tedge_config", "tedge_test_utils", "thiserror", diff --git a/crates/core/c8y_api/src/utils.rs b/crates/core/c8y_api/src/utils.rs index 1d6c83c22bc..9a8108480c7 100644 --- a/crates/core/c8y_api/src/utils.rs +++ b/crates/core/c8y_api/src/utils.rs @@ -1,33 +1,3 @@ -pub mod bridge { - use mqtt_channel::MqttMessage; - use tedge_api::main_device_health_topic; - use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD; - use tedge_api::MQTT_BRIDGE_UP_PAYLOAD; - - pub fn is_c8y_bridge_established(message: &MqttMessage, service: &str) -> bool { - let c8y_bridge_health_topic = main_device_health_topic(service); - match message.payload_str() { - Ok(payload) => { - message.topic.name == c8y_bridge_health_topic - && (payload == MQTT_BRIDGE_UP_PAYLOAD - || payload == MQTT_BRIDGE_DOWN_PAYLOAD - || is_valid_status_payload(payload)) - } - Err(_err) => false, - } - } - - #[derive(serde::Deserialize)] - struct HealthStatus<'a> { - status: &'a str, - } - - fn is_valid_status_payload(payload: &str) -> bool { - serde_json::from_str::(payload) - .map_or(false, |h| h.status == "up" || h.status == "down") - } -} - pub mod child_device { use crate::smartrest::topic::C8yTopic; use mqtt_channel::MqttMessage; @@ -40,28 +10,3 @@ pub mod child_device { ) } } - -#[cfg(test)] -mod tests { - use mqtt_channel::MqttMessage; - use mqtt_channel::Topic; - use test_case::test_case; - - use crate::utils::bridge::is_c8y_bridge_established; - - const C8Y_BRIDGE_HEALTH_TOPIC: &str = - "te/device/main/service/tedge-mapper-bridge-c8y/status/health"; - - #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)] - #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)] - #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)] - #[test_case("tedge/not/health/topic", "1", false)] - #[test_case("tedge/not/health/topic", "0", false)] - fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) { - let topic = Topic::new(topic).unwrap(); - let message = MqttMessage::new(&topic, payload); - - let actual = is_c8y_bridge_established(&message, "tedge-mapper-bridge-c8y"); - assert_eq!(actual, expected); - } -} diff --git a/crates/core/tedge_api/src/health.rs b/crates/core/tedge_api/src/health.rs index 98996dc046f..53a5c3f981f 100644 --- a/crates/core/tedge_api/src/health.rs +++ b/crates/core/tedge_api/src/health.rs @@ -1,4 +1,5 @@ use crate::mqtt_topics::Channel; +use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::ServiceTopicId; use clock::Clock; @@ -6,20 +7,24 @@ use clock::WallClock; use log::error; use mqtt_channel::MqttMessage; use mqtt_channel::Topic; +use serde::Deserialize; +use serde::Serialize; use serde_json::json; +use serde_json::Value as JsonValue; +use std::fmt::Display; use std::process; use std::sync::Arc; use tedge_utils::timestamp::TimeFormat; -pub const MQTT_BRIDGE_UP_PAYLOAD: &str = "1"; -pub const MQTT_BRIDGE_DOWN_PAYLOAD: &str = "0"; -pub const UP_STATUS: &str = "up"; -pub const DOWN_STATUS: &str = "down"; -pub const UNKNOWN_STATUS: &str = "unknown"; - -// FIXME: doesn't account for custom topic root, use MQTT scheme API here -pub fn main_device_health_topic(service: &str) -> String { - format!("te/device/main/service/{service}/status/health") +pub fn service_health_topic( + mqtt_schema: &MqttSchema, + device_topic_id: &EntityTopicId, + service: &str, +) -> Topic { + mqtt_schema.topic_for( + &device_topic_id.default_service_for_device(service).unwrap(), + &Channel::Health, + ) } /// Encodes a valid health topic. @@ -92,14 +97,163 @@ impl ServiceHealthTopic { } } +#[derive(Deserialize, Serialize, Debug, Default)] +pub struct HealthStatus { + pub status: Status, + pub pid: Option, + pub time: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum Status { + Up, + Down, + #[serde(untagged)] + Other(String), +} + +impl Default for Status { + fn default() -> Self { + Status::Other("unknown".to_string()) + } +} + +impl Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let status = match self { + Status::Up => "up", + Status::Down => "down", + Status::Other(val) if val.is_empty() => "unknown", + Status::Other(val) => val, + }; + write!(f, "{}", status) + } +} + #[derive(Debug)] pub struct HealthTopicError; +impl HealthStatus { + pub fn try_from_health_status_message( + message: &MqttMessage, + mqtt_schema: &MqttSchema, + ) -> Result { + if let Ok((topic_id, Channel::Health)) = mqtt_schema.entity_channel_of(&message.topic) { + let health_status = if entity_is_mosquitto_bridge_service(&topic_id) { + let status = match message.payload_str() { + Ok("1") => Status::Up, + Ok("0") => Status::Down, + _ => Status::default(), + }; + HealthStatus { + status, + pid: None, + time: None, + } + } else { + serde_json::from_slice(message.payload()).unwrap_or_default() + }; + Ok(health_status) + } else { + Err(HealthTopicError) + } + } + + pub fn is_valid(&self) -> bool { + self.status == Status::Up || self.status == Status::Down + } +} + +pub fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool { + entity_topic_id + .default_service_name() + .filter(|name| name.starts_with("mosquitto-") && name.ends_with("-bridge")) + .is_some() +} + #[cfg(test)] mod tests { use super::*; use assert_matches::assert_matches; use serde_json::Value; + use test_case::test_case; + + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + r#"{"status":"up"}"#, + Status::Up; + "service-health-status-up" + )] + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + r#"{"status":"down"}"#, + Status::Down; + "service-health-status-down" + )] + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + r#"{"status":"foo"}"#, + Status::Other("foo".into()); + "service-health-status-other-value" + )] + #[test_case( + "te/device/child/service/tedge-mapper-c8y/status/health", + r#"{"pid":1234,"status":"up"}"#, + Status::Up; + "service-health-status-with-extra-fields" + )] + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + r#"{"pid":"123456"}"#, + Status::Other("unknown".into()); + "service-health-status-no-value" + )] + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + r#"{"status":""}"#, + Status::Other("".into()); + "service-health-status-empty-value" + )] + #[test_case( + "te/device/main/service/tedge-mapper-c8y/status/health", + "{}", + Status::default(); + "service-health-status-empty-message" + )] + #[test_case( + "te/device/main/service/mosquitto-xyz-bridge/status/health", + "1", + Status::Up; + "mosquitto-bridge-service-health-status-up" + )] + #[test_case( + "te/device/main/service/mosquitto-xyz-bridge/status/health", + "0", + Status::Down; + "mosquitto-bridge-service-health-status-down" + )] + #[test_case( + "te/device/main/service/mosquitto-xyz-bridge/status/health", + "invalid payload", + Status::default(); + "mosquitto-bridge-service-health-status-invalid-payload" + )] + #[test_case( + "te/device/main/service/tedge-mapper-bridge-c8y/status/health", + r#"{"status":"up"}"#, + Status::Up; + "builtin-bridge-service-health-status-up" + )] + fn parse_heath_status(health_topic: &str, health_payload: &str, expected_status: Status) { + let mqtt_schema = MqttSchema::new(); + let topic = Topic::new_unchecked(health_topic); + let health_message = MqttMessage::new(&topic, health_payload.as_bytes().to_owned()); + + let health_status = + HealthStatus::try_from_health_status_message(&health_message, &mqtt_schema); + assert_eq!(health_status.unwrap().status, expected_status); + } #[test] fn is_rfc3339_timestamp() { diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 42fa19eb7e4..3c9fa9a4e33 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -427,12 +427,6 @@ impl EntityTopicId { pub fn as_str(&self) -> &str { self.0.as_str() } - - // FIXME: can also match "device/bridge//" or "/device/main/service/my_custom_bridge" - // should match ONLY the single mapper bridge - pub fn is_bridge_health_topic(&self) -> bool { - self.as_str().contains("bridge") - } } /// Contains a topic id of the service itself and the associated device. diff --git a/crates/core/tedge_mapper/src/aws/mapper.rs b/crates/core/tedge_mapper/src/aws/mapper.rs index 4a52536d3be..6265547eac5 100644 --- a/crates/core/tedge_mapper/src/aws/mapper.rs +++ b/crates/core/tedge_mapper/src/aws/mapper.rs @@ -4,11 +4,14 @@ use async_trait::async_trait; use aws_mapper_ext::converter::AwsConverter; use clock::WallClock; use mqtt_channel::TopicFilter; +use std::str::FromStr; use tedge_actors::ConvertingActor; use tedge_actors::MessageSink; use tedge_actors::MessageSource; use tedge_actors::NoConfig; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::service_health_topic; use tedge_config::TEdgeConfig; use tedge_mqtt_bridge::use_key_and_cert; use tedge_mqtt_bridge::BridgeConfig; @@ -16,6 +19,7 @@ use tedge_mqtt_bridge::MqttBridgeActorBuilder; use tracing::warn; const AWS_MAPPER_NAME: &str = "tedge-mapper-aws"; +const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-aws"; pub struct AwsMapper; @@ -32,8 +36,12 @@ impl TEdgeComponent for AwsMapper { ) -> Result<(), anyhow::Error> { let (mut runtime, mut mqtt_actor) = start_basic_actors(self.session_name(), &tedge_config).await?; + + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone()); if tedge_config.mqtt.bridge.built_in { let device_id = tedge_config.device.id.try_read(&tedge_config)?; + let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?; + let rules = built_in_bridge_rules(device_id)?; let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( @@ -47,9 +55,14 @@ impl TEdgeComponent for AwsMapper { &tedge_config.aws.root_cert_path, &tedge_config, )?; + + let health_topic = + service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME); + let bridge_actor = MqttBridgeActorBuilder::new( &tedge_config, - "tedge-mapper-bridge-aws".to_owned(), + BUILT_IN_BRIDGE_NAME, + &health_topic, rules, cloud_config, ) @@ -57,7 +70,6 @@ impl TEdgeComponent for AwsMapper { runtime.spawn(bridge_actor).await?; } let clock = Box::new(WallClock); - let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone()); let aws_converter = AwsConverter::new( tedge_config.aws.mapper.timestamp, clock, diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 184034dfb41..b145526e69d 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -4,11 +4,14 @@ use async_trait::async_trait; use az_mapper_ext::converter::AzureConverter; use clock::WallClock; use mqtt_channel::TopicFilter; +use std::str::FromStr; use tedge_actors::ConvertingActor; use tedge_actors::MessageSink; use tedge_actors::MessageSource; use tedge_actors::NoConfig; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::service_health_topic; use tedge_config::TEdgeConfig; use tedge_mqtt_bridge::use_key_and_cert; use tedge_mqtt_bridge::BridgeConfig; @@ -16,6 +19,7 @@ use tedge_mqtt_bridge::MqttBridgeActorBuilder; use tracing::warn; const AZURE_MAPPER_NAME: &str = "tedge-mapper-az"; +const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-az"; pub struct AzureMapper; @@ -32,7 +36,11 @@ impl TEdgeComponent for AzureMapper { ) -> Result<(), anyhow::Error> { let (mut runtime, mut mqtt_actor) = start_basic_actors(self.session_name(), &tedge_config).await?; + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone()); + if tedge_config.mqtt.bridge.built_in { + let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?; + let remote_clientid = tedge_config.device.id.try_read(&tedge_config)?; let rules = built_in_bridge_rules(remote_clientid)?; @@ -55,9 +63,13 @@ impl TEdgeComponent for AzureMapper { &tedge_config, )?; + let health_topic = + service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME); + let bridge_actor = MqttBridgeActorBuilder::new( &tedge_config, - "tedge-mapper-bridge-az".to_owned(), + BUILT_IN_BRIDGE_NAME, + &health_topic, rules, cloud_config, ) diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 47952038b99..0ae274e133d 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -159,7 +159,7 @@ impl TEdgeComponent for CumulocityMapper { let last_will_message_bridge = c8y_api::smartrest::inventory::service_creation_message_payload( mapper_service_external_id.as_ref(), - &c8y_mapper_config.bridge_service_name(), + &c8y_mapper_config.bridge_service_name, service_type.as_str(), "down", )?; @@ -175,7 +175,8 @@ impl TEdgeComponent for CumulocityMapper { .spawn( MqttBridgeActorBuilder::new( &tedge_config, - c8y_mapper_config.bridge_service_name(), + &c8y_mapper_config.bridge_service_name, + &c8y_mapper_config.bridge_health_topic, tc, cloud_config, ) diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index c6227dc8304..78741a39660 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -8,9 +8,6 @@ use futures::StreamExt; use mqtt_channel::MqttMessage; use mqtt_channel::PubChannel; use mqtt_channel::Topic; -use serde::Deserialize; -use serde::Serialize; -use serde_json::Value; use std::path::PathBuf; use std::process; use std::process::Command; @@ -23,6 +20,7 @@ use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; +use tedge_api::HealthStatus; use tedge_config::TEdgeConfigLocation; use tedge_utils::timestamp::IsoOrUnix; use time::OffsetDateTime; @@ -40,14 +38,6 @@ const SERVICE_NAME: &str = "tedge-watchdog"; /// a timing misalignment. const NOTIFY_SEND_FREQ_RATIO: u64 = 4; -// TODO: extract to common module -#[derive(Debug, Serialize, Deserialize)] -pub struct HealthStatus { - status: String, - pid: u32, - time: Value, -} - pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> { // Send ready notification to systemd. notify_systemd(process::id(), "--ready")?; @@ -228,11 +218,14 @@ async fn monitor_tedge_service( { Ok(health_status) => { let health_status = health_status?; - debug!( - "Sending notification for {} with pid: {}", - name, health_status.pid - ); - notify_systemd(health_status.pid, "WATCHDOG=1")?; + if let Some(pid) = health_status.pid { + debug!("Sending notification for {} with pid: {}", name, pid); + notify_systemd(pid, "WATCHDOG=1")?; + } else { + error!( + "Ignoring invalid health status message from {name} without a `pid` field in it" + ) + } } Err(_) => { warn!("No health check response received from {name} in time"); @@ -255,7 +248,11 @@ async fn get_latest_health_status_message( if let Ok(message) = message.payload_str() { debug!("Health response received: {message}"); if let Ok(health_status) = serde_json::from_str::(message) { - let datetime = IsoOrUnix::try_from(&health_status.time)?; + if health_status.time.is_none() { + error!("Ignoring invalid health response: {health_status:?} without a `time` field in it"); + continue; + } + let datetime = IsoOrUnix::try_from(&health_status.time.clone().unwrap())?; // Compare to a slightly old timestamp to avoid false negatives from floating-point error in unix timestamps if datetime.into_inner() >= request_timestamp - Duration::from_millis(10) { @@ -339,7 +336,7 @@ mod tests { get_latest_health_status_message(request_timestamp, &mut receiver).await; let expected_timestamp = TimeFormat::Rfc3339.to_json(request_timestamp).unwrap(); - assert_eq!(health_status.unwrap().time, expected_timestamp); + assert_eq!(health_status.unwrap().time, Some(expected_timestamp)); sender.close_channel(); let base_timestamp = base_timestamp + Duration::from_secs(5); @@ -378,6 +375,6 @@ mod tests { let health_status = get_latest_health_status_message(request_timestamp, &mut receiver).await; - assert_eq!(health_status.unwrap().time, payload_timestamp); + assert_eq!(health_status.unwrap().time, Some(payload_timestamp)); } } diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index 2bb684e91c3..021d7989df1 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -4,12 +4,12 @@ use super::dynamic_discovery::process_inotify_events; use crate::converter::UploadContext; use crate::converter::UploadOperationLog; use crate::operations::FtsDownloadOperationType; +use crate::service_monitor::is_c8y_bridge_established; use async_trait::async_trait; use c8y_api::smartrest::smartrest_serializer::fail_operation; use c8y_api::smartrest::smartrest_serializer::succeed_static_operation; use c8y_api::smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_api::smartrest::smartrest_serializer::SmartRest; -use c8y_api::utils::bridge::is_c8y_bridge_established; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; use c8y_http_proxy::messages::C8YRestRequest; @@ -32,7 +32,6 @@ use tedge_actors::Sender; use tedge_actors::Service; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::main_device_health_topic; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; use tedge_file_system_ext::FsWatchEvent; @@ -68,7 +67,6 @@ pub struct C8yMapperActor { mqtt_publisher: LoggingSender, timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, - c8y_bridge_service_name: String, } #[async_trait] @@ -81,7 +79,11 @@ impl Actor for C8yMapperActor { if !self.converter.config.bridge_in_mapper { // Wait till the c8y bridge is established while let Some(message) = self.bridge_status_messages.recv().await { - if is_c8y_bridge_established(&message, &self.c8y_bridge_service_name) { + if is_c8y_bridge_established( + &message, + &self.converter.config.mqtt_schema, + &self.converter.config.bridge_health_topic, + ) { break; } } @@ -127,7 +129,6 @@ impl C8yMapperActor { mqtt_publisher: LoggingSender, timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, - c8y_bridge_service_name: String, ) -> Self { Self { converter, @@ -135,7 +136,6 @@ impl C8yMapperActor { mqtt_publisher, timer_sender, bridge_status_messages, - c8y_bridge_service_name, } } @@ -351,9 +351,9 @@ impl C8yMapperBuilder { let bridge_monitor_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("ServiceMonitor", 1); - let bridge_health_topic = main_device_health_topic(&config.bridge_service_name()); + service_monitor.connect_sink( - bridge_health_topic.as_str().try_into().unwrap(), + config.bridge_health_topic.clone().into(), &bridge_monitor_builder, ); @@ -397,7 +397,6 @@ impl Builder for C8yMapperBuilder { LoggingSender::new("C8yMapper => Uploader".into(), self.upload_sender); let downloader_sender = LoggingSender::new("C8yMapper => Downloader".into(), self.download_sender); - let c8y_bridge_service_name = self.config.bridge_service_name(); let converter = CumulocityConverter::new( self.config, @@ -418,7 +417,6 @@ impl Builder for C8yMapperBuilder { mqtt_publisher, timer_sender, bridge_monitor_box, - c8y_bridge_service_name, )) } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 809a3e3be0b..9f6063b6390 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -16,6 +16,7 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::mqtt_topics::TopicIdError; use tedge_api::path::DataDir; +use tedge_api::service_health_topic; use tedge_config::AutoLogUpload; use tedge_config::ConfigNotSet; use tedge_config::ReadError; @@ -23,6 +24,7 @@ use tedge_config::SoftwareManagementApiFlag; use tedge_config::TEdgeConfig; use tedge_config::TEdgeConfigReaderService; use tedge_config::TopicPrefix; +use tedge_mqtt_ext::Topic; use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; @@ -51,6 +53,8 @@ pub struct C8yMapperConfig { pub software_management_api: SoftwareManagementApiFlag, pub software_management_with_types: bool, pub auto_log_upload: AutoLogUpload, + pub bridge_service_name: String, + pub bridge_health_topic: Topic, pub data_dir: DataDir, pub config_dir: Arc, @@ -94,6 +98,14 @@ impl C8yMapperConfig { .into(); let state_dir = config_dir.join(STATE_DIR_NAME).into(); + let bridge_service_name = if bridge_in_mapper { + format!("tedge-mapper-bridge-{c8y_prefix}") + } else { + "mosquitto-c8y-bridge".into() + }; + let bridge_health_topic = + service_health_topic(&mqtt_schema, &device_topic_id, &bridge_service_name); + Self { data_dir, device_id, @@ -115,6 +127,8 @@ impl C8yMapperConfig { software_management_api, software_management_with_types, auto_log_upload, + bridge_service_name, + bridge_health_topic, config_dir, logs_path, @@ -124,14 +138,6 @@ impl C8yMapperConfig { } } - pub fn bridge_service_name(&self) -> String { - if self.bridge_in_mapper { - format!("tedge-mapper-bridge-{}", self.c8y_prefix) - } else { - "mosquitto-c8y-bridge".into() - } - } - pub fn from_tedge_config( config_dir: impl AsRef, tedge_config: &TEdgeConfig, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 03b5b67a45a..47a2dee1dc9 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -581,6 +581,7 @@ impl CumulocityConverter { let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?; Ok(convert_health_status_message( + &self.config.mqtt_schema, entity_metadata, &ancestors_external_ids, message, @@ -2877,7 +2878,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let in_topic = "te/device/child1/service/child-service-c8y/status/health"; - let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; + let in_payload = r#"{"pid":1234,"status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); let mqtt_schema = MqttSchema::new(); @@ -2933,7 +2934,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let in_topic = "te/device/main/service/test-tedge-mapper-c8y/status/health"; - let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; + let in_payload = r#"{"pid":1234,"status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); let mqtt_schema = MqttSchema::new(); diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index c2950ee86ce..40fd855f85f 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -1,54 +1,27 @@ use c8y_api::smartrest; -use serde::Deserialize; -use serde::Serialize; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; -use tedge_api::mqtt_topics::EntityTopicId; -use tedge_api::DOWN_STATUS; -use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD; -use tedge_api::MQTT_BRIDGE_UP_PAYLOAD; -use tedge_api::UNKNOWN_STATUS; -use tedge_api::UP_STATUS; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::HealthStatus; use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; +use tedge_mqtt_ext::Topic; use tracing::error; -pub const MOSQUITTO_BRIDGE_PREFIX: &str = "mosquitto-"; -pub const MOSQUITTO_BRIDGE_SUFFIX: &str = "-bridge"; - -#[derive(Deserialize, Serialize, Debug, Default)] -pub struct HealthStatus { - #[serde(default = "default_status")] - pub status: String, -} - -fn default_status() -> String { - "unknown".to_string() -} - -impl HealthStatus { - fn from_mosquitto_bridge_payload_str(payload: &str) -> Self { - let status = match payload { - MQTT_BRIDGE_UP_PAYLOAD => UP_STATUS, - MQTT_BRIDGE_DOWN_PAYLOAD => DOWN_STATUS, - _ => UNKNOWN_STATUS, - }; - HealthStatus { - status: status.into(), - } +pub fn is_c8y_bridge_established( + message: &MqttMessage, + mqtt_schema: &MqttSchema, + bridge_service_topic: &Topic, +) -> bool { + if let Ok(health_status) = HealthStatus::try_from_health_status_message(message, mqtt_schema) { + &message.topic == bridge_service_topic && health_status.is_valid() + } else { + false } } -fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool { - entity_topic_id - .default_service_name() - .filter(|name| { - name.starts_with(MOSQUITTO_BRIDGE_PREFIX) && name.ends_with(MOSQUITTO_BRIDGE_SUFFIX) - }) - .is_some() -} - pub fn convert_health_status_message( + mqtt_schema: &MqttSchema, entity: &EntityMetadata, ancestors_external_ids: &[String], message: &MqttMessage, @@ -60,19 +33,10 @@ pub fn convert_health_status_message( } let HealthStatus { - status: mut health_status, - } = if entity_is_mosquitto_bridge_service(&entity.topic_id) { - message - .payload_str() - .map(HealthStatus::from_mosquitto_bridge_payload_str) - .unwrap_or_default() - } else { - serde_json::from_slice(message.payload()).unwrap_or_default() - }; - - if health_status.is_empty() { - health_status = "unknown".into(); - } + status, + pid: _, + time: _, + } = HealthStatus::try_from_health_status_message(message, mqtt_schema).unwrap(); let display_name = entity .other @@ -91,7 +55,7 @@ pub fn convert_health_status_message( entity.external_id.as_ref(), display_name, display_type, - &health_status, + &status.to_string(), ancestors_external_ids, prefix, ) else { @@ -114,7 +78,7 @@ mod tests { #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"pid":"1234","status":"up"}"#, + r#"{"pid":1234,"status":"up"}"#, "c8y/s/us", r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; "service-monitoring-thin-edge-device" @@ -122,7 +86,7 @@ mod tests { #[test_case( "test_device", "te/device/child/service/tedge-mapper-c8y/status/health", - r#"{"pid":"1234","status":"up"}"#, + r#"{"pid":1234,"status":"up"}"#, "c8y/s/us/test_device:device:child", r#"102,test_device:device:child:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; "service-monitoring-thin-edge-child-device" @@ -243,6 +207,7 @@ mod tests { .unwrap(); let msg = convert_health_status_message( + &mqtt_schema, entity, &ancestors_external_ids, &health_message, @@ -250,4 +215,25 @@ mod tests { ); assert_eq!(msg[0], expected_message); } + + const C8Y_BRIDGE_HEALTH_TOPIC: &str = + "te/device/main/service/mosquitto-c8y-bridge/status/health"; + + #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)] + #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)] + #[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)] + #[test_case("tedge/not/health/topic", "1", false)] + #[test_case("tedge/not/health/topic", "0", false)] + fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) { + let mqtt_schema = MqttSchema::default(); + let topic = Topic::new(topic).unwrap(); + let message = MqttMessage::new(&topic, payload); + + let actual = is_c8y_bridge_established( + &message, + &mqtt_schema, + &C8Y_BRIDGE_HEALTH_TOPIC.try_into().unwrap(), + ); + assert_eq!(actual, expected); + } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index c199f16a487..6263fdbbcbb 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -30,12 +30,10 @@ use tedge_actors::NoMessage; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::main_device_health_topic; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::CommandStatus; use tedge_api::SoftwareUpdateCommand; -use tedge_api::MQTT_BRIDGE_UP_PAYLOAD; use tedge_config::AutoLogUpload; use tedge_config::SoftwareManagementApiFlag; use tedge_config::TEdgeConfig; @@ -2545,6 +2543,7 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( let mut service_monitor_builder: SimpleMessageBoxBuilder = SimpleMessageBoxBuilder::new("ServiceMonitor", 1); + let bridge_health_topic = config.bridge_health_topic.clone(); let c8y_mapper_builder = C8yMapperBuilder::try_new( config, &mut mqtt_builder, @@ -2561,10 +2560,7 @@ pub(crate) async fn spawn_c8y_mapper_actor_with_config( tokio::spawn(async move { actor.run().await }); let mut service_monitor_box = service_monitor_builder.build(); - let bridge_status_msg = MqttMessage::new( - &Topic::new_unchecked(&main_device_health_topic("tedge-mapper-bridge-c8y")), - MQTT_BRIDGE_UP_PAYLOAD, - ); + let bridge_status_msg = MqttMessage::new(&bridge_health_topic, "1"); service_monitor_box.send(bridge_status_msg).await.unwrap(); TestHandle { diff --git a/crates/extensions/tedge_mqtt_bridge/Cargo.toml b/crates/extensions/tedge_mqtt_bridge/Cargo.toml index 235e3d4ae62..0d72aeca89b 100644 --- a/crates/extensions/tedge_mqtt_bridge/Cargo.toml +++ b/crates/extensions/tedge_mqtt_bridge/Cargo.toml @@ -22,7 +22,6 @@ mqtt_channel = { workspace = true } mutants = { workspace = true } rumqttc = { workspace = true } tedge_actors = { workspace = true } -tedge_api = { workspace = true } tedge_config = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, default_features = false, features = ["macros"] } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index f9256936f0b..4260febabfa 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -45,7 +45,6 @@ pub use mqtt_channel::MqttError; pub use mqtt_channel::MqttMessage; pub use mqtt_channel::QoS; pub use mqtt_channel::Topic; -use tedge_api::main_device_health_topic; use tedge_config::MqttAuthConfig; use tedge_config::TEdgeConfig; @@ -58,16 +57,16 @@ pub struct MqttBridgeActorBuilder {} impl MqttBridgeActorBuilder { pub async fn new( tedge_config: &TEdgeConfig, - service_name: String, + service_name: &str, + health_topic: &Topic, rules: BridgeConfig, mut cloud_config: MqttOptions, ) -> Self { let mut local_config = MqttOptions::new( - &service_name, + service_name, &tedge_config.mqtt.client.host, tedge_config.mqtt.client.port.into(), ); - let health_topic = main_device_health_topic(&service_name); // TODO cope with certs but not ca_dir, or handle that case with an explicit error message? let auth_config = tedge_config.mqtt_client_auth_config(); let local_tls_config = match auth_config { @@ -88,7 +87,7 @@ impl MqttBridgeActorBuilder { } local_config.set_manual_acks(true); local_config.set_last_will(LastWill::new( - &health_topic, + &health_topic.name, Status::Down.json(), QoS::AtLeastOnce, true, @@ -113,7 +112,7 @@ impl MqttBridgeActorBuilder { let [(convert_local, bidir_local), (convert_cloud, bidir_cloud)] = rules.converters_and_bidirectional_topic_filters(); let (tx_status, monitor) = - BridgeHealthMonitor::new(local_client.clone(), health_topic, &msgs_cloud); + BridgeHealthMonitor::new(local_client.clone(), health_topic.name.clone(), &msgs_cloud); tokio::spawn(monitor.monitor()); tokio::spawn(half_bridge( local_event_loop, diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index 48f2509e13a..85d15bb8abc 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -44,9 +44,15 @@ fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) { async fn start_mqtt_bridge(local_port: u16, cloud_port: u16, rules: BridgeConfig) { let cloud_config = MqttOptions::new("a-device-id", "localhost", cloud_port); + let service_name = "tedge-mapper-test"; + let health_topic = format!("te/device/main/service/{service_name}/status/health") + .as_str() + .try_into() + .unwrap(); MqttBridgeActorBuilder::new( &tedge_mqtt_config(local_port), - "tedge-mapper-test".into(), + service_name, + &health_topic, rules, cloud_config, )