Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Refactor health status code #2919

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 0 additions & 55 deletions crates/core/c8y_api/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,3 @@
pub mod bridge {
use mqtt_channel::MqttMessage;
use tedge_api::main_device_health_topic;
use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD;
use tedge_api::MQTT_BRIDGE_UP_PAYLOAD;

pub fn is_c8y_bridge_established(message: &MqttMessage, service: &str) -> bool {
let c8y_bridge_health_topic = main_device_health_topic(service);
match message.payload_str() {
Ok(payload) => {
message.topic.name == c8y_bridge_health_topic
&& (payload == MQTT_BRIDGE_UP_PAYLOAD
|| payload == MQTT_BRIDGE_DOWN_PAYLOAD
|| is_valid_status_payload(payload))
}
Err(_err) => false,
}
}

#[derive(serde::Deserialize)]
struct HealthStatus<'a> {
status: &'a str,
}

fn is_valid_status_payload(payload: &str) -> bool {
serde_json::from_str::<HealthStatus>(payload)
.map_or(false, |h| h.status == "up" || h.status == "down")
}
}

pub mod child_device {
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
Expand All @@ -40,28 +10,3 @@ pub mod child_device {
)
}
}

#[cfg(test)]
mod tests {
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use test_case::test_case;

use crate::utils::bridge::is_c8y_bridge_established;

const C8Y_BRIDGE_HEALTH_TOPIC: &str =
"te/device/main/service/tedge-mapper-bridge-c8y/status/health";

#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)]
#[test_case("tedge/not/health/topic", "1", false)]
#[test_case("tedge/not/health/topic", "0", false)]
fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) {
let topic = Topic::new(topic).unwrap();
let message = MqttMessage::new(&topic, payload);

let actual = is_c8y_bridge_established(&message, "tedge-mapper-bridge-c8y");
assert_eq!(actual, expected);
}
}
172 changes: 163 additions & 9 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use crate::mqtt_topics::Channel;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::ServiceTopicId;
use clock::Clock;
use clock::WallClock;
use log::error;
use mqtt_channel::MqttMessage;
use mqtt_channel::Topic;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Value as JsonValue;
use std::fmt::Display;
use std::process;
use std::sync::Arc;
use tedge_utils::timestamp::TimeFormat;

pub const MQTT_BRIDGE_UP_PAYLOAD: &str = "1";
pub const MQTT_BRIDGE_DOWN_PAYLOAD: &str = "0";
pub const UP_STATUS: &str = "up";
pub const DOWN_STATUS: &str = "down";
pub const UNKNOWN_STATUS: &str = "unknown";

// FIXME: doesn't account for custom topic root, use MQTT scheme API here
pub fn main_device_health_topic(service: &str) -> String {
format!("te/device/main/service/{service}/status/health")
pub fn service_health_topic(
mqtt_schema: &MqttSchema,
device_topic_id: &EntityTopicId,
service: &str,
) -> Topic {
mqtt_schema.topic_for(
&device_topic_id.default_service_for_device(service).unwrap(),
&Channel::Health,
)
}

/// Encodes a valid health topic.
Expand Down Expand Up @@ -92,14 +97,163 @@ impl ServiceHealthTopic {
}
}

#[derive(Deserialize, Serialize, Debug, Default)]
pub struct HealthStatus {
pub status: Status,
pub pid: Option<u32>,
pub time: Option<JsonValue>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Status {
Up,
Down,
#[serde(untagged)]
Other(String),
}

impl Default for Status {
fn default() -> Self {
Status::Other("unknown".to_string())
}
}

impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status = match self {
Status::Up => "up",
Status::Down => "down",
Status::Other(val) if val.is_empty() => "unknown",
Status::Other(val) => val,
};
write!(f, "{}", status)
}
}

#[derive(Debug)]
pub struct HealthTopicError;

impl HealthStatus {
pub fn try_from_health_status_message(
message: &MqttMessage,
mqtt_schema: &MqttSchema,
) -> Result<Self, HealthTopicError> {
if let Ok((topic_id, Channel::Health)) = mqtt_schema.entity_channel_of(&message.topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but this highlights an issue I got several time: this method uses a specific mqtt_schema only to parse a topic. The MQTT root prefix doesn't really matter and it would be more convenient to have a parse method returning all the component of a topic is set (i.e. not only the topic_id and channel but also the root prefix).

let health_status = if entity_is_mosquitto_bridge_service(&topic_id) {
let status = match message.payload_str() {
Ok("1") => Status::Up,
Ok("0") => Status::Down,
_ => Status::default(),
};
HealthStatus {
status,
pid: None,
time: None,
}
} else {
serde_json::from_slice(message.payload()).unwrap_or_default()
};
Ok(health_status)
} else {
Err(HealthTopicError)
}
}

pub fn is_valid(&self) -> bool {
self.status == Status::Up || self.status == Status::Down
}
}

