diff --git a/bondcpp/include/bondcpp/bond.hpp b/bondcpp/include/bondcpp/bond.hpp index 0cb3ff5d..8b723f24 100644 --- a/bondcpp/include/bondcpp/bond.hpp +++ b/bondcpp/include/bondcpp/bond.hpp @@ -33,7 +33,6 @@ #define BONDCPP__BOND_HPP_ #include -#include #include #include #include @@ -45,8 +44,8 @@ #include "bondcpp/BondSM_sm.hpp" -#include "rclcpp/rclcpp.hpp" -#include "rclcpp_lifecycle/lifecycle_node.hpp" +#include +#include namespace bond { @@ -59,6 +58,29 @@ namespace bond class Bond { public: + using EventCallback = std::function; + + /** \brief Constructor to delegate common functionality + * + * \param topic The topic used to exchange the bond status messages. + * \param id The ID of the bond, which should match the ID used on + * the sister's end. + * \param node_base base node interface + * \param node_logging logging node interface + * \param node_timers timers node interface + * \param on_broken callback that will be called when the bond is broken. + * \param on_formed callback that will be called when the bond is formed. + */ + Bond( + const std::string & topic, const std::string & id, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, + rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging, + rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_params, + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers, + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr node_topics, + EventCallback on_broken = EventCallback(), + EventCallback on_formed = EventCallback()); + /** \brief Constructs a bond, but does not connect * * \param topic The topic used to exchange the bond status messages. @@ -71,8 +93,8 @@ class Bond Bond( const std::string & topic, const std::string & id, rclcpp_lifecycle::LifecycleNode::SharedPtr nh, - std::function on_broken = std::function(), - std::function on_formed = std::function()); + EventCallback on_broken = EventCallback(), + EventCallback on_formed = EventCallback()); /** \brief Constructs a bond, but does not connect * @@ -86,8 +108,8 @@ class Bond Bond( const std::string & topic, const std::string & id, rclcpp::Node::SharedPtr nh, - std::function on_broken = std::function(), - std::function on_formed = std::function()); + EventCallback on_broken = EventCallback(), + EventCallback on_formed = EventCallback()); /** \brief Destructs the object, breaking the bond if it is still formed. */ @@ -95,23 +117,27 @@ class Bond void setupConnections(); - double getConnectTimeout() const {return connect_timeout_;} + double getConnectTimeout() const {return connect_timeout_.seconds();} void setConnectTimeout(double dur); void connectTimerReset(); void connectTimerCancel(); - double getDisconnectTimeout() const {return disconnect_timeout_;} + + double getDisconnectTimeout() const {return disconnect_timeout_.seconds();} void setDisconnectTimeout(double dur); void disconnectTimerReset(); void disconnectTimerCancel(); - double getHeartbeatTimeout() const {return heartbeat_timeout_;} + + double getHeartbeatTimeout() const {return heartbeat_timeout_.seconds();} void setHeartbeatTimeout(double dur); void heartbeatTimerReset(); void heartbeatTimerCancel(); - double getHeartbeatPeriod() const {return heartbeat_period_;} + + double getHeartbeatPeriod() const {return heartbeat_period_.seconds();} void setHeartbeatPeriod(double dur); void publishingTimerReset(); void publishingTimerCancel(); - double getDeadPublishPeriod() const {return dead_publish_period_;} + + double getDeadPublishPeriod() const {return dead_publish_period_.seconds();} void setDeadPublishPeriod(double dur); void deadpublishingTimerReset(); void deadpublishingTimerCancel(); @@ -119,13 +145,14 @@ class Bond /** \brief Starts the bond and connects to the sister process. */ void start(); + /** \brief Sets the formed callback. */ - void setFormedCallback(std::function on_formed); + void setFormedCallback(EventCallback on_formed); /** \brief Sets the broken callback */ - void setBrokenCallback(std::function on_broken); + void setBrokenCallback(EventCallback on_broken); /** \brief Blocks until the bond is formed for at most 'duration'. * Assumes the node to be spinning in the background @@ -134,6 +161,7 @@ class Bond * \return true iff the bond has been formed. */ bool waitUntilFormed(rclcpp::Duration timeout = rclcpp::Duration(std::chrono::seconds(-1))); + /** \brief Blocks until the bond is broken for at most 'duration'. * Assumes the node to be spinning in the background * @@ -141,15 +169,18 @@ class Bond * \return true iff the bond has been broken, even if it has never been formed. */ bool waitUntilBroken(rclcpp::Duration timeout = rclcpp::Duration(std::chrono::seconds(-1))); + /** \brief Indicates if the bond is broken. */ bool isBroken(); + /** \brief Breaks the bond, notifying the other process. */ void breakBond(); - std::string getTopic() {return topic_;} - std::string getId() {return id_;} - std::string getInstanceId() {return instance_id_;} + + std::string getTopic() const {return topic_;} + std::string getId() const {return id_;} + std::string getInstanceId() const {return instance_id_;} private: friend struct ::BondSM; @@ -158,17 +189,22 @@ class Bond void onHeartbeatTimeout(); void onDisconnectTimeout(); - void bondStatusCB(const bond::msg::Status::ConstSharedPtr msg); + void bondStatusCB(const bond::msg::Status & msg); void doPublishing(); void publishStatus(bool active); - std::vector> pending_callbacks_; void flushPendingCallbacks(); + bool isStateAlive(); + bool isStateAwaitSisterDeath(); + bool isStateDead(); + bool isStateWaitingForSister(); + + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_; - rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_; rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_; + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_; rclcpp::TimerBase::SharedPtr connect_timer_; rclcpp::TimerBase::SharedPtr disconnect_timer_; @@ -176,6 +212,7 @@ class Bond rclcpp::TimerBase::SharedPtr publishing_timer_; rclcpp::TimerBase::SharedPtr deadpublishing_timer_; + std::mutex state_machine_mutex_; std::unique_ptr bondsm_; BondSMContext sm_; @@ -183,30 +220,30 @@ class Bond std::string id_; std::string instance_id_; std::string sister_instance_id_; - std::function on_broken_; - std::function on_formed_; - bool sisterDiedFirst_; - bool started_; - bool connect_timer_reset_flag_; - bool disconnect_timer_reset_flag_; - bool deadpublishing_timer_reset_flag_; - bool disable_heartbeat_timeout_; - std::mutex mutex_; - std::condition_variable condition_; - - double connect_timeout_; - double heartbeat_timeout_; - double disconnect_timeout_; - double heartbeat_period_; - double dead_publish_period_; + + std::mutex callbacks_mutex_; + std::vector pending_callbacks_; + EventCallback on_broken_; + EventCallback on_formed_; + + bool sisterDiedFirst_ {false}; + bool started_ {false}; + bool connect_timer_reset_flag_ {false}; + bool disconnect_timer_reset_flag_ {false}; + bool deadpublishing_timer_reset_flag_ {false}; + bool disable_heartbeat_timeout_ {false}; + + rclcpp::Duration connect_timeout_; + rclcpp::Duration disconnect_timeout_; + rclcpp::Duration heartbeat_timeout_; + rclcpp::Duration heartbeat_period_; + rclcpp::Duration dead_publish_period_; rclcpp::Subscription::SharedPtr sub_; rclcpp::Publisher::SharedPtr pub_; }; - } // namespace bond - // Internal use only struct BondSM { diff --git a/bondcpp/src/bond.cpp b/bondcpp/src/bond.cpp index d7908cd6..1cb6a2c6 100644 --- a/bondcpp/src/bond.cpp +++ b/bondcpp/src/bond.cpp @@ -39,12 +39,15 @@ #include #include -#include +#include #include #include #include #include +#include +#include + using namespace std::chrono_literals; namespace bond @@ -70,75 +73,87 @@ static std::string makeUUID() Bond::Bond( const std::string & topic, const std::string & id, - rclcpp_lifecycle::LifecycleNode::SharedPtr nh, - std::function on_broken, - std::function on_formed) -: bondsm_(new BondSM(this)), + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, + rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging, + rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_params, + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers, + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr node_topics, + EventCallback on_broken, + EventCallback on_formed) +: node_base_(node_base), + node_logging_(node_logging), + node_timers_(node_timers), + bondsm_(std::make_unique(this)), sm_(*bondsm_), topic_(topic), id_(id), instance_id_(makeUUID()), on_broken_(on_broken), on_formed_(on_formed), - sisterDiedFirst_(false), - started_(false), - connect_timer_reset_flag_(false), - disconnect_timer_reset_flag_(false), - deadpublishing_timer_reset_flag_(false), - disable_heartbeat_timeout_(false) -{ - node_base_ = nh->get_node_base_interface(); - node_logging_ = nh->get_node_logging_interface(); - node_timers_ = nh->get_node_timers_interface(); - if (!nh->has_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM)) { - nh->declare_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, false); + connect_timeout_( + rclcpp::Duration::from_seconds(bond::msg::Constants::DEFAULT_CONNECT_TIMEOUT)), + disconnect_timeout_( + rclcpp::Duration::from_seconds(bond::msg::Constants::DEFAULT_DISCONNECT_TIMEOUT)), + heartbeat_timeout_( + rclcpp::Duration::from_seconds(bond::msg::Constants::DEFAULT_HEARTBEAT_TIMEOUT)), + heartbeat_period_( + rclcpp::Duration::from_seconds(bond::msg::Constants::DEFAULT_HEARTBEAT_PERIOD)), + dead_publish_period_( + rclcpp::Duration::from_seconds(bond::msg::Constants::DEAD_PUBLISH_PERIOD)) +{ + if (!node_params->has_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM)) { + node_params->declare_parameter( + bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, + rclcpp::ParameterValue(false)); } disable_heartbeat_timeout_ = - nh->get_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM).as_bool(); + node_params->get_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM).as_bool(); setupConnections(); - pub_ = rclcpp::create_publisher(nh, topic_, rclcpp::QoS(rclcpp::KeepLast(5))); - sub_ = nh->create_subscription( - topic_, rclcpp::QoS(100), + pub_ = rclcpp::create_publisher( + node_params, + node_topics, + topic_, + rclcpp::QoS(rclcpp::KeepLast(5))); + + sub_ = rclcpp::create_subscription( + node_params, + node_topics, + topic_, + rclcpp::QoS(100), std::bind(&Bond::bondStatusCB, this, std::placeholders::_1)); } Bond::Bond( const std::string & topic, const std::string & id, - rclcpp::Node::SharedPtr nh, - std::function on_broken, - std::function on_formed) -: bondsm_(new BondSM(this)), - sm_(*bondsm_), - topic_(topic), - id_(id), - instance_id_(makeUUID()), - on_broken_(on_broken), - on_formed_(on_formed), - sisterDiedFirst_(false), - started_(false), - connect_timer_reset_flag_(false), - disconnect_timer_reset_flag_(false), - deadpublishing_timer_reset_flag_(false) -{ - node_base_ = nh->get_node_base_interface(); - node_logging_ = nh->get_node_logging_interface(); - node_timers_ = nh->get_node_timers_interface(); - if (!nh->has_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM)) { - nh->declare_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM, false); - } - - disable_heartbeat_timeout_ = - nh->get_parameter(bond::msg::Constants::DISABLE_HEARTBEAT_TIMEOUT_PARAM).as_bool(); - - setupConnections(); + rclcpp_lifecycle::LifecycleNode::SharedPtr nh, + EventCallback on_broken, + EventCallback on_formed) +: Bond(topic, id, + nh->get_node_base_interface(), + nh->get_node_logging_interface(), + nh->get_node_parameters_interface(), + nh->get_node_timers_interface(), + nh->get_node_topics_interface(), + on_broken, on_formed) +{ +} - pub_ = rclcpp::create_publisher(nh, topic_, rclcpp::QoS(rclcpp::KeepLast(5))); - sub_ = nh->create_subscription( - topic_, rclcpp::QoS(100), - std::bind(&Bond::bondStatusCB, this, std::placeholders::_1)); +Bond::Bond( + const std::string & topic, const std::string & id, + rclcpp::Node::SharedPtr nh, + EventCallback on_broken, + EventCallback on_formed) +: Bond(topic, id, + nh->get_node_base_interface(), + nh->get_node_logging_interface(), + nh->get_node_parameters_interface(), + nh->get_node_timers_interface(), + nh->get_node_topics_interface(), + on_broken, on_formed) +{ } Bond::~Bond() @@ -153,20 +168,11 @@ Bond::~Bond() id_.c_str(), instance_id_.c_str()); } - - // Must destroy the subscription before locking mutex_: shutdown() - // will block until the status callback completes, and the status - // callback locks mutex_ (in flushPendingCallbacks). - // Stops the timers before locking the mutex. Makes sure none of - // the callbacks are running when we aquire the mutex. - publishingTimerCancel(); - // deadpublishingTimerCancel(); + deadpublishingTimerCancel(); connectTimerCancel(); heartbeatTimerCancel(); disconnectTimerCancel(); - - std::unique_lock lock(mutex_); } void Bond::setupConnections() @@ -184,13 +190,11 @@ void Bond::setConnectTimeout(double dur) RCLCPP_ERROR(node_logging_->get_logger(), "Cannot set timeouts after calling start()"); return; } - connect_timeout_ = dur * 1e9; // conversion from seconds to nanoseconds + connect_timeout_ = rclcpp::Duration::from_seconds(dur); } void Bond::connectTimerReset() { - rclcpp::Duration dur1(rclcpp::Duration::from_nanoseconds(connect_timeout_)); - const std::chrono::nanoseconds period1(dur1.nanoseconds()); // Callback function of connect timer auto connectTimerResetCallback = [this]() -> void @@ -203,8 +207,11 @@ void Bond::connectTimerReset() }; // Connect timer started on node connect_timer_ = rclcpp::create_wall_timer( - period1, std::move(connectTimerResetCallback), - nullptr, node_base_.get(), node_timers_.get()); + connect_timeout_.to_chrono(), + connectTimerResetCallback, + nullptr, + node_base_.get(), + node_timers_.get()); } void Bond::connectTimerCancel() @@ -220,14 +227,11 @@ void Bond::setDisconnectTimeout(double dur) RCLCPP_ERROR(node_logging_->get_logger(), "Cannot set timeouts after calling start()"); return; } - - disconnect_timeout_ = dur * 1e9; // conversion from seconds to nanoseconds + disconnect_timeout_ = rclcpp::Duration::from_seconds(dur); } void Bond::disconnectTimerReset() { - rclcpp::Duration dur2(rclcpp::Duration::from_nanoseconds(disconnect_timeout_)); - const std::chrono::nanoseconds period2(dur2.nanoseconds()); // Callback function of disconnect timer auto disconnectTimerResetCallback = [this]() -> void @@ -240,8 +244,11 @@ void Bond::disconnectTimerReset() }; // Disconnect timer started on node disconnect_timer_ = rclcpp::create_wall_timer( - period2, std::move(disconnectTimerResetCallback), - nullptr, node_base_.get(), node_timers_.get()); + disconnect_timeout_.to_chrono(), + disconnectTimerResetCallback, + nullptr, + node_base_.get(), + node_timers_.get()); } void Bond::disconnectTimerCancel() @@ -258,13 +265,11 @@ void Bond::setHeartbeatTimeout(double dur) return; } - heartbeat_timeout_ = dur * 1e9; // conversion from seconds to nanoseconds + heartbeat_timeout_ = rclcpp::Duration::from_seconds(dur); } void Bond::heartbeatTimerReset() { - rclcpp::Duration dur3(rclcpp::Duration::from_nanoseconds(heartbeat_timeout_)); - const std::chrono::nanoseconds period3(dur3.nanoseconds()); // Callback function of heartbeat timer auto heartbeatTimerResetCallback = [this]() -> void @@ -277,7 +282,8 @@ void Bond::heartbeatTimerReset() }; // heartbeat timer started on node heartbeat_timer_ = rclcpp::create_wall_timer( - period3, std::move(heartbeatTimerResetCallback), + heartbeat_timeout_.to_chrono(), + heartbeatTimerResetCallback, nullptr, node_base_.get(), node_timers_.get()); } @@ -295,13 +301,11 @@ void Bond::setHeartbeatPeriod(double dur) return; } - heartbeat_period_ = dur * 1e9; // conversion from seconds to nanoseconds + heartbeat_period_ = rclcpp::Duration::from_seconds(dur); } void Bond::publishingTimerReset() { - rclcpp::Duration dur4(rclcpp::Duration::from_nanoseconds(heartbeat_period_)); - const std::chrono::nanoseconds period4(dur4.nanoseconds()); // Callback function of publishing timer auto publishingTimerResetCallback = [this]() -> void @@ -310,8 +314,11 @@ void Bond::publishingTimerReset() }; // publishing timer started on node publishing_timer_ = rclcpp::create_wall_timer( - period4, std::move(publishingTimerResetCallback), - nullptr, node_base_.get(), node_timers_.get()); + heartbeat_period_.to_chrono(), + publishingTimerResetCallback, + nullptr, + node_base_.get(), + node_timers_.get()); } void Bond::publishingTimerCancel() @@ -328,26 +335,25 @@ void Bond::setDeadPublishPeriod(double dur) return; } - dead_publish_period_ = dur * 1e9; // conversion from seconds to nanoseconds + dead_publish_period_ = rclcpp::Duration::from_seconds(dur); } void Bond::deadpublishingTimerReset() { - rclcpp::Duration dur5(rclcpp::Duration::from_nanoseconds(dead_publish_period_)); - const std::chrono::nanoseconds period5(dur5.nanoseconds()); // callback function of dead publishing timer which will publish data when bond is broken auto deadpublishingTimerResetCallback = [this]() -> void { - if (deadpublishing_timer_reset_flag_) { - doPublishing(); - deadpublishing_timer_reset_flag_ = false; - } // flag is needed to have valid callback + doPublishing(); }; + // dead publishing timer started on node deadpublishing_timer_ = rclcpp::create_wall_timer( - period5, std::move(deadpublishingTimerResetCallback), - nullptr, node_base_.get(), node_timers_.get()); + dead_publish_period_.to_chrono(), + deadpublishingTimerResetCallback, + nullptr, + node_base_.get(), + node_timers_.get()); } void Bond::deadpublishingTimerCancel() @@ -357,105 +363,120 @@ void Bond::deadpublishingTimerCancel() } } -/* TODO Callback Queue is not availabe in ROS2 -void Bond::setCallbackQueue(rclcpp::CallbackQueueInterface *queue) -{ - if (started_) { - RCLCPP_ERROR(nh_->get_logger(),"Cannot set callback queue after calling start()"); - return; - } - - nh_.setCallbackQueue(queue); -}*/ - void Bond::start() { - std::unique_lock lock(mutex_); connect_timer_reset_flag_ = true; connectTimerReset(); publishingTimerReset(); - heartbeatTimerReset(); disconnectTimerReset(); - // deadpublishingTimerReset(); started_ = true; } -void Bond::setFormedCallback(std::function on_formed) +void Bond::setFormedCallback(EventCallback on_formed) { - std::unique_lock lock(mutex_); + std::unique_lock lock(callbacks_mutex_); on_formed_ = on_formed; } -void Bond::setBrokenCallback(std::function on_broken) +void Bond::setBrokenCallback(EventCallback on_broken) { - std::unique_lock lock(mutex_); + std::unique_lock lock(callbacks_mutex_); on_broken_ = on_broken; } bool Bond::waitUntilFormed(rclcpp::Duration timeout) { - // std::unique_lock lock(mutex_); rclcpp::Clock steady_clock(RCL_STEADY_TIME); rclcpp::Time deadline(steady_clock.now() + timeout); rclcpp::Rate r(100); - while (sm_.getState().getId() == SM::WaitingForSister.getId()) { + bool formed = false; + while (!formed) { if (!rclcpp::ok()) { break; } rclcpp::Duration wait_time = rclcpp::Duration(100ms); if (timeout >= rclcpp::Duration(0.0s)) { - rclcpp::Clock steady_clock(RCL_STEADY_TIME); wait_time = std::min(wait_time, deadline - steady_clock.now()); } if (wait_time <= rclcpp::Duration(0.0s)) { break; // The deadline has expired } r.sleep(); + + if (!isStateWaitingForSister()) { + formed = true; + } } - return sm_.getState().getId() != SM::WaitingForSister.getId(); + return formed; } bool Bond::waitUntilBroken(rclcpp::Duration timeout) { - // std::unique_lock lock(mutex_); rclcpp::Clock steady_clock(RCL_STEADY_TIME); rclcpp::Time deadline(steady_clock.now() + timeout); rclcpp::Rate r(100); - while (sm_.getState().getId() != SM::Dead.getId()) { + bool broken = false; + while (!broken) { if (!rclcpp::ok()) { break; } rclcpp::Duration wait_time = rclcpp::Duration(100ms); if (timeout >= rclcpp::Duration(0.0s)) { - rclcpp::Clock steady_clock(RCL_STEADY_TIME); wait_time = std::min(wait_time, deadline - steady_clock.now()); } if (wait_time <= rclcpp::Duration(0.0s)) { break; // The deadline has expired } r.sleep(); + + if (isStateDead()) { + broken = true; + } } - return sm_.getState().getId() == SM::Dead.getId(); + return broken; } bool Bond::isBroken() { - std::unique_lock lock(mutex_); + return isStateDead(); +} + +bool Bond::isStateAlive() +{ + std::unique_lock lock(state_machine_mutex_); + return sm_.getState().getId() == SM::Alive.getId(); +} + +bool Bond::isStateAwaitSisterDeath() +{ + std::unique_lock lock(state_machine_mutex_); + return sm_.getState().getId() == SM::AwaitSisterDeath.getId(); +} + +bool Bond::isStateDead() +{ + std::unique_lock lock(state_machine_mutex_); return sm_.getState().getId() == SM::Dead.getId(); } +bool Bond::isStateWaitingForSister() +{ + std::unique_lock lock(state_machine_mutex_); + return sm_.getState().getId() == SM::WaitingForSister.getId(); +} + void Bond::breakBond() { - { - std::unique_lock lock(mutex_); - if (sm_.getState().getId() != SM::Dead.getId()) { + if (!isStateDead()) { + { + std::unique_lock lock(state_machine_mutex_); sm_.Die(); - publishStatus(false); } + publishStatus(false); } flushPendingCallbacks(); } @@ -463,7 +484,7 @@ void Bond::breakBond() void Bond::onConnectTimeout() { { - std::unique_lock lock(mutex_); + std::unique_lock lock(state_machine_mutex_); sm_.ConnectTimeout(); } flushPendingCallbacks(); @@ -472,7 +493,7 @@ void Bond::onConnectTimeout() void Bond::onHeartbeatTimeout() { { - std::unique_lock lock(mutex_); + std::unique_lock lock(state_machine_mutex_); sm_.HeartbeatTimeout(); } flushPendingCallbacks(); @@ -481,30 +502,32 @@ void Bond::onHeartbeatTimeout() void Bond::onDisconnectTimeout() { { - std::unique_lock lock(mutex_); + std::unique_lock lock(state_machine_mutex_); sm_.DisconnectTimeout(); } flushPendingCallbacks(); } -void Bond::bondStatusCB(const bond::msg::Status::ConstSharedPtr msg) +void Bond::bondStatusCB(const bond::msg::Status & msg) { if (!started_) { return; } // Filters out messages from other bonds and messages from ourself - if (msg->id == id_ && msg->instance_id != instance_id_) { - std::unique_lock lock(mutex_); - + if (msg.id == id_ && msg.instance_id != instance_id_) { if (sister_instance_id_.empty()) { - sister_instance_id_ = msg->instance_id; + sister_instance_id_ = msg.instance_id; } - if (msg->active) { + if (msg.active) { + std::unique_lock lock(state_machine_mutex_); sm_.SisterAlive(); } else { - sm_.SisterDead(); + { + std::unique_lock lock(state_machine_mutex_); + sm_.SisterDead(); + } // Immediate ack for sister's death notification if (sisterDiedFirst_) { publishStatus(false); @@ -516,38 +539,35 @@ void Bond::bondStatusCB(const bond::msg::Status::ConstSharedPtr msg) void Bond::doPublishing() { - std::unique_lock lock(mutex_); - if (sm_.getState().getId() == SM::WaitingForSister.getId() || - sm_.getState().getId() == SM::Alive.getId()) - { + if (isStateWaitingForSister() || isStateAlive()) { publishStatus(true); - } else if (sm_.getState().getId() == SM::AwaitSisterDeath.getId()) { + } else if (isStateAwaitSisterDeath()) { publishStatus(false); } else { // SM::Dead publishingTimerCancel(); - // deadpublishingTimerCancel(); + deadpublishingTimerCancel(); } } void Bond::publishStatus(bool active) { - std::unique_ptr msg = std::make_unique(); + bond::msg::Status msg; rclcpp::Clock steady_clock(RCL_STEADY_TIME); rclcpp::Time now = steady_clock.now(); - msg->header.stamp = now; - msg->id = id_; - msg->instance_id = instance_id_; - msg->active = active; - msg->heartbeat_timeout = heartbeat_timeout_; - msg->heartbeat_period = heartbeat_period_; - pub_->publish(std::move(msg)); + msg.header.stamp = now; + msg.id = id_; + msg.instance_id = instance_id_; + msg.active = active; + msg.heartbeat_timeout = heartbeat_timeout_.seconds(); + msg.heartbeat_period = heartbeat_period_.seconds(); + pub_->publish(msg); } void Bond::flushPendingCallbacks() { - std::vector> callbacks; + std::vector callbacks; { - std::unique_lock lock(mutex_); + std::unique_lock lock(callbacks_mutex_); callbacks = pending_callbacks_; pending_callbacks_.clear(); } @@ -563,8 +583,8 @@ void Bond::flushPendingCallbacks() void BondSM::Connected() { b->connectTimerCancel(); - b->condition_.notify_all(); if (b->on_formed_) { + std::unique_lock lock(b->callbacks_mutex_); b->pending_callbacks_.push_back(b->on_formed_); } } @@ -576,10 +596,10 @@ void BondSM::SisterDied() void BondSM::Death() { - b->condition_.notify_all(); b->heartbeatTimerCancel(); b->disconnectTimerCancel(); if (b->on_broken_) { + std::unique_lock lock(b->callbacks_mutex_); b->pending_callbacks_.push_back(b->on_broken_); } } @@ -594,6 +614,5 @@ void BondSM::StartDying() b->heartbeatTimerCancel(); b->disconnect_timer_reset_flag_ = true; b->disconnect_timer_.reset(); - /* TODO b->deadpublishing_timer_reset_flag_ = true; - b->deadpublishingTimerReset();*/ + b->deadpublishingTimerReset(); } diff --git a/test_bond/test/test_callbacks_cpp.cpp b/test_bond/test/test_callbacks_cpp.cpp index 59559805..47edf6f7 100644 --- a/test_bond/test/test_callbacks_cpp.cpp +++ b/test_bond/test/test_callbacks_cpp.cpp @@ -87,12 +87,26 @@ TEST_F(TestCallbacksCpp, dieInLifeCallback) bond::Bond a(TOPIC, id1, nh1); bond::Bond b(TOPIC, id1, nh1); - a.setFormedCallback(std::bind(&bond::Bond::breakBond, &a)); + a.setFormedCallback( + [&a]() { + a.breakBond(); + }); a.start(); b.start(); + std::atomic isRunning {true}; + auto runThread = std::thread( + [&isRunning, &nh1]() { + while (isRunning) { + rclcpp::spin_some(nh1); + } + }); + EXPECT_TRUE(a.waitUntilFormed(rclcpp::Duration(5.0s))); EXPECT_TRUE(b.waitUntilBroken(rclcpp::Duration(3.0s))); + + isRunning = false; + runThread.join(); } TEST_F(TestCallbacksCpp, remoteNeverConnects) @@ -102,6 +116,18 @@ TEST_F(TestCallbacksCpp, remoteNeverConnects) bond::Bond a1(TOPIC, id2, nh2); a1.start(); - EXPECT_FALSE(a1.waitUntilFormed(rclcpp::Duration(5.0s))); + + std::atomic isRunning {true}; + auto runThread = std::thread( + [&isRunning, &nh2]() { + while (isRunning) { + rclcpp::spin_some(nh2); + } + }); + + EXPECT_FALSE(a1.waitUntilFormed(rclcpp::Duration(4.0s))); EXPECT_TRUE(a1.waitUntilBroken(rclcpp::Duration(10.0s))); + + isRunning = false; + runThread.join(); }