Skip to content

Commit

Permalink
perf(topic_state_monitor): reduce callback invocation
Browse files Browse the repository at this point in the history
Signed-off-by: Takayuki AKAMINE <[email protected]>
  • Loading branch information
takam5f2 committed Feb 28, 2024
1 parent aead86a commit d2cf842
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 11 deletions.
16 changes: 16 additions & 0 deletions system/component_state_monitor/config/topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
warn_rate: 0.0
error_rate: 0.0
timeout: 0.0
queue_size: 1

- module: map
mode: [online, logging_simulation]
Expand All @@ -23,6 +24,7 @@
warn_rate: 0.0
error_rate: 0.0
timeout: 0.0
queue_size: 1

- module: localization
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -36,6 +38,7 @@
warn_rate: 0.0
error_rate: 0.0
timeout: 0.0
queue_size: 1

- module: localization
mode: [online, logging_simulation]
Expand All @@ -49,6 +52,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: localization
mode: [online, logging_simulation]
Expand All @@ -62,6 +66,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 15

- module: perception
mode: [online, logging_simulation]
Expand All @@ -75,6 +80,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 15

- module: perception
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -88,6 +94,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 15

- module: perception
mode: [online, logging_simulation]
Expand All @@ -101,6 +108,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 15

- module: planning
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -114,6 +122,7 @@
warn_rate: 0.0
error_rate: 0.0
timeout: 0.0
queue_size: 1

- module: planning
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -127,6 +136,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 15

- module: control
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -140,6 +150,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: control
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -153,6 +164,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: vehicle
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -166,6 +178,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: vehicle
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -179,6 +192,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: system
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -192,6 +206,7 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60

- module: localization
mode: [online, logging_simulation, planning_simulation]
Expand All @@ -206,3 +221,4 @@
warn_rate: 5.0
error_rate: 1.0
timeout: 1.0
queue_size: 60
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TopicStateMonitor
rclcpp::Time getLastMessageTime() const { return last_message_time_; }
double getTopicRate() const { return topic_rate_; }

void update();
void update(rclcpp::Time & source_timestamp);
TopicStatus getTopicStatus() const;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct NodeParam
bool transient_local;
bool best_effort;
bool is_transform;
int queue_size;
};

class TopicStateMonitorNode : public rclcpp::Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<arg name="timeout" description="timeout period[s]"/>
<arg name="window_size" default="10" description="window size"/>

<node pkg="topic_state_monitor" exec="topic_state_monitor_node" name="topic_state_monitor_$(var node_name_suffix)" output="screen">
<node pkg="topic_state_monitor" exec="topic_state_monitor_node" name="topic_state_monitor_$(var node_name_suffix)" output="screen" >
<param name="topic" value="$(var topic)"/>
<param name="frame_id" value="$(var frame_id)"/>
<param name="child_frame_id" value="$(var child_frame_id)"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ TopicStateMonitor::TopicStateMonitor(rclcpp::Node & node) : clock_(node.get_cloc
{
}

void TopicStateMonitor::update()
void TopicStateMonitor::update(rclcpp::Time & source_timestamp)
{
// Add data
last_message_time_ = clock_->now();
last_message_time_ = source_timestamp;
time_buffer_.push_back(last_message_time_);

// Remove old data
Expand Down Expand Up @@ -52,14 +52,14 @@ TopicStatus TopicStateMonitor::getTopicStatus() const
return TopicStatus::Ok;
}

#include <iostream>
double TopicStateMonitor::calcTopicRate() const
{
// Output max_rate when topic rate can't be calculated.
// In this case, it's assumed timeout is used instead.
if (time_buffer_.size() < 2) {
return TopicStateMonitor::max_rate;
}

const auto time_diff = (time_buffer_.back() - time_buffer_.front()).seconds();
const auto num_intervals = time_buffer_.size() - 1;

Expand Down
42 changes: 36 additions & 6 deletions system/topic_state_monitor/src/topic_state_monitor_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ TopicStateMonitorNode::TopicStateMonitorNode(const rclcpp::NodeOptions & node_op
node_param_.best_effort = declare_parameter("best_effort", false);
node_param_.diag_name = declare_parameter<std::string>("diag_name");
node_param_.is_transform = (node_param_.topic == "/tf" || node_param_.topic == "/tf_static");
node_param_.queue_size = declare_parameter<int>("queue_size", 10);

if (node_param_.is_transform) {
node_param_.frame_id = declare_parameter<std::string>("frame_id");
Expand All @@ -70,31 +71,42 @@ TopicStateMonitorNode::TopicStateMonitorNode(const rclcpp::NodeOptions & node_op
topic_state_monitor_->setParam(param_);

// Subscriber
rclcpp::QoS qos = rclcpp::QoS{1};
rclcpp::QoS qos = rclcpp::QoS{node_param_.queue_size};
if (node_param_.transient_local) {
qos.transient_local();
}
if (node_param_.best_effort) {
qos.best_effort();
}

auto noexec_callback_group = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
auto noexec_subscription_options = rclcpp::SubscriptionOptions();
noexec_subscription_options.callback_group = noexec_callback_group;

if (node_param_.is_transform) {
sub_transform_ = this->create_subscription<tf2_msgs::msg::TFMessage>(
node_param_.topic, qos, [this](tf2_msgs::msg::TFMessage::ConstSharedPtr msg) {
node_param_.topic, qos,
[this](tf2_msgs::msg::TFMessage::ConstSharedPtr msg) {
assert(false);
for (const auto & transform : msg->transforms) {
if (
transform.header.frame_id == node_param_.frame_id &&
transform.child_frame_id == node_param_.child_frame_id) {
topic_state_monitor_->update();
rclcpp::Time empty = rclcpp::Time(0, 0);
topic_state_monitor_->update(empty);
}
}
});
},
noexec_subscription_options);
} else {
sub_topic_ = this->create_generic_subscription(
node_param_.topic, node_param_.topic_type, qos,
[this]([[maybe_unused]] std::shared_ptr<rclcpp::SerializedMessage> msg) {
topic_state_monitor_->update();
});
assert(false);
rclcpp::Time empty = rclcpp::Time(0, 0);
topic_state_monitor_->update(empty);
},
noexec_subscription_options);
}

// Diagnostic Updater
Expand Down Expand Up @@ -130,6 +142,24 @@ rcl_interfaces::msg::SetParametersResult TopicStateMonitorNode::onParameter(

void TopicStateMonitorNode::onTimer()
{
// Iterate to reading messages and updating the topic state.
bool is_taken = true;
rclcpp::MessageInfo message_info;
while (is_taken) {
if (node_param_.is_transform) {
tf2_msgs::msg::TFMessage::SharedPtr msg = std::make_shared<tf2_msgs::msg::TFMessage>();
is_taken = sub_transform_->take(*msg, message_info);
} else {
std::shared_ptr<rclcpp::SerializedMessage> serialized_msg = sub_topic_->create_serialized_message();
is_taken = sub_topic_->take_serialized(*serialized_msg.get(), message_info);
}

if (is_taken) {
rclcpp::Time source_timestamp(message_info.get_rmw_message_info().source_timestamp, RCL_ROS_TIME);
topic_state_monitor_->update(source_timestamp);
}
}

// Publish diagnostics
updater_.force_update();
}
Expand Down

0 comments on commit d2cf842

Please sign in to comment.