Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
pablogs9 committed Sep 20, 2021
1 parent 936f33c commit 1be7b4b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 13 deletions.
47 changes: 47 additions & 0 deletions include/uxr/agent/message/InputMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class InputMessage

bool prepare_next_submessage();

size_t count_submessages();

dds::xrce::SubmessageId get_submessage_id();

private:
template<class T>
bool deserialize(T& data);
Expand All @@ -94,6 +98,49 @@ inline bool InputMessage::prepare_next_submessage()
return rv;
}

inline size_t InputMessage::count_submessages()
{
fastcdr::Cdr local_deserializer(fastbuffer_);
dds::xrce::MessageHeader local_header;
dds::xrce::SubmessageHeader local_subheader;

local_header.deserialize(local_deserializer);

bool rv = false;
size_t count = 0;

do
{
local_deserializer.jump((4 - ((local_deserializer.getCurrentPosition() - local_deserializer.getBufferPointer()) & 3)) & 3);
if (fastbuffer_.getBufferSize() > local_deserializer.getSerializedDataLength())
{
local_subheader.deserialize(local_deserializer);
count++;
} else {
rv = false;
}
} while (rv);

return count;
}

inline dds::xrce::SubmessageId InputMessage::get_submessage_id()
{
fastcdr::Cdr local_deserializer(fastbuffer_);
dds::xrce::MessageHeader local_header;
dds::xrce::SubmessageHeader local_subheader;

local_header.deserialize(local_deserializer);

local_deserializer.jump((4 - ((local_deserializer.getCurrentPosition() - local_deserializer.getBufferPointer()) & 3)) & 3);
if (fastbuffer_.getBufferSize() > local_deserializer.getSerializedDataLength())
{
local_subheader.deserialize(local_deserializer);
}

return local_subheader.submessage_id();
}

template<class T>
inline bool InputMessage::get_payload(T& data)
{
Expand Down
58 changes: 47 additions & 11 deletions include/uxr/agent/scheduler/FCFSScheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class FCFSScheduler : public Scheduler<T>
, max_size_{max_size}
{}

void set_priority_size(uint8_t priority, size_t size);

void init() final;

void deinit() final;
Expand All @@ -47,23 +49,35 @@ class FCFSScheduler : public Scheduler<T>
uint8_t priority) final;

void push_front(
T&& element);
T&& element,
uint8_t priority);

bool pop(
T& element) final;

private:
std::deque<T> deque_;
bool empty();

std::map<uint8_t, std::deque<T>> deque_;
std::map<uint8_t, size_t> sizes_;
std::mutex mtx_;
std::condition_variable cond_var_;
bool running_cond_;
const size_t max_size_;
};

template<class T>
inline void FCFSScheduler<T>::set_priority_size(uint8_t priority, size_t size)
{
std::lock_guard<std::mutex> lock(mtx_);
sizes_[priority] = size;
}

template<class T>
inline void FCFSScheduler<T>::init()
{
std::lock_guard<std::mutex> lock(mtx_);
sizes_[0] = max_size_;
running_cond_ = true;
}

Expand All @@ -80,22 +94,35 @@ inline void FCFSScheduler<T>::push(
T&& element,
uint8_t priority)
{
(void) priority;
std::lock_guard<std::mutex> lock(mtx_);
if (max_size_ <= deque_.size())
if (sizes_[priority] <= deque_[priority].size())
{
deque_.pop_front();
deque_[priority].pop_front();
}
deque_.push_back(std::move(element));
deque_[priority].push_back(std::move(element));
cond_var_.notify_one();
}

template<class T>
inline void FCFSScheduler<T>::push_front(
T&& element)
T&& element,
uint8_t priority)
{
std::lock_guard<std::mutex> lock(mtx_);
deque_.push_front(std::forward<T>(element));
deque_[priority].push_front(std::forward<T>(element));
}

template<class T>
inline bool FCFSScheduler<T>::empty()
{
for (auto& deque : deque_) {
if (!deque.second.empty())
{
return false;
}
}

return true;
}

template<class T>
Expand All @@ -104,11 +131,20 @@ inline bool FCFSScheduler<T>::pop(
{
bool rv = false;
std::unique_lock<std::mutex> lock(mtx_);
cond_var_.wait(lock, [this] { return !(deque_.empty() && running_cond_); });
cond_var_.wait(lock, [this] { return !(empty() && running_cond_); });
if (running_cond_)
{
element = std::move(deque_.front());
deque_.pop_front();
uint8_t available_priority;

for(auto iter = deque_.rbegin(); iter != deque_.rend(); ++iter){
if(iter->second.size() > 0){
available_priority = iter->first;
break;
}
}

element = std::move(deque_[available_priority].front());
deque_[available_priority].pop_front();
rv = true;
cond_var_.notify_one();
}
Expand Down
11 changes: 9 additions & 2 deletions src/cpp/transport/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ bool Server<EndPoint>::start()

/* Scheduler initialization. */
input_scheduler_.init();
input_scheduler_.set_priority_size(1, 1); // Priority 1 used for heartbeats
output_scheduler_.init();

/* Thread initialization. */
Expand Down Expand Up @@ -190,7 +191,13 @@ void Server<EndPoint>::receiver_loop()
TransportRc transport_rc = TransportRc::ok;
if (recv_message(input_packet, RECEIVE_TIMEOUT, transport_rc))
{
input_scheduler_.push(std::move(input_packet), 0);
if(dds::xrce::HEARTBEAT == input_packet.message->get_submessage_id() && 1U == input_packet.message->count_submessages()){
input_scheduler_.push(std::move(input_packet), 1);
}
else
{
input_scheduler_.push(std::move(input_packet), 0);
}
}
else if(running_cond_)
{
Expand Down Expand Up @@ -250,7 +257,7 @@ void Server<EndPoint>::sender_loop()
{
std::unique_lock<std::mutex> lock(error_mtx_);
transport_rc_ = transport_rc;
output_scheduler_.push_front(std::move(output_packet));
output_scheduler_.push_front(std::move(output_packet), 0);
error_cv_.notify_one();
error_cv_.wait(lock);
}
Expand Down

0 comments on commit 1be7b4b

Please sign in to comment.