Skip to content

Commit

Permalink
Merge pull request #2940 from rina23q/feature/2862/support-c8y-device…
Browse files Browse the repository at this point in the history
…-availability-version-2

feat: support c8y device availability by AvailabilityActor
  • Loading branch information
rina23q authored Jul 4, 2024
2 parents 39e56c1 + d55b5a2 commit 9f1e2df
Show file tree
Hide file tree
Showing 26 changed files with 2,313 additions and 1,095 deletions.
15 changes: 14 additions & 1 deletion crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,20 @@ define_tedge_config! {
/// Auto-upload the operation log once it finishes.
#[tedge_config(example = "always", example = "never", example = "on-failure", default(variable = "AutoLogUpload::Never"))]
auto_log_upload: AutoLogUpload,
}
},

availability: {
/// Enable sending heartbeat to Cumulocity periodically. If set to false, c8y_RequiredAvailability won't be sent
#[tedge_config(example = "true", default(value = true))]
enable: bool,

/// Heartbeat interval to be sent to Cumulocity as c8y_RequiredAvailability.
/// The value must be greater than 1 minute.
/// If set to a lower value or 0, the device is considered in maintenance mode in the Cumulocity context.
/// Details: https://cumulocity.com/docs/device-integration/fragment-library/#device-availability
#[tedge_config(example = "60m", default(from_str = "60m"))]
interval: SecondsOrHumanTime,
},
},

#[tedge_config(deprecated_name = "azure")] // for 0.1.0 compatibility
Expand Down
25 changes: 25 additions & 0 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

use crate::smartrest::csv::fields_to_csv_string;
use crate::smartrest::topic::publish_topic_from_ancestors;
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
use std::time::Duration;
use tedge_config::TopicPrefix;

/// Create a SmartREST message for creating a child device under the given ancestors.
Expand Down Expand Up @@ -116,6 +118,29 @@ pub fn service_creation_message_payload(
]))
}

/// Create a SmartREST message to set a response interval for c8y_RequiredAvailability.
///
/// In the SmartREST 117 message, the interval must be in MINUTES, and can be <=0,
/// which means the device is in maintenance mode in the c8y context.
/// Details: https://cumulocity.com/docs/device-integration/fragment-library/#device-availability
#[derive(Debug)]
pub struct C8ySmartRestSetInterval117 {
pub c8y_topic: C8yTopic,
pub interval: Duration,
pub prefix: TopicPrefix,
}

impl From<C8ySmartRestSetInterval117> for MqttMessage {
fn from(value: C8ySmartRestSetInterval117) -> Self {
let topic = value.c8y_topic.to_topic(&value.prefix).unwrap();
let interval_in_minutes = value.interval.as_secs() / 60;
MqttMessage::new(
&topic,
fields_to_csv_string(&["117", &interval_in_minutes.to_string()]),
)
}
}