pub fn entity_is_mosquitto_bridge_service(entity_topic_id: &EntityTopicId) -> bool {
entity_topic_id
.default_service_name()
.filter(|name| name.starts_with("mosquitto-") && name.ends_with("-bridge"))
.is_some()
}

#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use serde_json::Value;
use test_case::test_case;

#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"service-health-status-up"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"down"}"#,
Status::Down;
"service-health-status-down"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":"foo"}"#,
Status::Other("foo".into());
"service-health-status-other-value"
)]
#[test_case(
"te/device/child/service/tedge-mapper-c8y/status/health",
r#"{"pid":1234,"status":"up"}"#,
Status::Up;
"service-health-status-with-extra-fields"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"pid":"123456"}"#,
Status::Other("unknown".into());
"service-health-status-no-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
r#"{"status":""}"#,
Status::Other("".into());
"service-health-status-empty-value"
)]
#[test_case(
"te/device/main/service/tedge-mapper-c8y/status/health",
"{}",
Status::default();
"service-health-status-empty-message"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"1",
Status::Up;
"mosquitto-bridge-service-health-status-up"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"0",
Status::Down;
"mosquitto-bridge-service-health-status-down"
)]
#[test_case(
"te/device/main/service/mosquitto-xyz-bridge/status/health",
"invalid payload",
Status::default();
"mosquitto-bridge-service-health-status-invalid-payload"
)]
#[test_case(
"te/device/main/service/tedge-mapper-bridge-c8y/status/health",
r#"{"status":"up"}"#,
Status::Up;
"builtin-bridge-service-health-status-up"
)]
fn parse_heath_status(health_topic: &str, health_payload: &str, expected_status: Status) {
let mqtt_schema = MqttSchema::new();
let topic = Topic::new_unchecked(health_topic);
let health_message = MqttMessage::new(&topic, health_payload.as_bytes().to_owned());

let health_status =
HealthStatus::try_from_health_status_message(&health_message, &mqtt_schema);
assert_eq!(health_status.unwrap().status, expected_status);
}

#[test]
fn is_rfc3339_timestamp() {
Expand Down
6 changes: 0 additions & 6 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,6 @@ impl EntityTopicId {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

// FIXME: can also match "device/bridge//" or "/device/main/service/my_custom_bridge"
// should match ONLY the single mapper bridge
pub fn is_bridge_health_topic(&self) -> bool {
self.as_str().contains("bridge")
}
}

/// Contains a topic id of the service itself and the associated device.
Expand Down
16 changes: 14 additions & 2 deletions crates/core/tedge_mapper/src/aws/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use aws_mapper_ext::converter::AwsConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AWS_MAPPER_NAME: &str = "tedge-mapper-aws";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-aws";

pub struct AwsMapper;

Expand All @@ -32,8 +36,12 @@ impl TEdgeComponent for AwsMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;

let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
if tedge_config.mqtt.bridge.built_in {
let device_id = tedge_config.device.id.try_read(&tedge_config)?;
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let rules = built_in_bridge_rules(device_id)?;

let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
Expand All @@ -47,17 +55,21 @@ impl TEdgeComponent for AwsMapper {
&tedge_config.aws.root_cert_path,
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-aws".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
}
let clock = Box::new(WallClock);
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());
let aws_converter = AwsConverter::new(
tedge_config.aws.mapper.timestamp,
clock,
Expand Down
14 changes: 13 additions & 1 deletion crates/core/tedge_mapper/src/az/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ use async_trait::async_trait;
use az_mapper_ext::converter::AzureConverter;
use clock::WallClock;
use mqtt_channel::TopicFilter;
use std::str::FromStr;
use tedge_actors::ConvertingActor;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::NoConfig;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::service_health_topic;
use tedge_config::TEdgeConfig;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tracing::warn;

const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
const BUILT_IN_BRIDGE_NAME: &str = "tedge-mapper-bridge-az";

pub struct AzureMapper;

Expand All @@ -32,7 +36,11 @@ impl TEdgeComponent for AzureMapper {
) -> Result<(), anyhow::Error> {
let (mut runtime, mut mqtt_actor) =
start_basic_actors(self.session_name(), &tedge_config).await?;
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone());

if tedge_config.mqtt.bridge.built_in {
let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?;

let remote_clientid = tedge_config.device.id.try_read(&tedge_config)?;
let rules = built_in_bridge_rules(remote_clientid)?;

Expand All @@ -55,9 +63,13 @@ impl TEdgeComponent for AzureMapper {
&tedge_config,
)?;

let health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, BUILT_IN_BRIDGE_NAME);

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
"tedge-mapper-bridge-az".to_owned(),
BUILT_IN_BRIDGE_NAME,
&health_topic,
rules,
cloud_config,
)
Expand Down
Loading