Skip to content

Commit

Permalink
Merge pull request #224 from tprk77/lambda-subscribe
Browse files Browse the repository at this point in the history
Lambda Subscribe
  • Loading branch information
tprk77 committed Aug 15, 2018
2 parents 7606629 + ad531cb commit 33ee9f1
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 0 deletions.
42 changes: 42 additions & 0 deletions lcm/lcm-cpp-impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,31 @@ class LCMMHUntypedSubscription : public Subscription {
}
};

#if LCM_CXX_11_ENABLED
template <class MessageType>
class LCMLambdaSubscription : public Subscription {
friend class LCM;

private:
using HandlerFunction = typename LCM::HandlerFunction<MessageType>;
HandlerFunction handler;
static void cb_func(const lcm_recv_buf_t *rbuf, const char *channel, void *user_data)
{
LCMLambdaSubscription<MessageType> *subs =
static_cast<LCMLambdaSubscription<MessageType> *>(user_data);
MessageType msg;
int status = msg.decode(rbuf->data, 0, rbuf->data_size);
if (status < 0) {
fprintf(stderr, "error %d decoding %s!!!\n", status, MessageType::getTypeName());
return;
}
const ReceiveBuffer rb = {rbuf->data, rbuf->data_size, rbuf->recv_utime};
std::string chan_str(channel);
(subs->handler)(&rb, chan_str, &msg);
}
};
#endif

