Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(PCP-208) ThreadContainer associates names to threads #322

Merged
merged 3 commits into from
Feb 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/inc/pxp-agent/request_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <pxp-agent/pxp_connector.hpp>
#include <pxp-agent/configuration.hpp>

#include <cpp-pcp-client/util/thread.hpp>

#include <boost/filesystem/path.hpp>

#include <memory>
Expand Down Expand Up @@ -60,6 +62,8 @@ class RequestProcessor {
/// Manages the lifecycle of non-blocking action jobs
ThreadContainer thread_container_;

PCPClient::Util::mutex thread_container_mutex_;

/// PXP Connector pointer
std::shared_ptr<PXPConnector> connector_ptr_;

Expand Down
47 changes: 35 additions & 12 deletions lib/inc/pxp-agent/thread_container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
#include <cpp-pcp-client/util/thread.hpp>

#include <vector>
#include <unordered_map>
#include <memory> // shared_ptr
#include <atomic>
#include <string>
#include <stdexcept>

namespace PXPAgent {

Expand All @@ -24,51 +26,72 @@ struct ManagedThread {
std::shared_ptr<std::atomic<bool>> is_done;
};

/// Store thread objects. If the number of stored threads is greater
/// than the specified threshold, the container will delete the
/// instances of those threads that have completed their execution,
/// after detaching them. It is assumed that the completion of a given
/// thread is indicated by the associated atomic flag 'done'.
/// This container stores named thread objects in a thread-safe way
/// and manages their lifecycle. If the number of stored threads is
/// greater than the specified threshold, the container will delete
/// the instances of those threads that have completed their
/// execution, after detaching them. It is assumed that the completion
/// of a given thread is indicated by the associated atomic flag
/// 'done'.
///
/// The purpose of this class is to manage the lifecycle of threads;
/// in case one or more stored threads are executing by the time
/// the ThreadContainer destructor is called, the std::terminate
/// function will be invoked; the program will then immediately abort.
class ThreadContainer {
public:
struct Error : public std::runtime_error {
explicit Error(std::string const& msg) : std::runtime_error(msg) {}
};

uint32_t check_interval; // [ms]
uint32_t threads_threshold; // number of stored thread objects

ThreadContainer() = delete;
ThreadContainer(const std::string& name = "",
uint32_t _check_interval = THREADS_MONITORING_INTERVAL_MS,
uint32_t _threads_threshold = THREADS_THRESHOLD);
ThreadContainer(const ThreadContainer&) = delete;
ThreadContainer& operator=(const ThreadContainer&) = delete;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you disabling the constructors (and the = operator)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the rule of 3 plus threads are not copiable.

~ThreadContainer();

/// Add the specified thread instance to the container together
/// with the pointer to the atomic boolean that will tell when
/// it has completed its execution
void add(PCPClient::Util::thread task, std::shared_ptr<std::atomic<bool>> done);
/// it has completed its execution and a name string.
/// Throw an Error in case a thread instance with the same name
/// already exists.
void add(std::string thread_name,
PCPClient::Util::thread task,
std::shared_ptr<std::atomic<bool>> done);

/// Return true if a task with the specified name is currently
/// stored, false otherwise.
bool find(const std::string& thread_name) const;

/// Return true if the monitoring thread is actually executing,
/// false otherwise
bool isMonitoring();
bool isMonitoring() const;

uint32_t getNumAddedThreads();
uint32_t getNumErasedThreads();
uint32_t getNumAddedThreads() const;
uint32_t getNumErasedThreads() const;
std::vector<std::string> getThreadNames() const;

void setName(const std::string& name);

private:
std::string name_;
std::vector<std::shared_ptr<ManagedThread>> threads_;
std::unordered_map<std::string, std::shared_ptr<ManagedThread>> threads_;
std::unique_ptr<PCPClient::Util::thread> monitoring_thread_ptr_;
bool destructing_;
PCPClient::Util::mutex mutex_;
mutable PCPClient::Util::mutex mutex_;
PCPClient::Util::condition_variable cond_var_;
bool is_monitoring_;
uint32_t num_added_threads_;
uint32_t num_erased_threads_;

// Must hold the lock to call this one
bool findLocked(const std::string& thread_name) const;

void monitoringTask_();
};

Expand Down
32 changes: 22 additions & 10 deletions lib/src/request_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ void nonBlockingActionTask(std::shared_ptr<Module> module_ptr,
RequestProcessor::RequestProcessor(std::shared_ptr<PXPConnector> connector_ptr,
const Configuration::Agent& agent_configuration)
: thread_container_ { "Action Executer" },
thread_container_mutex_ {},
connector_ptr_ { connector_ptr },
spool_dir_path_ { agent_configuration.spool_dir },
modules_ {},
Expand Down Expand Up @@ -372,16 +373,27 @@ void RequestProcessor::processNonBlockingRequest(const ActionRequest& request) {
request.prettyLabel(), request.id(), request.sender());

try {
// Flag to enable signaling from task to thread_container
auto done = std::make_shared<std::atomic<bool>>(false);

thread_container_.add(pcp_util::thread(&nonBlockingActionTask,
modules_[request.module()],
request,
ResultsStorage { request },
connector_ptr_,
done),
done);
pcp_util::lock_guard<pcp_util::mutex> lck { thread_container_mutex_ };

if (thread_container_.find(request.transactionId())) {
err_msg = std::string { "already exists an ongoing task with "
"transaction id " };
err_msg += request.transactionId();
} else {
// Flag to enable signaling from task to thread_container
auto done = std::make_shared<std::atomic<bool>>(false);

// NB: we got the_lock, so we're sure this will not throw
// for having another thread with the same name
thread_container_.add(request.transactionId(),
pcp_util::thread(&nonBlockingActionTask,
modules_[request.module()],
request,
ResultsStorage { request },
connector_ptr_,
done),
done);
}
} catch (ResultsStorage::Error& e) {
// Failed to instantiate ResultsStorage
LOG_ERROR("Failed to initialize the result files for the %1%; error: %2%",
Expand Down
105 changes: 68 additions & 37 deletions lib/src/thread_container.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

namespace PXPAgent {

namespace pcp_util = PCPClient::Util;

// Check if the thread has completed; if so, and if it's joinable,
// detach it. Return true if completed, false otherwise.
bool detachIfCompleted(std::shared_ptr<ManagedThread> thread_ptr) {
Expand Down Expand Up @@ -53,7 +55,7 @@ ThreadContainer::ThreadContainer(const std::string& name,

ThreadContainer::~ThreadContainer() {
{
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
destructing_ = true;
cond_var_.notify_one();
}
Expand All @@ -64,8 +66,8 @@ ThreadContainer::~ThreadContainer() {

// Detach the completed threads
bool all_detached { true };
for (auto thread_ptr : threads_) {
if (!detachIfCompleted(thread_ptr)) {
for (auto itr = threads_.begin(); itr != threads_.end(); ++itr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't you use the foreach syntax on the map?

if (!detachIfCompleted(itr->second)) {
all_detached = false;
}
}
Expand All @@ -77,13 +79,23 @@ ThreadContainer::~ThreadContainer() {
}
}

void ThreadContainer::add(PCPClient::Util::thread task,
void ThreadContainer::add(std::string thread_name,
pcp_util::thread task,
std::shared_ptr<std::atomic<bool>> is_done) {
LOG_TRACE("Adding thread %1% to the '%2%' ThreadContainer; added %3% "
"threads so far", task.get_id(), name_, num_added_threads_);
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
threads_.push_back(std::shared_ptr<ManagedThread> {
new ManagedThread { std::move(task), is_done } });
LOG_TRACE("Adding thread %1% (named '%2%') to the '%3%' "
"ThreadContainer; added %4% threads so far",
task.get_id(), thread_name, name_, num_added_threads_);
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };

if (findLocked(thread_name))
throw Error { "thread name is already stored" };

threads_.insert(
std::make_pair(
std::move(thread_name),
std::shared_ptr<ManagedThread>(
new ManagedThread { std::move(task), is_done })));

num_added_threads_++;

// Start the monitoring thread, if necessary
Expand All @@ -99,45 +111,62 @@ void ThreadContainer::add(PCPClient::Util::thread task,

is_monitoring_ = true;
monitoring_thread_ptr_.reset(
new PCPClient::Util::thread(&ThreadContainer::monitoringTask_, this));
new pcp_util::thread(&ThreadContainer::monitoringTask_, this));
}
}

bool ThreadContainer::isMonitoring() {
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
bool ThreadContainer::find(const std::string& thread_name) const {
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
return findLocked(thread_name);
}

bool ThreadContainer::isMonitoring() const {
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
return is_monitoring_;
}

uint32_t ThreadContainer::getNumAddedThreads() {
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
uint32_t ThreadContainer::getNumAddedThreads() const {
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
return num_added_threads_;
}

uint32_t ThreadContainer::getNumErasedThreads() {
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
uint32_t ThreadContainer::getNumErasedThreads() const {
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
return num_erased_threads_;
}

std::vector<std::string> ThreadContainer::getThreadNames() const {
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
std::vector<std::string> names;
for (auto itr = threads_.begin(); itr != threads_.end(); itr++)
names.push_back(itr->first);
return names;
}

void ThreadContainer::setName(const std::string& name) {
PCPClient::Util::lock_guard<PCPClient::Util::mutex> the_lock { mutex_ };
pcp_util::lock_guard<pcp_util::mutex> the_lock { mutex_ };
name_ = name;
}

//
// Private methods
//

bool ThreadContainer::findLocked(const std::string& thread_name) const {
return threads_.find(thread_name) != threads_.end();
}

void ThreadContainer::monitoringTask_() {
LOG_DEBUG("Starting monitoring task for the '%1%' ThreadContainer, "
"with id %2%", name_, PCPClient::Util::this_thread::get_id());
"with id %2%", name_, pcp_util::this_thread::get_id());

while (true) {
PCPClient::Util::unique_lock<PCPClient::Util::mutex> the_lock { mutex_ };
auto now = PCPClient::Util::chrono::system_clock::now();
pcp_util::unique_lock<pcp_util::mutex> the_lock { mutex_ };
auto now = pcp_util::chrono::system_clock::now();

// Wait for thread objects or for the check interval timeout
cond_var_.wait_until(the_lock,
now + PCPClient::Util::chrono::milliseconds(check_interval));
now + pcp_util::chrono::milliseconds(check_interval));

if (destructing_) {
// The dtor has been invoked
Expand All @@ -150,22 +179,24 @@ void ThreadContainer::monitoringTask_() {
break;
}

// Get the range with the pointers of the thread objects
// that have completed their execution
auto detached_threads_it = std::remove_if(threads_.begin(),
threads_.end(),
detachIfCompleted);

// ... and delete them; note that we keep the pointers of the
// thread instances that are still executing to be able to
// destroy them when exiting
auto num_deletes = std::distance(detached_threads_it, threads_.end());
if (num_deletes > 0) {
LOG_DEBUG("Deleting %1% thread objects that have completed their "
"execution; deleted %2% threads so far",
num_deletes, num_erased_threads_);
threads_.erase(detached_threads_it, threads_.end());
num_erased_threads_ += num_deletes;
int num_deletions { 0 };

for (auto itr = threads_.begin(); itr != threads_.end();) {
if (detachIfCompleted(itr->second)) {
LOG_DEBUG("Deleting thread %1% (named '%2%')",
itr->second->the_instance.get_id(), itr->first);
itr = threads_.erase(itr);
num_deletions++;
} else {
++itr;
}
}

if (num_deletions > 0) {
num_erased_threads_ += num_deletions;
LOG_DEBUG("Deleted %1% thread objects that have completed their "
"execution; in total, deleted %2% threads so far",
num_deletions, num_erased_threads_);
}

if (threads_.size() < threads_threshold) {
Expand Down
Loading