diff --git a/ecal/service/ecal_service/src/server_session_impl_v0.cpp b/ecal/service/ecal_service/src/server_session_impl_v0.cpp index bb984839e0..d06dfe050c 100644 --- a/ecal/service/ecal_service/src/server_session_impl_v0.cpp +++ b/ecal/service/ecal_service/src/server_session_impl_v0.cpp @@ -74,6 +74,18 @@ namespace eCAL // Data receiving and sending /////////////////////////////////////////////// void ServerSessionV0::start() + { + // Call the handle_start with the io_service + // It is important to async call handle_start(), as it will call a + // user-defined callback. As we have no influence what that callback will + // be, we must call it from another thread to make sure to not double-lock + // mutexes from the server_impl, if the callback should itself call a + // server_impl api function. + + io_context_->post([me = shared_from_this()]() { me->handle_start(); }); + } + + void ServerSessionV0::handle_start() { // Go to handshake state state_ = State::CONNECTED; diff --git a/ecal/service/ecal_service/src/server_session_impl_v0.h b/ecal/service/ecal_service/src/server_session_impl_v0.h index cffeda48a0..4c4f93dbe8 100644 --- a/ecal/service/ecal_service/src/server_session_impl_v0.h +++ b/ecal/service/ecal_service/src/server_session_impl_v0.h @@ -75,6 +75,11 @@ namespace eCAL /////////////////////////////////////////////// public: void start() override; + + private: + void handle_start(); + + public: void stop() override; eCAL::service::State get_state() const override; diff --git a/ecal/service/test/src/ecal_tcp_service_test.cpp b/ecal/service/test/src/ecal_tcp_service_test.cpp index 37a72f1370..dc1e2c38c6 100644 --- a/ecal/service/test/src/ecal_tcp_service_test.cpp +++ b/ecal/service/test/src/ecal_tcp_service_test.cpp @@ -1609,6 +1609,151 @@ TEST(Callback, SerializedServiceCallbacks) // NOLINT } #endif +#if 1 +// Call different eCAL Service API functions from within the callbacks +TEST(ecal_service, Callback_ApiCallsFromCallbacks) +{ + for (std::uint8_t protocol_version = min_protocol_version; protocol_version <= max_protocol_version; protocol_version++) + { + const auto io_context = std::make_shared(); + const asio::io_context::work dummy_work(*io_context); + + atomic_signalable num_server_service_callback_called (0); + atomic_signalable num_server_event_callback_called (0); + atomic_signalable num_client_response_callback_called(0); + atomic_signalable num_client_event_callback_called (0); + + // declare server and client shared_ptrs as we need those in the callbacks. + std::shared_ptr server; + std::shared_ptr client; + + const eCAL::service::Server::ServiceCallbackT server_service_callback + = [&num_server_service_callback_called, &server](const std::shared_ptr& /*request*/, const std::shared_ptr& /*response*/) -> void + { + if (server) + { + bool is_connected = server->is_connected(); + int connection_count = server->get_connection_count(); + uint16_t port = server->get_port(); + + EXPECT_EQ(is_connected, true); + EXPECT_EQ(connection_count, 1); + } + + num_server_service_callback_called++; + }; + + const eCAL::service::Server::EventCallbackT server_event_callback + = [&num_server_event_callback_called, &server](eCAL::service::ServerEventType event, const std::string& /*message*/) -> void + { + if (server) + { + bool is_connected = server->is_connected(); + int connection_count = server->get_connection_count(); + uint16_t port = server->get_port(); + + if (event == eCAL::service::ServerEventType::Connected) + { + ASSERT_EQ(is_connected, true); + ASSERT_EQ(connection_count, 1); + } + else if (event == eCAL::service::ServerEventType::Disconnected) + { + ASSERT_EQ(is_connected, false); + ASSERT_EQ(connection_count, 0); + } + } + + num_server_event_callback_called++; + }; + + const eCAL::service::ClientSession::ResponseCallbackT client_response_callback + = [&num_client_response_callback_called, &client] + (const eCAL::service::Error& error, const std::shared_ptr& /*response*/) -> void + { + if(client) + { + // We just test if those functions can be called without crashing + auto address = client->get_address(); + auto port = client->get_port(); + auto protocol_version = client->get_accepted_protocol_version(); + auto queue_size = client->get_queue_size(); + auto state = client->get_state(); + } + + num_client_response_callback_called++; + }; + + const eCAL::service::ClientSession::EventCallbackT client_event_callback + = [&num_client_event_callback_called, &client] + (eCAL::service::ClientEventType event, const std::string& /*message*/) -> void + { + if (client) + { + // We just test if those functions can be called without crashing + auto address = client->get_address(); + auto port = client->get_port(); + auto protocol_version = client->get_accepted_protocol_version(); + auto queue_size = client->get_queue_size(); + auto state = client->get_state(); + } + + num_client_event_callback_called++; + }; + + server = eCAL::service::Server::create(io_context, protocol_version, 0, server_service_callback, true, server_event_callback); + + EXPECT_EQ(num_server_service_callback_called.get(), 0); + EXPECT_EQ(num_server_event_callback_called.get(), 0); + EXPECT_EQ(num_client_response_callback_called.get(), 0); + EXPECT_EQ(num_client_event_callback_called.get(), 0); + + client = eCAL::service::ClientSession::create(io_context, protocol_version, "127.0.0.1", server->get_port(), client_event_callback); + + std::thread io_thread([&io_context]() + { + io_context->run(); + }); + + // Wait for the connected events to be called + num_server_event_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500)); + num_client_event_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500)); + + EXPECT_EQ(num_server_service_callback_called.get(), 0); + EXPECT_EQ(num_server_event_callback_called.get(), 1); + EXPECT_EQ(num_client_response_callback_called.get(), 0); + EXPECT_EQ(num_client_event_callback_called.get(), 1); + + // Call service and wait for the response + client->async_call_service(std::make_shared("1"), client_response_callback); + + num_server_service_callback_called .wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500)); + num_client_response_callback_called.wait_for([](int value) { return value >= 1; }, std::chrono::milliseconds(500)); + + EXPECT_EQ(num_server_service_callback_called.get(), 1); + EXPECT_EQ(num_server_event_callback_called.get(), 1); + EXPECT_EQ(num_client_response_callback_called.get(), 1); + EXPECT_EQ(num_client_event_callback_called.get(), 1); + + // Terminate the client + client = nullptr; + + // Wait for the disconnected events to be called + num_server_event_callback_called.wait_for([](int value) { return value >= 2; }, std::chrono::milliseconds(500)); + + EXPECT_EQ(num_server_service_callback_called.get(), 1); + EXPECT_EQ(num_server_event_callback_called.get(), 2); + + // Terminate the server + server = nullptr; + + // join the io_thread + io_context->stop(); + io_thread.join(); + } +} +#endif + #if 1 TEST(ErrorCallback, ErrorCallbackNoServer) // NOLINT { @@ -2598,4 +2743,4 @@ TEST(BlockingCall, Stopped) // NOLINT // This test shows the proper way to stop } } } -#endif \ No newline at end of file +#endif