Skip to content

Commit

Permalink
Refactor task runner for more robust runtime state (#1318)
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Mar 13, 2023
1 parent 0e28da2 commit cb7ae54
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 28 deletions.
26 changes: 14 additions & 12 deletions src/common/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@ Status TaskRunner::Publish(const Task &task) {
}

task_queue_.emplace_back(task);
cond_.notify_all();
cond_.notify_one();
return Status::OK();
}

void TaskRunner::Start() {
Status TaskRunner::Start() {
if (!threads_.empty()) {
return {Status::NotOK, "Task runner is expected to stop before starting"};
}

stop_ = false;
for (int i = 0; i < n_thread_; i++) {
threads_.emplace_back([this] {
Util::ThreadSetName("task-runner");
this->run();
});
threads_.emplace_back(GET_OR_RET(Util::CreateThread("task-runner", [this] { this->run(); })));
}

return Status::OK();
}

void TaskRunner::Stop() {
Expand All @@ -55,22 +58,22 @@ void TaskRunner::Stop() {
cond_.notify_all();
}

void TaskRunner::Join() {
Status TaskRunner::Join() {
for (auto &thread : threads_) {
if (auto s = Util::ThreadJoin(thread); !s) {
LOG(WARNING) << "Task thread operation failed: " << s.Msg();
return s.Prefixed("Task thread operation failed");
}
}
}

void TaskRunner::Purge() {
std::lock_guard<std::mutex> guard(mu_);
threads_.clear();
task_queue_.clear();

return Status::OK();
}

void TaskRunner::run() {
std::unique_lock<std::mutex> lock(mu_);

while (!stop_) {
cond_.wait(lock, [this]() -> bool { return stop_ || !task_queue_.empty(); });

Expand All @@ -84,6 +87,5 @@ void TaskRunner::run() {
}

task_queue_.clear();
lock.unlock();
// CAUTION: drop the rest of tasks, don't use task runner if the task can't be drop
}
9 changes: 4 additions & 5 deletions src/common/task_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <vector>
Expand All @@ -40,17 +40,16 @@ class TaskRunner {

Status Publish(const Task &task);
size_t QueueSize() { return task_queue_.size(); }
void Start();
Status Start();
void Stop();
void Join();
void Purge();
Status Join();

private:
void run();

bool stop_ = false;
uint32_t max_queue_size_;
std::list<Task> task_queue_;
std::deque<Task> task_queue_;
std::mutex mu_;
std::condition_variable cond_;
int n_thread_;
Expand Down
17 changes: 12 additions & 5 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ Status Server::Start() {
worker->Start();
}

task_runner_.Start();
if (auto s = task_runner_.Start(); !s) {
return s.Prefixed("Failed to start task runner");
}
// setup server cron thread
cron_thread_ = GET_OR_RET(Util::CreateThread("server-cron", [this] { this->cron(); }));

Expand Down Expand Up @@ -230,7 +232,9 @@ void Server::Join() {
worker->Join();
}

task_runner_.Join();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << s.Msg();
}
if (auto s = Util::ThreadJoin(cron_thread_); !s) {
LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
}
Expand Down Expand Up @@ -262,7 +266,9 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco
auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); },
[this]() {
this->is_loading_ = false;
task_runner_.Start();
if (auto s = task_runner_.Start(); !s) {
LOG(WARNING) << "Failed to start task runner: " << s.Msg();
}
});
if (s.IsOK()) {
master_host_ = host;
Expand Down Expand Up @@ -1189,8 +1195,9 @@ void Server::PrepareRestoreDB() {
// Stop task runner
LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
task_runner_.Stop();
task_runner_.Join();
task_runner_.Purge();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << "[server] " << s.Msg();
}

// If the DB is restored, the object 'db_' will be destroyed, but
// 'db_' will be accessed in data migration task. To avoid wrong
Expand Down
10 changes: 4 additions & 6 deletions tests/cppunit/task_runner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,15 @@ TEST(TaskRunner, PublishToStopQueue) {
TEST(TaskRunner, Run) {
std::atomic<int> counter = {0};
TaskRunner tr(3, 1024);
tr.Start();
auto _ = tr.Start();

Status s;
Task t;
for (int i = 0; i < 100; i++) {
t = [&counter] { counter.fetch_add(1); };
s = tr.Publish(t);
Task t = [&counter] { counter.fetch_add(1); };
auto s = tr.Publish(t);
ASSERT_TRUE(s.IsOK());
}
sleep(1);
ASSERT_EQ(100, counter);
tr.Stop();
tr.Join();
_ = tr.Join();
}

0 comments on commit cb7ae54

Please sign in to comment.