#[derive(thiserror::Error, Debug)]
#[error("Field `{field_name}` contains invalid value: {value:?}")]
pub struct InvalidValueError {
Expand Down
43 changes: 42 additions & 1 deletion crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl MqttSchema {
};
let channel = match channel {
ChannelFilter::EntityMetadata => "".to_string(),
ChannelFilter::EntityTwinData => "/twin/+".to_string(),
ChannelFilter::Measurement => "/m/+".to_string(),
ChannelFilter::MeasurementMetadata => "/m/+/meta".to_string(),
ChannelFilter::Event => "/e/+".to_string(),
Expand All @@ -178,6 +179,7 @@ impl MqttSchema {
ChannelFilter::Command(operation) => format!("/cmd/{operation}/+"),
ChannelFilter::AnyCommandMetadata => "/cmd/+".to_string(),
ChannelFilter::CommandMetadata(operation) => format!("/cmd/{operation}"),
ChannelFilter::Health => "/status/health".to_string(),
};

TopicFilter::new_unchecked(&format!("{}/{entity}{channel}", self.root))
Expand Down Expand Up @@ -404,6 +406,11 @@ impl EntityTopicId {
self == &Self::default_main_device()
}

/// Returns true if the current topic identifier matches that of the service
pub fn is_default_service(&self) -> bool {
self.default_service_name().is_some()
}

/// If `self` is a device topic id, return a service topic id under this
/// device.
///
Expand Down Expand Up @@ -512,7 +519,7 @@ pub enum TopicIdError {
/// A channel identifies the type of the messages exchanged over a topic
///
/// <https://thin-edge.github.io/thin-edge.io/next/references/mqtt-api/#group-channel>
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Channel {
EntityMetadata,
EntityTwinData {
Expand Down Expand Up @@ -615,6 +622,10 @@ impl Display for Channel {
}

impl Channel {
pub fn is_entity_metadata(&self) -> bool {
matches!(self, Channel::EntityMetadata)
}

pub fn is_measurement(&self) -> bool {
matches!(self, Channel::Measurement { .. })
}
Expand Down Expand Up @@ -716,8 +727,10 @@ pub enum EntityFilter<'a> {
Entity(&'a EntityTopicId),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChannelFilter {
EntityMetadata,
EntityTwinData,
Measurement,
Event,
Alarm,
Expand All @@ -728,6 +741,34 @@ pub enum ChannelFilter {
AlarmMetadata,
AnyCommandMetadata,
CommandMetadata(OperationType),
Health,
}

impl From<&Channel> for ChannelFilter {
fn from(value: &Channel) -> Self {
match value {
Channel::EntityMetadata => ChannelFilter::EntityMetadata,
Channel::EntityTwinData { fragment_key: _ } => ChannelFilter::EntityTwinData,
Channel::Measurement {
measurement_type: _,
} => ChannelFilter::Measurement,
Channel::Event { event_type: _ } => ChannelFilter::Event,
Channel::Alarm { alarm_type: _ } => ChannelFilter::Alarm,
Channel::Command {
operation,
cmd_id: _,
} => ChannelFilter::Command(operation.clone()),
Channel::MeasurementMetadata {
measurement_type: _,
} => ChannelFilter::MeasurementMetadata,
Channel::EventMetadata { event_type: _ } => ChannelFilter::EventMetadata,
Channel::AlarmMetadata { alarm_type: _ } => ChannelFilter::AlarmMetadata,
Channel::CommandMetadata { operation } => {
ChannelFilter::CommandMetadata(operation.clone())
}
Channel::Health => ChannelFilter::Health,
}
}
}

pub struct IdGenerator {
Expand Down
9 changes: 9 additions & 0 deletions crates/core/tedge_api/src/store/pending_entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ pub struct PendingEntityData {
pub data_messages: Vec<MqttMessage>,
}

impl From<EntityRegistrationMessage> for PendingEntityData {
fn from(reg_message: EntityRegistrationMessage) -> Self {
Self {
reg_message,
data_messages: vec![],
}
}
}

impl PendingEntityStore {
pub fn new(mqtt_schema: MqttSchema, telemetry_cache_size: usize) -> Self {
Self {
Expand Down
165 changes: 0 additions & 165 deletions crates/core/tedge_api/store/message_log.rs

This file was deleted.

17 changes: 16 additions & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use c8y_auth_proxy::actor::C8yAuthProxyBuilder;
use c8y_http_proxy::credentials::C8YJwtRetriever;
use c8y_http_proxy::C8YHttpProxyBuilder;
use c8y_mapper_ext::actor::C8yMapperBuilder;
use c8y_mapper_ext::availability::AvailabilityBuilder;
use c8y_mapper_ext::availability::AvailabilityConfig;
use c8y_mapper_ext::compatibility_adapter::OldAgentAdapter;
use c8y_mapper_ext::config::C8yMapperConfig;
use c8y_mapper_ext::converter::CumulocityConverter;
Expand Down Expand Up @@ -209,7 +211,7 @@ impl TEdgeComponent for CumulocityMapper {
let mut service_monitor_actor =
MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?);

let c8y_mapper_actor = C8yMapperBuilder::try_new(
let mut c8y_mapper_actor = C8yMapperBuilder::try_new(
c8y_mapper_config,
&mut mqtt_actor,
&mut c8y_http_proxy_actor,
Expand All @@ -224,6 +226,16 @@ impl TEdgeComponent for CumulocityMapper {
// and translating the responses received on tedge/commands/res/+/+ to te/device/main///cmd/+/+
let old_to_new_agent_adapter = OldAgentAdapter::builder(&mut mqtt_actor);

let availability_actor = if tedge_config.c8y.availability.enable {
Some(AvailabilityBuilder::new(
AvailabilityConfig::from(&tedge_config),
&mut c8y_mapper_actor,
&mut timer_actor,
))
} else {
None
};

runtime.spawn(mqtt_actor).await?;
runtime.spawn(jwt_actor).await?;
runtime.spawn(http_actor).await?;
Expand All @@ -236,6 +248,9 @@ impl TEdgeComponent for CumulocityMapper {
runtime.spawn(uploader_actor).await?;
runtime.spawn(downloader_actor).await?;
runtime.spawn(old_to_new_agent_adapter).await?;
if let Some(availability_actor) = availability_actor {
runtime.spawn(availability_actor).await?;
}
runtime.run_to_completion().await?;

Ok(())
Expand Down
Loading

0 comments on commit 9f1e2df

Please sign in to comment.