Skip to content

Commit

Permalink
Improve locking
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carroll <[email protected]>
  • Loading branch information
mjcarroll committed Jun 2, 2022
1 parent 14986f9 commit 5c7b65c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 35 deletions.
6 changes: 4 additions & 2 deletions bondcpp/include/bondcpp/bond.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ class Bond
void doPublishing();
void publishStatus(bool active);

std::vector<EventCallback> pending_callbacks_;
void flushPendingCallbacks();

rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
Expand All @@ -197,13 +196,17 @@ class Bond
rclcpp::TimerBase::SharedPtr publishing_timer_;
rclcpp::TimerBase::SharedPtr deadpublishing_timer_;

std::mutex state_machine_mutex_;
std::unique_ptr<BondSM> bondsm_;
BondSMContext sm_;

std::string topic_;
std::string id_;
std::string instance_id_;
std::string sister_instance_id_;

std::mutex callbacks_mutex_;
std::vector<EventCallback> pending_callbacks_;
EventCallback on_broken_;
EventCallback on_formed_;

Expand All @@ -213,7 +216,6 @@ class Bond
bool disconnect_timer_reset_flag_ {false};
bool deadpublishing_timer_reset_flag_ {false};
bool disable_heartbeat_timeout_ {false};
std::mutex mutex_;

double connect_timeout_;
double heartbeat_timeout_;
Expand Down
67 changes: 34 additions & 33 deletions bondcpp/src/bond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ Bond::~Bond()
connectTimerCancel();
heartbeatTimerCancel();
disconnectTimerCancel();

std::unique_lock<std::mutex> lock(mutex_);
}

void Bond::setConnectTimeout(double dur)
Expand Down Expand Up @@ -359,7 +357,6 @@ void Bond::setCallbackQueue(rclcpp::CallbackQueueInterface *queue)

void Bond::start()
{
std::unique_lock<std::mutex> lock(mutex_);
connect_timer_reset_flag_ = true;
connectTimerReset();
publishingTimerReset();
Expand All @@ -371,24 +368,24 @@ void Bond::start()

void Bond::setFormedCallback(EventCallback on_formed)
{
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(callbacks_mutex_);
on_formed_ = on_formed;
}

void Bond::setBrokenCallback(EventCallback on_broken)
{
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(callbacks_mutex_);
on_broken_ = on_broken;
}

bool Bond::waitUntilFormed(rclcpp::Duration timeout)
{
// std::unique_lock<std::mutex> lock(mutex_);
rclcpp::Clock steady_clock(RCL_STEADY_TIME);
rclcpp::Time deadline(steady_clock.now() + timeout);
rclcpp::Rate r(100);

while (sm_.getState().getId() == SM::WaitingForSister.getId()) {
bool formed = false;
while (!formed) {
if (!rclcpp::ok()) {
break;
}
Expand All @@ -401,19 +398,25 @@ bool Bond::waitUntilFormed(rclcpp::Duration timeout)
break; // The deadline has expired
}
r.sleep();

std::unique_lock<std::mutex> lock(state_machine_mutex_);
if (sm_.getState().getId() != SM::WaitingForSister.getId()) {
formed = true;
break;
}
}

return sm_.getState().getId() != SM::WaitingForSister.getId();
return formed;
}

bool Bond::waitUntilBroken(rclcpp::Duration timeout)
{
// std::unique_lock<std::mutex> lock(mutex_);
rclcpp::Clock steady_clock(RCL_STEADY_TIME);
rclcpp::Time deadline(steady_clock.now() + timeout);
rclcpp::Rate r(100);

while (sm_.getState().getId() != SM::Dead.getId()) {
bool broken = false;
while (!broken) {
if (!rclcpp::ok()) {
break;
}
Expand All @@ -426,53 +429,51 @@ bool Bond::waitUntilBroken(rclcpp::Duration timeout)
break; // The deadline has expired
}
r.sleep();

std::unique_lock<std::mutex> lock(state_machine_mutex_);
if (sm_.getState().getId() == SM::Dead.getId()) {
broken = true;
break;
}
}

return sm_.getState().getId() == SM::Dead.getId();
return broken;
}

bool Bond::isBroken()
{
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(state_machine_mutex_);
return sm_.getState().getId() == SM::Dead.getId();
}

void Bond::breakBond()
{
{
std::unique_lock<std::mutex> lock(mutex_);
if (sm_.getState().getId() != SM::Dead.getId()) {
sm_.Die();
publishStatus(false);
}
std::unique_lock<std::mutex> lock(state_machine_mutex_);
if (sm_.getState().getId() != SM::Dead.getId()) {
sm_.Die();
publishStatus(false);
}
flushPendingCallbacks();
}

void Bond::onConnectTimeout()
{
{
std::unique_lock<std::mutex> lock(mutex_);
sm_.ConnectTimeout();
}
std::unique_lock<std::mutex> lock(state_machine_mutex_);
sm_.ConnectTimeout();
flushPendingCallbacks();
}

void Bond::onHeartbeatTimeout()
{
{
std::unique_lock<std::mutex> lock(mutex_);
sm_.HeartbeatTimeout();
}
std::unique_lock<std::mutex> lock(state_machine_mutex_);
sm_.HeartbeatTimeout();
flushPendingCallbacks();
}

void Bond::onDisconnectTimeout()
{
{
std::unique_lock<std::mutex> lock(mutex_);
sm_.DisconnectTimeout();
}
std::unique_lock<std::mutex> lock(state_machine_mutex_);
sm_.DisconnectTimeout();
flushPendingCallbacks();
}

Expand All @@ -484,7 +485,7 @@ void Bond::bondStatusCB(const bond::msg::Status::ConstSharedPtr msg)

// Filters out messages from other bonds and messages from ourself
if (msg->id == id_ && msg->instance_id != instance_id_) {
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(state_machine_mutex_);

if (sister_instance_id_.empty()) {
sister_instance_id_ = msg->instance_id;
Expand All @@ -505,7 +506,7 @@ void Bond::bondStatusCB(const bond::msg::Status::ConstSharedPtr msg)

void Bond::doPublishing()
{
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(state_machine_mutex_);
if (sm_.getState().getId() == SM::WaitingForSister.getId() ||
sm_.getState().getId() == SM::Alive.getId())
{
Expand Down Expand Up @@ -536,7 +537,7 @@ void Bond::flushPendingCallbacks()
{
std::vector<EventCallback> callbacks;
{
std::unique_lock<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(callbacks_mutex_);
callbacks = pending_callbacks_;
pending_callbacks_.clear();
}
Expand Down

0 comments on commit 5c7b65c

Please sign in to comment.