diff --git a/lib/inc/pxp-agent/request_processor.hpp b/lib/inc/pxp-agent/request_processor.hpp index 9436b2a9..8d035de4 100644 --- a/lib/inc/pxp-agent/request_processor.hpp +++ b/lib/inc/pxp-agent/request_processor.hpp @@ -7,6 +7,8 @@ #include #include +#include + #include #include @@ -60,6 +62,8 @@ class RequestProcessor { /// Manages the lifecycle of non-blocking action jobs ThreadContainer thread_container_; + PCPClient::Util::mutex thread_container_mutex_; + /// PXP Connector pointer std::shared_ptr connector_ptr_; diff --git a/lib/inc/pxp-agent/thread_container.hpp b/lib/inc/pxp-agent/thread_container.hpp index 74a048c1..3a5d32c9 100644 --- a/lib/inc/pxp-agent/thread_container.hpp +++ b/lib/inc/pxp-agent/thread_container.hpp @@ -4,9 +4,11 @@ #include #include +#include #include // shared_ptr #include #include +#include namespace PXPAgent { @@ -24,11 +26,13 @@ struct ManagedThread { std::shared_ptr> is_done; }; -/// Store thread objects. If the number of stored threads is greater -/// than the specified threshold, the container will delete the -/// instances of those threads that have completed their execution, -/// after detaching them. It is assumed that the completion of a given -/// thread is indicated by the associated atomic flag 'done'. +/// This container stores named thread objects in a thread-safe way +/// and manages their lifecycle. If the number of stored threads is +/// greater than the specified threshold, the container will delete +/// the instances of those threads that have completed their +/// execution, after detaching them. It is assumed that the completion +/// of a given thread is indicated by the associated atomic flag +/// 'done'. /// /// The purpose of this class is to manage the lifecycle of threads; /// in case one or more stored threads are executing by the time @@ -36,39 +40,58 @@ struct ManagedThread { /// function will be invoked; the program will then immediately abort. class ThreadContainer { public: + struct Error : public std::runtime_error { + explicit Error(std::string const& msg) : std::runtime_error(msg) {} + }; + uint32_t check_interval; // [ms] uint32_t threads_threshold; // number of stored thread objects + ThreadContainer() = delete; ThreadContainer(const std::string& name = "", uint32_t _check_interval = THREADS_MONITORING_INTERVAL_MS, uint32_t _threads_threshold = THREADS_THRESHOLD); + ThreadContainer(const ThreadContainer&) = delete; + ThreadContainer& operator=(const ThreadContainer&) = delete; ~ThreadContainer(); /// Add the specified thread instance to the container together /// with the pointer to the atomic boolean that will tell when - /// it has completed its execution - void add(PCPClient::Util::thread task, std::shared_ptr> done); + /// it has completed its execution and a name string. + /// Throw an Error in case a thread instance with the same name + /// already exists. + void add(std::string thread_name, + PCPClient::Util::thread task, + std::shared_ptr> done); + + /// Return true if a task with the specified name is currently + /// stored, false otherwise. + bool find(const std::string& thread_name) const; /// Return true if the monitoring thread is actually executing, /// false otherwise - bool isMonitoring(); + bool isMonitoring() const; - uint32_t getNumAddedThreads(); - uint32_t getNumErasedThreads(); + uint32_t getNumAddedThreads() const; + uint32_t getNumErasedThreads() const; + std::vector getThreadNames() const; void setName(const std::string& name); private: std::string name_; - std::vector> threads_; + std::unordered_map> threads_; std::unique_ptr monitoring_thread_ptr_; bool destructing_; - PCPClient::Util::mutex mutex_; + mutable PCPClient::Util::mutex mutex_; PCPClient::Util::condition_variable cond_var_; bool is_monitoring_; uint32_t num_added_threads_; uint32_t num_erased_threads_; + // Must hold the lock to call this one + bool findLocked(const std::string& thread_name) const; + void monitoringTask_(); }; diff --git a/lib/src/request_processor.cc b/lib/src/request_processor.cc index 613fe366..b5cd9f1c 100644 --- a/lib/src/request_processor.cc +++ b/lib/src/request_processor.cc @@ -226,6 +226,7 @@ void nonBlockingActionTask(std::shared_ptr module_ptr, RequestProcessor::RequestProcessor(std::shared_ptr connector_ptr, const Configuration::Agent& agent_configuration) : thread_container_ { "Action Executer" }, + thread_container_mutex_ {}, connector_ptr_ { connector_ptr }, spool_dir_path_ { agent_configuration.spool_dir }, modules_ {}, @@ -372,16 +373,27 @@ void RequestProcessor::processNonBlockingRequest(const ActionRequest& request) { request.prettyLabel(), request.id(), request.sender()); try { - // Flag to enable signaling from task to thread_container - auto done = std::make_shared>(false); - - thread_container_.add(pcp_util::thread(&nonBlockingActionTask, - modules_[request.module()], - request, - ResultsStorage { request }, - connector_ptr_, - done), - done); + pcp_util::lock_guard lck { thread_container_mutex_ }; + + if (thread_container_.find(request.transactionId())) { + err_msg = std::string { "already exists an ongoing task with " + "transaction id " }; + err_msg += request.transactionId(); + } else { + // Flag to enable signaling from task to thread_container + auto done = std::make_shared>(false); + + // NB: we got the_lock, so we're sure this will not throw + // for having another thread with the same name + thread_container_.add(request.transactionId(), + pcp_util::thread(&nonBlockingActionTask, + modules_[request.module()], + request, + ResultsStorage { request }, + connector_ptr_, + done), + done); + } } catch (ResultsStorage::Error& e) { // Failed to instantiate ResultsStorage LOG_ERROR("Failed to initialize the result files for the %1%; error: %2%", diff --git a/lib/src/thread_container.cc b/lib/src/thread_container.cc index 392fe940..491974ed 100644 --- a/lib/src/thread_container.cc +++ b/lib/src/thread_container.cc @@ -10,6 +10,8 @@ namespace PXPAgent { +namespace pcp_util = PCPClient::Util; + // Check if the thread has completed; if so, and if it's joinable, // detach it. Return true if completed, false otherwise. bool detachIfCompleted(std::shared_ptr thread_ptr) { @@ -53,7 +55,7 @@ ThreadContainer::ThreadContainer(const std::string& name, ThreadContainer::~ThreadContainer() { { - PCPClient::Util::lock_guard the_lock { mutex_ }; + pcp_util::lock_guard the_lock { mutex_ }; destructing_ = true; cond_var_.notify_one(); } @@ -64,8 +66,8 @@ ThreadContainer::~ThreadContainer() { // Detach the completed threads bool all_detached { true }; - for (auto thread_ptr : threads_) { - if (!detachIfCompleted(thread_ptr)) { + for (auto itr = threads_.begin(); itr != threads_.end(); ++itr) { + if (!detachIfCompleted(itr->second)) { all_detached = false; } } @@ -77,13 +79,23 @@ ThreadContainer::~ThreadContainer() { } } -void ThreadContainer::add(PCPClient::Util::thread task, +void ThreadContainer::add(std::string thread_name, + pcp_util::thread task, std::shared_ptr> is_done) { - LOG_TRACE("Adding thread %1% to the '%2%' ThreadContainer; added %3% " - "threads so far", task.get_id(), name_, num_added_threads_); - PCPClient::Util::lock_guard the_lock { mutex_ }; - threads_.push_back(std::shared_ptr { - new ManagedThread { std::move(task), is_done } }); + LOG_TRACE("Adding thread %1% (named '%2%') to the '%3%' " + "ThreadContainer; added %4% threads so far", + task.get_id(), thread_name, name_, num_added_threads_); + pcp_util::lock_guard the_lock { mutex_ }; + + if (findLocked(thread_name)) + throw Error { "thread name is already stored" }; + + threads_.insert( + std::make_pair( + std::move(thread_name), + std::shared_ptr( + new ManagedThread { std::move(task), is_done }))); + num_added_threads_++; // Start the monitoring thread, if necessary @@ -99,27 +111,40 @@ void ThreadContainer::add(PCPClient::Util::thread task, is_monitoring_ = true; monitoring_thread_ptr_.reset( - new PCPClient::Util::thread(&ThreadContainer::monitoringTask_, this)); + new pcp_util::thread(&ThreadContainer::monitoringTask_, this)); } } -bool ThreadContainer::isMonitoring() { - PCPClient::Util::lock_guard the_lock { mutex_ }; +bool ThreadContainer::find(const std::string& thread_name) const { + pcp_util::lock_guard the_lock { mutex_ }; + return findLocked(thread_name); +} + +bool ThreadContainer::isMonitoring() const { + pcp_util::lock_guard the_lock { mutex_ }; return is_monitoring_; } -uint32_t ThreadContainer::getNumAddedThreads() { - PCPClient::Util::lock_guard the_lock { mutex_ }; +uint32_t ThreadContainer::getNumAddedThreads() const { + pcp_util::lock_guard the_lock { mutex_ }; return num_added_threads_; } -uint32_t ThreadContainer::getNumErasedThreads() { - PCPClient::Util::lock_guard the_lock { mutex_ }; +uint32_t ThreadContainer::getNumErasedThreads() const { + pcp_util::lock_guard the_lock { mutex_ }; return num_erased_threads_; } +std::vector ThreadContainer::getThreadNames() const { + pcp_util::lock_guard the_lock { mutex_ }; + std::vector names; + for (auto itr = threads_.begin(); itr != threads_.end(); itr++) + names.push_back(itr->first); + return names; +} + void ThreadContainer::setName(const std::string& name) { - PCPClient::Util::lock_guard the_lock { mutex_ }; + pcp_util::lock_guard the_lock { mutex_ }; name_ = name; } @@ -127,17 +152,21 @@ void ThreadContainer::setName(const std::string& name) { // Private methods // +bool ThreadContainer::findLocked(const std::string& thread_name) const { + return threads_.find(thread_name) != threads_.end(); +} + void ThreadContainer::monitoringTask_() { LOG_DEBUG("Starting monitoring task for the '%1%' ThreadContainer, " - "with id %2%", name_, PCPClient::Util::this_thread::get_id()); + "with id %2%", name_, pcp_util::this_thread::get_id()); while (true) { - PCPClient::Util::unique_lock the_lock { mutex_ }; - auto now = PCPClient::Util::chrono::system_clock::now(); + pcp_util::unique_lock the_lock { mutex_ }; + auto now = pcp_util::chrono::system_clock::now(); // Wait for thread objects or for the check interval timeout cond_var_.wait_until(the_lock, - now + PCPClient::Util::chrono::milliseconds(check_interval)); + now + pcp_util::chrono::milliseconds(check_interval)); if (destructing_) { // The dtor has been invoked @@ -150,22 +179,24 @@ void ThreadContainer::monitoringTask_() { break; } - // Get the range with the pointers of the thread objects - // that have completed their execution - auto detached_threads_it = std::remove_if(threads_.begin(), - threads_.end(), - detachIfCompleted); - - // ... and delete them; note that we keep the pointers of the - // thread instances that are still executing to be able to - // destroy them when exiting - auto num_deletes = std::distance(detached_threads_it, threads_.end()); - if (num_deletes > 0) { - LOG_DEBUG("Deleting %1% thread objects that have completed their " - "execution; deleted %2% threads so far", - num_deletes, num_erased_threads_); - threads_.erase(detached_threads_it, threads_.end()); - num_erased_threads_ += num_deletes; + int num_deletions { 0 }; + + for (auto itr = threads_.begin(); itr != threads_.end();) { + if (detachIfCompleted(itr->second)) { + LOG_DEBUG("Deleting thread %1% (named '%2%')", + itr->second->the_instance.get_id(), itr->first); + itr = threads_.erase(itr); + num_deletions++; + } else { + ++itr; + } + } + + if (num_deletions > 0) { + num_erased_threads_ += num_deletions; + LOG_DEBUG("Deleted %1% thread objects that have completed their " + "execution; in total, deleted %2% threads so far", + num_deletions, num_erased_threads_); } if (threads_.size() < threads_threshold) { diff --git a/lib/tests/unit/thread_container_test.cc b/lib/tests/unit/thread_container_test.cc index 7badf3c3..b8afc92f 100644 --- a/lib/tests/unit/thread_container_test.cc +++ b/lib/tests/unit/thread_container_test.cc @@ -8,9 +8,13 @@ #include #include // set_terminate #include +#include +#include namespace PXPAgent { +namespace pcp_util = PCPClient::Util; + TEST_CASE("ThreadContainer::ThreadContainer", "[utils]") { SECTION("can successfully instantiate a container") { REQUIRE_NOTHROW(ThreadContainer("TESTING_1_1")); @@ -19,19 +23,22 @@ TEST_CASE("ThreadContainer::ThreadContainer", "[utils]") { void testTask(std::shared_ptr> a, const uint32_t task_duration_us) { - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(task_duration_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(task_duration_us)); *a = true; } void addTasksTo(ThreadContainer& container, const uint32_t num_tasks, const uint32_t caller_duration_us, - const uint32_t task_duration_us) { + const uint32_t task_duration_us, + std::string prefix = "") { uint32_t idx; for (idx = 0; idx < num_tasks; idx++) { + auto task_name = prefix + std::to_string(idx); std::shared_ptr> a { new std::atomic { false } }; - auto t = PCPClient::Util::thread(testTask, a, task_duration_us); - container.add(std::move(t), a); + auto t = pcp_util::thread(testTask, a, task_duration_us); + container.add(task_name, std::move(t), a); } if (task_duration_us == 0) { @@ -39,10 +46,12 @@ void addTasksTo(ThreadContainer& container, // when it's suppose to finish immediately (that could happen // due to thread processing ovehead for the OS), otherwise a // terminate call will abort the tests... See below - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(10000)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(10000)); } - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(caller_duration_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(caller_duration_us)); } TEST_CASE("ThreadContainer::add, ~ThreadContainer", "[async]") { @@ -77,6 +86,15 @@ TEST_CASE("ThreadContainer::add, ~ThreadContainer", "[async]") { addTasksTo(container, 42, 0, 0); REQUIRE(container.getNumAddedThreads() == 42); } + + SECTION("throws when adding threads with the same name") { + auto f = []{ + ThreadContainer container { "TESTING_2_5" }; + addTasksTo(container, 1, 0, 100000); + addTasksTo(container, 1, 0, 0); + }; + REQUIRE_THROWS_AS(f(), ThreadContainer::Error); + } } auto monitoring_interval_us = THREADS_MONITORING_INTERVAL_MS * 1000; @@ -95,7 +113,9 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") { // Wait for two monitoring intervals plus the task duration; // all threads should be done by then - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * monitoring_interval_us + task_duration_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds( + 2 * monitoring_interval_us + task_duration_us)); INFO("should be stopped"); REQUIRE_FALSE(container.isMonitoring()); @@ -108,7 +128,8 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") { // Threads can't outlive the caller otherwise std::terminate() // will be invoked; sleep for an interval greater than the // duration - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * task_duration_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(2 * task_duration_us)); } SECTION("the monitoring thread can delete threads") { @@ -119,7 +140,8 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") { REQUIRE(container.getNumAddedThreads() == THREADS_THRESHOLD + 4); // Pause, to let the monitoring thread erase - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * monitoring_interval_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(2 * monitoring_interval_us)); REQUIRE_FALSE(container.isMonitoring()); // NB: we cannot be certain about the number of erased threads @@ -130,16 +152,46 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") { // completed their execution } - SECTION("can add threds while the monitoring thread is running") { + SECTION("can add threads while the monitoring thread is running") { uint32_t task_duration_us { 100000 }; ThreadContainer container { "TESTING_3_3" }; addTasksTo(container, THREADS_THRESHOLD + 4, 0, task_duration_us); REQUIRE(container.isMonitoring()); - REQUIRE_NOTHROW(addTasksTo(container, 10, 0, 0)); + REQUIRE_NOTHROW(addTasksTo(container, 10, 0, 0, "bis_")); REQUIRE(container.getNumAddedThreads() == THREADS_THRESHOLD + 4 + 10); - PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * task_duration_us)); + pcp_util::this_thread::sleep_for( + pcp_util::chrono::microseconds(2 * task_duration_us)); + } +} + +TEST_CASE("ThreadContainer::find", "[async]") { + ThreadContainer container { "TESTING_4" }; + + SECTION("successfully returns true after a known thread") { + addTasksTo(container, 1, 0, 0); + REQUIRE(container.find("0")); + } + + SECTION("successfully returns false after an unknown thread") { + addTasksTo(container, 1, 0, 0); + REQUIRE_FALSE(container.find("1")); + } +} + +TEST_CASE("ThreadContainer::getThreadNames", "[async]") { + ThreadContainer container { "TESTING_5" }; + + SECTION("successfully returns the names of the stored threads") { + addTasksTo(container, 4, 0, 0); + std::set expected_names { "0", "1", "2", "3" }; + auto stored_names = container.getThreadNames(); + + REQUIRE(expected_names.size() == stored_names.size()); + + for (const auto& name : stored_names) + REQUIRE(expected_names.find(name) != expected_names.end()); } }