Skip to content

Commit

Permalink
feat(plugin): netdata collector of thin-edge measurements
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Mar 1, 2024
1 parent bbdd177 commit cc6262f
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 1 deletion.
103 changes: 102 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"crates/tests/*",
"plugins/c8y_firmware_plugin",
"plugins/c8y_remote_access_plugin",
"plugins/netdata-collector",
"plugins/tedge_apt_plugin",
]
resolver = "2"
Expand Down
17 changes: 17 additions & 0 deletions crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,23 @@ pub trait MessageSink<M: Message, Config> {
let sender = MappingSender::new(self.get_sender(), cast);
source.register_peer(self.get_config(), sender.into())
}

/// TODO deprecate add_mapped_input in favor of this method
/// indeed, add_mapped_input fails to provide a meaningful config
fn add_source<N, MS, C, MessageMapper>(
&mut self,
source: &mut impl MessageSource<N, C>,
config: C,
cast: MessageMapper,
) where
N: Message,
MS: Iterator<Item = M> + Send,
MessageMapper: Fn(N) -> MS,
MessageMapper: 'static + Send + Sync,
{
let sender = MappingSender::new(self.get_sender(), cast);
source.register_peer(config, sender.into())
}
}

/// The [Builder] of an [Actor](crate::Actor) must implement this trait
Expand Down
22 changes: 22 additions & 0 deletions crates/extensions/netdata_collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "netdata_collector"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = { workspace = true }
netdata-plugin = "0.2.0"
tedge_actors = { workspace = true }
tedge_api = { workspace = true }
tedge_mqtt_ext = { workspace = true }
time = { workspace = true }

[lints]
workspace = true
83 changes: 83 additions & 0 deletions crates/extensions/netdata_collector/src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::collections::{HashMap, HashSet};
use crate::message::MetricPoints;
use crate::TEdgeNetDataCollectorBuilder;
use netdata_plugin::collector::Collector;
use netdata_plugin::Chart;
use netdata_plugin::Dimension;
use tedge_actors::Actor;
use tedge_actors::LoggingReceiver;
use tedge_actors::MessageReceiver;
use tedge_actors::RuntimeError;

pub struct TEdgeNetDataCollector {
pub(crate) input: LoggingReceiver<MetricPoints>,
}

impl TEdgeNetDataCollector {
pub fn builder() -> TEdgeNetDataCollectorBuilder {
TEdgeNetDataCollectorBuilder::default()
}
}

#[async_trait::async_trait]
impl Actor for TEdgeNetDataCollector {
fn name(&self) -> &str {
"NetData"
}

async fn run(mut self) -> Result<(), RuntimeError> {
let mut writer = std::io::stdout();
let mut c = Collector::new(&mut writer);
let mut charts = HashMap::new();

while let Some(points) = self.input.recv().await {
// Declare any new chart
let updated_charts: HashSet<String> = points.iter().map(|p| p.chart_id.clone()).collect();
for chart_id in updated_charts.iter() {
if !charts.contains_key(chart_id) {
let chart = new_chart(chart_id);
c.add_chart(&chart).unwrap();
charts.insert(chart_id.to_string(), HashSet::new());
}
}

// Declare any new dimension
for p in points.iter() {
if let Some(dims) = charts.get_mut(&p.chart_id) {
let dim_id = p.dimension_id.clone();
if !dims.contains(&dim_id) {
let dim = new_dim(&dim_id);
c.add_dimension(&p.chart_id, &dim).unwrap();
dims.insert(dim_id);
}
}
}

// Publish the metrics
for p in points {
c.prepare_value(&p.chart_id, &p.dimension_id, p.value).unwrap();
}
for chart_id in updated_charts {
c.commit_chart(&chart_id).unwrap();
}
}

Ok(())
}
}

fn new_chart(chart_id: &str) -> Chart {
let mut chart = Chart::default();
chart.type_id = chart_id;
chart.name = chart_id;
chart.title = chart_id;
chart.units = "units";
chart
}

fn new_dim(dim_id: &str) -> Dimension {
let mut dim = Dimension::default();
dim.id = dim_id;
dim.name = dim_id;
dim
}
55 changes: 55 additions & 0 deletions crates/extensions/netdata_collector/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::MetricPoints;
use crate::TEdgeNetDataCollector;
use std::convert::Infallible;
use tedge_actors::futures::channel::mpsc;
use tedge_actors::Builder;
use tedge_actors::DynSender;
use tedge_actors::LoggingReceiver;
use tedge_actors::MessageSink;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;

pub struct TEdgeNetDataCollectorBuilder {
input: LoggingReceiver<MetricPoints>,
input_sender: DynSender<MetricPoints>,
signal_sender: DynSender<RuntimeRequest>,
}

impl Default for TEdgeNetDataCollectorBuilder {
fn default() -> Self {
let (input_sender, input_receiver) = mpsc::channel(10);
let (signal_sender, signal_receiver) = mpsc::channel(10);
let input = LoggingReceiver::new("NetData".into(), input_receiver, signal_receiver);

TEdgeNetDataCollectorBuilder {
input,
input_sender: input_sender.into(),
signal_sender: signal_sender.into(),
}
}
}

impl MessageSink<MetricPoints, NoConfig> for TEdgeNetDataCollectorBuilder {
fn get_config(&self) -> NoConfig {
NoConfig
}

fn get_sender(&self) -> DynSender<MetricPoints> {
self.input_sender.clone()
}
}

impl RuntimeRequestSink for TEdgeNetDataCollectorBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
self.signal_sender.clone()
}
}

impl Builder<TEdgeNetDataCollector> for TEdgeNetDataCollectorBuilder {
type Error = Infallible;

fn try_build(self) -> Result<TEdgeNetDataCollector, Self::Error> {
Ok(TEdgeNetDataCollector { input: self.input })
}
}
7 changes: 7 additions & 0 deletions crates/extensions/netdata_collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod actor;
mod builder;
mod message;

pub use actor::*;
pub use builder::*;
pub use message::*;
Loading

0 comments on commit cc6262f

Please sign in to comment.