From 46d0c8349ac5d78b1773d70b2c89e2af380c98a7 Mon Sep 17 00:00:00 2001
From: parisiale
Date: Thu, 4 Feb 2016 15:41:59 +0000
Subject: [PATCH 1/3] (maint) Trivial style changes

---
 lib/src/thread_container.cc             | 16 ++++++++-------
 lib/tests/unit/thread_container_test.cc | 26 +++++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/lib/src/thread_container.cc b/lib/src/thread_container.cc
index 392fe940..80188c9c 100644
--- a/lib/src/thread_container.cc
+++ b/lib/src/thread_container.cc
@@ -10,6 +10,8 @@
 
 namespace PXPAgent {
 
+namespace pcp_util = PCPClient::Util;
+
 // Check if the thread has completed; if so, and if it's joinable,
 // detach it. Return true if completed, false otherwise.
 bool detachIfCompleted(std::shared_ptr<ManagedThread> thread_ptr) {
@@ -53,7 +55,7 @@ ThreadContainer::ThreadContainer(const std::string& name,
 
 ThreadContainer::~ThreadContainer() {
     {
-        PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
+        pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
         destructing_ = true;
         cond_var_.notify_one();
     }
@@ -99,7 +101,7 @@ void ThreadContainer::add(PCPClient::Util::thread task,
 
         is_monitoring_ = true;
         monitoring_thread_ptr_.reset(
-            new PCPClient::Util::thread(&ThreadContainer::monitoringTask_, this));
+            new pcp_util::thread(&ThreadContainer::monitoringTask_, this));
     }
 }
 
@@ -119,7 +121,7 @@ uint32_t ThreadContainer::getNumErasedThreads() {
 }
 
 void ThreadContainer::setName(const std::string& name) {
-    PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
     name_ = name;
 }
 
@@ -129,15 +131,15 @@ void ThreadContainer::setName(const std::string& name) {
 
 void ThreadContainer::monitoringTask_() {
     LOG_DEBUG("Starting monitoring task for the '%1%' ThreadContainer, "
-              "with id %2%", name_, PCPClient::Util::this_thread::get_id());
+              "with id %2%", name_, pcp_util::this_thread::get_id());
 
     while (true) {
-        PCPClient::Util::unique_lock<PCPClient::Util::mutex> the_lock { mutex_ };
-        auto now = PCPClient::Util::chrono::system_clock::now();
+        pcp_util::unique_lock<pcp_util::mutex> the_lock { mutex_ };
+        auto now = pcp_util::chrono::system_clock::now();
 
         // Wait for thread objects or for the check interval timeout
         cond_var_.wait_until(the_lock,
-            now + PCPClient::Util::chrono::milliseconds(check_interval));
+            now + pcp_util::chrono::milliseconds(check_interval));
 
         if (destructing_) {
             // The dtor has been invoked
diff --git a/lib/tests/unit/thread_container_test.cc b/lib/tests/unit/thread_container_test.cc
index 7badf3c3..5de7dc1f 100644
--- a/lib/tests/unit/thread_container_test.cc
+++ b/lib/tests/unit/thread_container_test.cc
@@ -11,6 +11,8 @@
 
 namespace PXPAgent {
 
+namespace pcp_util = PCPClient::Util;
+
 TEST_CASE("ThreadContainer::ThreadContainer", "[utils]") {
     SECTION("can successfully instantiate a container") {
         REQUIRE_NOTHROW(ThreadContainer("TESTING_1_1"));
@@ -19,7 +21,8 @@ TEST_CASE("ThreadContainer::ThreadContainer", "[utils]") {
 
 void testTask(std::shared_ptr<std::atomic<bool>> a,
               const uint32_t task_duration_us) {
-    PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(task_duration_us));
+    pcp_util::this_thread::sleep_for(
+        pcp_util::chrono::microseconds(task_duration_us));
     *a = true;
 }
 
@@ -39,10 +42,12 @@ void addTasksTo(ThreadContainer& container,
         // when it's suppose to finish immediately (that could happen
         // due to thread processing ovehead for the OS), otherwise a
         // terminate call will abort the tests... See below
-        PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(10000));
+        pcp_util::this_thread::sleep_for(
+            pcp_util::chrono::microseconds(10000));
     }
 
-    PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(caller_duration_us));
+    pcp_util::this_thread::sleep_for(
+        pcp_util::chrono::microseconds(caller_duration_us));
 }
 
 TEST_CASE("ThreadContainer::add, ~ThreadContainer", "[async]") {
@@ -95,7 +100,9 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") {
 
         // Wait for two monitoring intervals plus the task duration;
         // all threads should be done by then
-        PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * monitoring_interval_us + task_duration_us));
+        pcp_util::this_thread::sleep_for(
+            pcp_util::chrono::microseconds(
+                2 * monitoring_interval_us + task_duration_us));
 
         INFO("should be stopped");
         REQUIRE_FALSE(container.isMonitoring());
@@ -108,7 +115,8 @@
         // Threads can't outlive the caller otherwise std::terminate()
         // will be invoked; sleep for an interval greater than the
         // duration
-        PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * task_duration_us));
+        pcp_util::this_thread::sleep_for(
+            pcp_util::chrono::microseconds(2 * task_duration_us));
     }
 
     SECTION("the monitoring thread can delete threads") {
@@ -119,7 +127,8 @@
         REQUIRE(container.getNumAddedThreads() == THREADS_THRESHOLD + 4);
 
         // Pause, to let the monitoring thread erase
-        PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * monitoring_interval_us));
+        pcp_util::this_thread::sleep_for(
+            pcp_util::chrono::microseconds(2 * monitoring_interval_us));
         REQUIRE_FALSE(container.isMonitoring());
 
         // NB: we cannot be certain about the number of erased threads
@@ -130,7 +139,7 @@
         // completed their execution
     }
 
-    SECTION("can add threds while the monitoring thread is running") {
+    SECTION("can add threads while the monitoring thread is running") {
         uint32_t task_duration_us { 100000 };
         ThreadContainer container { "TESTING_3_3" };
 
@@ -139,7 +148,8 @@
         REQUIRE_NOTHROW(addTasksTo(container, 10, 0, 0));
         REQUIRE(container.getNumAddedThreads() == THREADS_THRESHOLD + 4 + 10);
 
-        PCPClient::Util::this_thread::sleep_for(PCPClient::Util::chrono::microseconds(2 * task_duration_us));
+        pcp_util::this_thread::sleep_for(
+            pcp_util::chrono::microseconds(2 * task_duration_us));
     }
 }

From bda893f32242d775a79e6cd209d366503e524b99 Mon Sep 17 00:00:00 2001
From: parisiale
Date: Thu, 4 Feb 2016 15:42:51 +0000
Subject: [PATCH 2/3] (maint) Delete ThreadContainer's default/copy ctor

Also deleting the copy assignment.
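
As a quick illustration (not part of the diff below; the header path and
the function name are only assumptions based on this repository's layout),
the intent is that only the named constructor remains usable:

    #include <pxp-agent/thread_container.hpp>

    void sketch() {
        PXPAgent::ThreadContainer tc { "Action Executer" };  // still fine
        // PXPAgent::ThreadContainer other;        // error: default construction no longer allowed
        // PXPAgent::ThreadContainer copy { tc };  // error: copy ctor is deleted
        // copy = tc;                              // error: copy assignment is deleted
    }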
---
 lib/inc/pxp-agent/thread_container.hpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lib/inc/pxp-agent/thread_container.hpp b/lib/inc/pxp-agent/thread_container.hpp
index 74a048c1..ed5a2e91 100644
--- a/lib/inc/pxp-agent/thread_container.hpp
+++ b/lib/inc/pxp-agent/thread_container.hpp
@@ -39,9 +39,12 @@ class ThreadContainer {
     uint32_t check_interval;  // [ms]
     uint32_t threads_threshold;  // number of stored thread objects
 
+    ThreadContainer() = delete;
     ThreadContainer(const std::string& name = "",
                     uint32_t _check_interval = THREADS_MONITORING_INTERVAL_MS,
                     uint32_t _threads_threshold = THREADS_THRESHOLD);
+    ThreadContainer(const ThreadContainer&) = delete;
+    ThreadContainer& operator=(const ThreadContainer&) = delete;
     ~ThreadContainer();
 
     /// Add the specified thread instance to the container together

From 634c9d72e6ba945d0d488bc4e22ed1d1822daf84 Mon Sep 17 00:00:00 2001
From: parisiale
Date: Thu, 4 Feb 2016 15:49:27 +0000
Subject: [PATCH 3/3] (PCP-208) ThreadContainer stores the name as well

When adding a thread object to a ThreadContainer instance, the user
must also specify the name of the thread. This enables inspection.

Using an unordered_map instead of a vector to store the thread
instances; this enables retrieval by name. Raising an Error in case
ThreadContainer::add is called with a name that is already stored.

Adding find() and getThreadNames() methods. Making member functions
const where possible. Adding new unit tests.

Updating RequestProcessor to comply with the new interface and to
handle the possibility that a thread with the same name is already
stored (using a mutex for that). For a given non-blocking request,
its execution thread will be named after the transaction_id.
---
 lib/inc/pxp-agent/request_processor.hpp |  4 ++
 lib/inc/pxp-agent/thread_container.hpp  | 44 ++++++++----
 lib/src/request_processor.cc            | 32 ++++++---
 lib/src/thread_container.cc             | 89 ++++++++++++++++---------
 lib/tests/unit/thread_container_test.cc | 50 ++++++++++++--
 5 files changed, 163 insertions(+), 56 deletions(-)

diff --git a/lib/inc/pxp-agent/request_processor.hpp b/lib/inc/pxp-agent/request_processor.hpp
index 9436b2a9..8d035de4 100644
--- a/lib/inc/pxp-agent/request_processor.hpp
+++ b/lib/inc/pxp-agent/request_processor.hpp
@@ -7,6 +7,8 @@
 #include 
 #include 
 
+#include 
+
 #include 
 #include 
 
@@ -60,6 +62,8 @@ class RequestProcessor {
     /// Manages the lifecycle of non-blocking action jobs
     ThreadContainer thread_container_;
 
+    PCPClient::Util::mutex thread_container_mutex_;
+
     /// PXP Connector pointer
     std::shared_ptr<PXPConnector> connector_ptr_;
 
diff --git a/lib/inc/pxp-agent/thread_container.hpp b/lib/inc/pxp-agent/thread_container.hpp
index ed5a2e91..3a5d32c9 100644
--- a/lib/inc/pxp-agent/thread_container.hpp
+++ b/lib/inc/pxp-agent/thread_container.hpp
@@ -4,9 +4,11 @@
 #include 
 #include 
 
+#include 
 #include   // shared_ptr
 #include 
 #include 
+#include 
 
 namespace PXPAgent {
 
@@ -24,11 +26,13 @@ struct ManagedThread {
     std::shared_ptr<std::atomic<bool>> is_done;
 };
 
-/// Store thread objects. If the number of stored threads is greater
-/// than the specified threshold, the container will delete the
-/// instances of those threads that have completed their execution,
-/// after detaching them. It is assumed that the completion of a given
-/// thread is indicated by the associated atomic flag 'done'.
+/// This container stores named thread objects in a thread-safe way
+/// and manages their lifecycle. If the number of stored threads is
+/// greater than the specified threshold, the container will delete
+/// the instances of those threads that have completed their
+/// execution, after detaching them. It is assumed that the completion
+/// of a given thread is indicated by the associated atomic flag
+/// 'done'.
 ///
 /// The purpose of this class is to manage the lifecycle of threads;
 /// in case one or more stored threads are executing by the time
@@ -36,6 +40,10 @@
 /// function will be invoked; the program will then immediately abort.
 class ThreadContainer {
   public:
+    struct Error : public std::runtime_error {
+        explicit Error(std::string const& msg) : std::runtime_error(msg) {}
+    };
+
     uint32_t check_interval;  // [ms]
     uint32_t threads_threshold;  // number of stored thread objects
 
@@ -49,29 +57,41 @@ class ThreadContainer {
 
     /// Add the specified thread instance to the container together
     /// with the pointer to the atomic boolean that will tell when
-    /// it has completed its execution
-    void add(PCPClient::Util::thread task, std::shared_ptr<std::atomic<bool>> done);
+    /// it has completed its execution and a name string.
+    /// Throw an Error in case a thread instance with the same name
+    /// already exists.
+    void add(std::string thread_name,
+             PCPClient::Util::thread task,
+             std::shared_ptr<std::atomic<bool>> done);
+
+    /// Return true if a task with the specified name is currently
+    /// stored, false otherwise.
+    bool find(const std::string& thread_name) const;
 
     /// Return true if the monitoring thread is actually executing,
     /// false otherwise
-    bool isMonitoring();
+    bool isMonitoring() const;
 
-    uint32_t getNumAddedThreads();
-    uint32_t getNumErasedThreads();
+    uint32_t getNumAddedThreads() const;
+    uint32_t getNumErasedThreads() const;
+    std::vector<std::string> getThreadNames() const;
 
     void setName(const std::string& name);
 
   private:
    std::string name_;
-    std::vector<std::shared_ptr<ManagedThread>> threads_;
+    std::unordered_map<std::string, std::shared_ptr<ManagedThread>> threads_;
    std::unique_ptr<PCPClient::Util::thread> monitoring_thread_ptr_;
    bool destructing_;
-    PCPClient::Util::mutex mutex_;
+    mutable PCPClient::Util::mutex mutex_;
    PCPClient::Util::condition_variable cond_var_;
    bool is_monitoring_;
    uint32_t num_added_threads_;
    uint32_t num_erased_threads_;
 
+    // Must hold the lock to call this one
+    bool findLocked(const std::string& thread_name) const;
+
    void monitoringTask_();
 };
 
diff --git a/lib/src/request_processor.cc b/lib/src/request_processor.cc
index 613fe366..b5cd9f1c 100644
--- a/lib/src/request_processor.cc
+++ b/lib/src/request_processor.cc
@@ -226,6 +226,7 @@ void nonBlockingActionTask(std::shared_ptr<Module> module_ptr,
 RequestProcessor::RequestProcessor(std::shared_ptr<PXPConnector> connector_ptr,
                                    const Configuration::Agent& agent_configuration)
         : thread_container_ { "Action Executer" },
+          thread_container_mutex_ {},
           connector_ptr_ { connector_ptr },
           spool_dir_path_ { agent_configuration.spool_dir },
           modules_ {},
@@ -372,16 +373,27 @@ void RequestProcessor::processNonBlockingRequest(const ActionRequest& request) {
               request.prettyLabel(), request.id(), request.sender());
 
     try {
-        // Flag to enable signaling from task to thread_container
-        auto done = std::make_shared<std::atomic<bool>>(false);
-
-        thread_container_.add(pcp_util::thread(&nonBlockingActionTask,
-                                               modules_[request.module()],
-                                               request,
-                                               ResultsStorage { request },
-                                               connector_ptr_,
-                                               done),
-                              done);
+        pcp_util::lock_guard<pcp_util::mutex> lck { thread_container_mutex_ };
+
+        if (thread_container_.find(request.transactionId())) {
+            err_msg = std::string { "already exists an ongoing task with "
+                                    "transaction id " };
+            err_msg += request.transactionId();
+        } else {
+            // Flag to enable signaling from task to thread_container
+            auto done = std::make_shared<std::atomic<bool>>(false);
+
+            // NB: we got the_lock, so we're sure this will not throw
+            // for having another thread with the same name
+            thread_container_.add(request.transactionId(),
+                                  pcp_util::thread(&nonBlockingActionTask,
+                                                   modules_[request.module()],
+                                                   request,
+                                                   ResultsStorage { request },
+                                                   connector_ptr_,
+                                                   done),
+                                  done);
+        }
     } catch (ResultsStorage::Error& e) {
         // Failed to instantiate ResultsStorage
         LOG_ERROR("Failed to initialize the result files for the %1%; error: %2%",
diff --git a/lib/src/thread_container.cc b/lib/src/thread_container.cc
index 80188c9c..491974ed 100644
--- a/lib/src/thread_container.cc
+++ b/lib/src/thread_container.cc
@@ -66,8 +66,8 @@ ThreadContainer::~ThreadContainer() {
 
     // Detach the completed threads
     bool all_detached { true };
-    for (auto thread_ptr : threads_) {
-        if (!detachIfCompleted(thread_ptr)) {
+    for (auto itr = threads_.begin(); itr != threads_.end(); ++itr) {
+        if (!detachIfCompleted(itr->second)) {
             all_detached = false;
         }
     }
@@ -79,13 +79,23 @@ ThreadContainer::~ThreadContainer() {
     }
 }
 
-void ThreadContainer::add(PCPClient::Util::thread task,
+void ThreadContainer::add(std::string thread_name,
+                          pcp_util::thread task,
                           std::shared_ptr<std::atomic<bool>> is_done) {
-    LOG_TRACE("Adding thread %1% to the '%2%' ThreadContainer; added %3% "
-              "threads so far", task.get_id(), name_, num_added_threads_);
-    PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
-    threads_.push_back(std::shared_ptr<ManagedThread> {
-        new ManagedThread { std::move(task), is_done } });
+    LOG_TRACE("Adding thread %1% (named '%2%') to the '%3%' "
+              "ThreadContainer; added %4% threads so far",
+              task.get_id(), thread_name, name_, num_added_threads_);
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
+
+    if (findLocked(thread_name))
+        throw Error { "thread name is already stored" };
+
+    threads_.insert(
+        std::make_pair(
+            std::move(thread_name),
+            std::shared_ptr<ManagedThread>(
+                new ManagedThread { std::move(task), is_done })));
+
     num_added_threads_++;
 
     // Start the monitoring thread, if necessary
@@ -105,21 +115,34 @@ void ThreadContainer::add(PCPClient::Util::thread task,
     }
 }
 
-bool ThreadContainer::isMonitoring() {
-    PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
+bool ThreadContainer::find(const std::string& thread_name) const {
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
+    return findLocked(thread_name);
+}
+
+bool ThreadContainer::isMonitoring() const {
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
     return is_monitoring_;
 }
 
-uint32_t ThreadContainer::getNumAddedThreads() {
-    PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
+uint32_t ThreadContainer::getNumAddedThreads() const {
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
     return num_added_threads_;
 }
 
-uint32_t ThreadContainer::getNumErasedThreads() {
-    PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
+uint32_t ThreadContainer::getNumErasedThreads() const {
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
     return num_erased_threads_;
 }
 
+std::vector<std::string> ThreadContainer::getThreadNames() const {
+    pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
+    std::vector<std::string> names;
+    for (auto itr = threads_.begin(); itr != threads_.end(); itr++)
+        names.push_back(itr->first);
+    return names;
+}
+
 void ThreadContainer::setName(const std::string& name) {
     pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
     name_ = name;
@@ -129,6 +152,10 @@ void ThreadContainer::setName(const std::string& name) {
 // Private methods
 //
 
+bool ThreadContainer::findLocked(const std::string& thread_name) const {
+    return threads_.find(thread_name) != threads_.end();
+}
+
 void ThreadContainer::monitoringTask_() {
     LOG_DEBUG("Starting monitoring task for the '%1%' ThreadContainer, "
               "with id %2%", name_, pcp_util::this_thread::get_id());
@@ -152,22 +179,24 @@ void ThreadContainer::monitoringTask_() {
             break;
         }
 
-        // Get the range with the pointers of the thread objects
-        // that have completed their execution
-        auto detached_threads_it = std::remove_if(threads_.begin(),
-                                                  threads_.end(),
-                                                  detachIfCompleted);
-
-        // ... and delete them; note that we keep the pointers of the
-        // thread instances that are still executing to be able to
-        // destroy them when exiting
-        auto num_deletes = std::distance(detached_threads_it, threads_.end());
-        if (num_deletes > 0) {
-            LOG_DEBUG("Deleting %1% thread objects that have completed their "
-                      "execution; deleted %2% threads so far",
-                      num_deletes, num_erased_threads_);
-            threads_.erase(detached_threads_it, threads_.end());
-            num_erased_threads_ += num_deletes;
+        int num_deletions { 0 };
+
+        for (auto itr = threads_.begin(); itr != threads_.end();) {
+            if (detachIfCompleted(itr->second)) {
+                LOG_DEBUG("Deleting thread %1% (named '%2%')",
+                          itr->second->the_instance.get_id(), itr->first);
+                itr = threads_.erase(itr);
+                num_deletions++;
+            } else {
+                ++itr;
+            }
+        }
+
+        if (num_deletions > 0) {
+            num_erased_threads_ += num_deletions;
+            LOG_DEBUG("Deleted %1% thread objects that have completed their "
+                      "execution; in total, deleted %2% threads so far",
+                      num_deletions, num_erased_threads_);
         }
 
         if (threads_.size() < threads_threshold) {
diff --git a/lib/tests/unit/thread_container_test.cc b/lib/tests/unit/thread_container_test.cc
index 5de7dc1f..b8afc92f 100644
--- a/lib/tests/unit/thread_container_test.cc
+++ b/lib/tests/unit/thread_container_test.cc
@@ -8,6 +8,8 @@
 #include 
 #include   // set_terminate
 #include 
+#include 
+#include 
 
 namespace PXPAgent {
 
@@ -29,12 +31,14 @@ void testTask(std::shared_ptr<std::atomic<bool>> a,
 void addTasksTo(ThreadContainer& container,
                 const uint32_t num_tasks,
                 const uint32_t caller_duration_us,
-                const uint32_t task_duration_us) {
+                const uint32_t task_duration_us,
+                std::string prefix = "") {
     uint32_t idx;
     for (idx = 0; idx < num_tasks; idx++) {
+        auto task_name = prefix + std::to_string(idx);
         std::shared_ptr<std::atomic<bool>> a { new std::atomic<bool> { false } };
-        auto t = PCPClient::Util::thread(testTask, a, task_duration_us);
-        container.add(std::move(t), a);
+        auto t = pcp_util::thread(testTask, a, task_duration_us);
+        container.add(task_name, std::move(t), a);
     }
 
     if (task_duration_us == 0) {
@@ -82,6 +86,15 @@ TEST_CASE("ThreadContainer::add, ~ThreadContainer", "[async]") {
         addTasksTo(container, 42, 0, 0);
         REQUIRE(container.getNumAddedThreads() == 42);
     }
+
+    SECTION("throws when adding threads with the same name") {
+        auto f = []{
+            ThreadContainer container { "TESTING_2_5" };
+            addTasksTo(container, 1, 0, 100000);
+            addTasksTo(container, 1, 0, 0);
+        };
+        REQUIRE_THROWS_AS(f(), ThreadContainer::Error);
+    }
 }
 
 auto monitoring_interval_us = THREADS_MONITORING_INTERVAL_MS * 1000;
@@ -146,11 +159,40 @@ TEST_CASE("ThreadContainer::monitoringTask", "[async]") {
         addTasksTo(container, THREADS_THRESHOLD + 4, 0, task_duration_us);
         REQUIRE(container.isMonitoring());
 
-        REQUIRE_NOTHROW(addTasksTo(container, 10, 0, 0));
+        REQUIRE_NOTHROW(addTasksTo(container, 10, 0, 0, "bis_"));
         REQUIRE(container.getNumAddedThreads() == THREADS_THRESHOLD + 4 + 10);
 
         pcp_util::this_thread::sleep_for(
             pcp_util::chrono::microseconds(2 * task_duration_us));
     }
 }
+TEST_CASE("ThreadContainer::find", "[async]") {
+    ThreadContainer container { "TESTING_4" };
+
SECTION("successfully returns true after a known thread") { + addTasksTo(container, 1, 0, 0); + REQUIRE(container.find("0")); + } + + SECTION("successfully returns false after an unknown thread") { + addTasksTo(container, 1, 0, 0); + REQUIRE_FALSE(container.find("1")); + } +} + +TEST_CASE("ThreadContainer::getThreadNames", "[async]") { + ThreadContainer container { "TESTING_5" }; + + SECTION("successfully returns the names of the stored threads") { + addTasksTo(container, 4, 0, 0); + std::set expected_names { "0", "1", "2", "3" }; + auto stored_names = container.getThreadNames(); + + REQUIRE(expected_names.size() == stored_names.size()); + + for (const auto& name : stored_names) + REQUIRE(expected_names.find(name) != expected_names.end()); + } +} + } // namespace PXPAgent