inline LCM::LCM(std::string lcm_url) : owns_lcm(true)
{
this->lcm = lcm_create(lcm_url.c_str());
Expand Down Expand Up @@ -270,6 +295,23 @@ Subscription *LCM::subscribeFunction(const std::string &channel,
return sub;
}

#if LCM_CXX_11_ENABLED
template <class MessageType>
Subscription *LCM::subscribe(const std::string &channel, LCM::HandlerFunction<MessageType> handler)
{
if (!this->lcm) {
fprintf(stderr, "LCM instance not initialized. Ignoring call to subscribe()\n");
return NULL;
}
LCMLambdaSubscription<MessageType> *subs = new LCMLambdaSubscription<MessageType>();
subs->handler = handler;
subs->c_subs = lcm_subscribe(this->lcm, channel.c_str(),
LCMLambdaSubscription<MessageType>::cb_func, subs);
subscriptions.push_back(subs);
return subs;
}
#endif

lcm_t *LCM::getUnderlyingLCM()
{
return this->lcm;
Expand Down
71 changes: 71 additions & 0 deletions lcm/lcm-cpp.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
#ifndef __lcm_cpp_hpp__
#define __lcm_cpp_hpp__

#ifndef LCM_CXX_11_ENABLED
#if __cplusplus >= 201103L
#define LCM_CXX_11_ENABLED 1
#else
#define LCM_CXX_11_ENABLED 0
#endif
#endif

#include <cstdio> /* needed for FILE* */
#include <string>
#include <vector>
#include "lcm.h"

#if LCM_CXX_11_ENABLED
#include <functional>
#endif

namespace lcm {

/**
Expand Down Expand Up @@ -355,6 +367,65 @@ class LCM {
ContextClass context),
ContextClass context);

#if LCM_CXX_11_ENABLED
/**
* Type alias for the handler function type.
*/
template <class MessageType>
using HandlerFunction = std::function<void(const ReceiveBuffer *rbuf,
const std::string &channel, const MessageType *msg)>;
/**
* @brief Subscribes a callback function to a channel, with
* automatic message decoding.
*
* This method is designed for use with C++ classes generated by
* @c lcm-gen @c .
*
* The callback function will be invoked on the object when a message
* arrives on the specified channel. Prior to method invocation, LCM
* will attempt to automatically decode the message to the specified
* message type @c MessageType @c , which should be a class generated
* by @c lcm-gen @c . If message
* decoding fails, the callback function is not invoked and an error
* message is printed to stderr.
*
* The callback function is invoked during calls to LCM::handle().
* Callback methods are invoked by the same thread that invokes
* LCM::handle(), in the order that they were subscribed.
*
* For example:
*
* \code
* #include <exlcm/example_t.lcm>
* #include <lcm/lcm-cpp.hpp>
*
* int main(int argc, char** argv) {
* lcm::LCM lcm;
* lcm::LCM::HandlerFunction<exlcm::example_t> func;
* func = [](const lcm::ReceiveBuffer* rbuf, const std::string& channel,
* const exlcm::example_t* msg) {
* // do something with the message
* }
* lcm.subscribe("CHANNEL", func);
* while(true)
* lcm.handle();
* return 0;
* }
* \endcode
*
* @param channel The channel to subscribe to. This is treated as a
* regular expression implicitly surrounded by '^' and '$'.
* @param handler A handler function, for example a lambda.
*
* @return a Subscription object that can be used to adjust the
* subscription and unsubscribe. The Subscription object is managed by
* the LCM class, and is automatically destroyed when its LCM instance
* is destroyed.
*/
template <class MessageType>
Subscription *subscribe(const std::string &channel, HandlerFunction<MessageType> handler);
#endif

/**
* @brief Unsubscribes a message handler.
*
Expand Down
3 changes: 3 additions & 0 deletions test/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ set(test_cpp_libs lcm-test-types-cpp lcm gtest gtest_main)
add_executable(test-cpp-client client.cpp common.cpp)
lcm_target_link_libraries(test-cpp-client ${test_cpp_libs})

# UNCOMMENT TO ENABLE C++11 RELATED TESTS
#target_compile_features(test-cpp-client PRIVATE cxx_lambdas)

add_executable(test-cpp-memq_test memq_test.cpp common.cpp)
lcm_target_link_libraries(test-cpp-memq_test ${test_cpp_libs})

Expand Down
62 changes: 62 additions & 0 deletions test/cpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,65 @@ TEST(LCM_CPP, cross_package_t)
{
EXPECT_TRUE(TypedTest<lcmtest2::cross_package_t>("test_lcmtest2_cross_package_t", 100).Run());
}

#if LCM_CXX_11_ENABLED
template <class LcmType>
class LambdaTest {
public:
LambdaTest(const std::string test_name, int num_trials)
: lcm_(), num_trials_(num_trials), test_channel_(test_name)
{
}

bool Run(void)
{
LcmType msg;
int response_count = 0;
lcm::LCM::HandlerFunction<LcmType> handler = [&response_count](
const lcm::ReceiveBuffer *rbuf, const std::string &channel, const LcmType *msg) {
if (CheckLcmType(msg, response_count + 1)) {
response_count++;
}
};
lcm::Subscription *subscription = lcm_.subscribe(test_channel_ + "_reply", handler);
bool result = true;
for (int trial = 0; trial < num_trials_ && result; trial++) {
FillLcmType(trial, &msg);
lcm_.publish(test_channel_, &msg);
if (lcm_.handleTimeout(500) <= 0) {
info("%s test: Timeout waiting for reply", test_channel_.c_str());
result = false;
break;
} else if (response_count != trial + 1) {
info("%s test: failed on iteration %d", test_channel_.c_str(), trial);
result = false;
break;
}
ClearLcmType(&msg);
}
lcm_.unsubscribe(subscription);
return result;
}

private:
lcm::LCM lcm_;
int num_trials_;
std::string test_channel_;
};

TEST(LCM_CPP, Lambda_A)
{
EXPECT_TRUE(LambdaTest<lcmtest::primitives_t>("test_lcmtest_primitives_t", 1000).Run());
}

TEST(LCM_CPP, Lambda_B)
{
EXPECT_TRUE(
LambdaTest<lcmtest::primitives_list_t>("test_lcmtest_primitives_list_t", 100).Run());
}

TEST(LCM_CPP, Lambda_C)
{
EXPECT_TRUE(LambdaTest<lcmtest::node_t>("test_lcmtest_node_t", 7).Run());
}
#endif

0 comments on commit 33ee9f1

Please sign in to comment.