From dc8be12510c2fd5a809d9a82d2c14b464b5e5a3f Mon Sep 17 00:00:00 2001 From: fanquake Date: Wed, 27 Jan 2021 15:04:34 +0800 Subject: [PATCH] refactor: remove boost::thread_group usage --- src/addrman.cpp | 2 ++ src/bitcoin-cli.cpp | 1 + src/init.cpp | 11 ++++------- src/scheduler.h | 8 ++++++-- src/script/sigcache.cpp | 2 ++ src/test/checkqueue_tests.cpp | 17 ++++++++++------- src/test/cuckoocache_tests.cpp | 2 ++ src/test/scheduler_tests.cpp | 22 ++++++++++++++-------- src/test/util/setup_common.cpp | 4 +--- src/test/util/setup_common.h | 4 +--- src/util/system.h | 7 ------- test/lint/lint-includes.sh | 3 +-- 12 files changed, 44 insertions(+), 39 deletions(-) diff --git a/src/addrman.cpp b/src/addrman.cpp index ed7fccc0ff6..f91121f156f 100644 --- a/src/addrman.cpp +++ b/src/addrman.cpp @@ -9,6 +9,8 @@ #include #include +#include + int CAddrInfo::GetTriedBucket(const uint256& nKey, const std::vector &asmap) const { uint64_t hash1 = (CHashWriter(SER_GETHASH, 0) << nKey << GetKey()).GetCheapHash(); diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 6a0d44a1833..fa41208a31d 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include #include diff --git a/src/init.cpp b/src/init.cpp index 64bbf8988da..e01d16e1336 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -68,6 +68,8 @@ #include #include #include +#include +#include #ifndef WIN32 #include @@ -78,7 +80,6 @@ #include #include -#include #if ENABLE_ZMQ #include @@ -155,8 +156,6 @@ static std::unique_ptr globalVerifyHandle; static std::thread g_load_block; -static boost::thread_group threadGroup; - void Interrupt(NodeContext& node) { InterruptHTTPServer(); @@ -218,11 +217,9 @@ void Shutdown(NodeContext& node) StopTorControl(); // After everything has been shut down, but before things get flushed, stop the - // CScheduler/checkqueue, threadGroup and load block thread. + // CScheduler/checkqueue, scheduler and load block thread. if (node.scheduler) node.scheduler->stop(); if (g_load_block.joinable()) g_load_block.join(); - threadGroup.interrupt_all(); - threadGroup.join_all(); StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, @@ -1342,7 +1339,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA node.scheduler = MakeUnique(); // Start the lightweight task scheduler thread - threadGroup.create_thread([&] { TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); }); }); + node.scheduler->m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); }); }); // Gather some entropy once per minute. node.scheduler->scheduleEvery([]{ diff --git a/src/scheduler.h b/src/scheduler.h index d7fe00d1b47..9eec8c0fa05 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -35,6 +36,8 @@ class CScheduler CScheduler(); ~CScheduler(); + std::thread m_service_thread; + typedef std::function Function; /** Call func at/after time t */ @@ -62,8 +65,7 @@ class CScheduler void MockForward(std::chrono::seconds delta_seconds); /** - * Services the queue 'forever'. Should be run in a thread, - * and interrupted using boost::interrupt_thread + * Services the queue 'forever'. Should be run in a thread. */ void serviceQueue(); @@ -72,12 +74,14 @@ class CScheduler { WITH_LOCK(newTaskMutex, stopRequested = true); newTaskScheduled.notify_all(); + if (m_service_thread.joinable()) m_service_thread.join(); } /** Tell any threads running serviceQueue to stop when there is no work left to be done */ void StopWhenDrained() { WITH_LOCK(newTaskMutex, stopWhenEmpty = true); newTaskScheduled.notify_all(); + if (m_service_thread.joinable()) m_service_thread.join(); } /** diff --git a/src/script/sigcache.cpp b/src/script/sigcache.cpp index 2bfe206597b..cf47d37e709 100644 --- a/src/script/sigcache.cpp +++ b/src/script/sigcache.cpp @@ -11,6 +11,8 @@ #include #include + +#include #include namespace { diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 76d67270444..21921375b3e 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -10,7 +10,6 @@ #include #include -#include #include #include @@ -363,11 +362,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); { - boost::thread_group tg; + std::vector tg; std::atomic nThreads {0}; std::atomic fails {0}; for (size_t i = 0; i < 3; ++i) { - tg.create_thread( + tg.emplace_back( [&]{ CCheckQueueControl control(queue.get()); // While sleeping, no other thread should execute to this point @@ -376,11 +375,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) fails += observed != nThreads; }); } - tg.join_all(); + for (auto& thread: tg) { + if (thread.joinable()) thread.join(); + } BOOST_REQUIRE_EQUAL(fails, 0); } { - boost::thread_group tg; + std::vector tg; std::mutex m; std::condition_variable cv; bool has_lock{false}; @@ -389,7 +390,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) bool done_ack{false}; { std::unique_lock l(m); - tg.create_thread([&]{ + tg.emplace_back([&]{ CCheckQueueControl control(queue.get()); std::unique_lock ll(m); has_lock = true; @@ -415,7 +416,9 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.notify_one(); BOOST_REQUIRE(!fails); } - tg.join_all(); + for (auto& thread: tg) { + if (thread.joinable()) thread.join(); + } } } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/cuckoocache_tests.cpp b/src/test/cuckoocache_tests.cpp index 3a951d28aec..75c7e47e641 100644 --- a/src/test/cuckoocache_tests.cpp +++ b/src/test/cuckoocache_tests.cpp @@ -2,6 +2,8 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include +#include +#include #include #include #include diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 21162356b81..d57c000b920 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -7,10 +7,11 @@ #include #include -#include #include #include +#include +#include BOOST_AUTO_TEST_SUITE(scheduler_tests) @@ -69,16 +70,16 @@ BOOST_AUTO_TEST_CASE(manythreads) BOOST_CHECK(last > now); // As soon as these are created they will start running and servicing the queue - boost::thread_group microThreads; + std::vector microThreads; for (int i = 0; i < 5; i++) - microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); + microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks)); UninterruptibleSleep(std::chrono::microseconds{600}); now = std::chrono::system_clock::now(); // More threads and more tasks: for (int i = 0; i < 5; i++) - microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); + microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks)); for (int i = 0; i < 100; i++) { std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); @@ -91,7 +92,10 @@ BOOST_AUTO_TEST_CASE(manythreads) // Drain the task queue then exit threads microTasks.StopWhenDrained(); - microThreads.join_all(); // ... wait until all the threads are done + // wait until all the threads are done + for (auto& thread: microThreads) { + if (thread.joinable()) thread.join(); + } int counterSum = 0; for (int i = 0; i < 10; i++) { @@ -131,9 +135,9 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) // if the queues only permit execution of one task at once then // the extra threads should effectively be doing nothing // if they don't we'll get out of order behaviour - boost::thread_group threads; + std::vector threads; for (int i = 0; i < 5; ++i) { - threads.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler)); + threads.emplace_back(std::bind(&CScheduler::serviceQueue, &scheduler)); } // these are not atomic, if SinglethreadedSchedulerClient prevents @@ -157,7 +161,9 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) // finish up scheduler.StopWhenDrained(); - threads.join_all(); + for (auto& thread: threads) { + if (thread.joinable()) thread.join(); + } BOOST_CHECK_EQUAL(counter1, 100); BOOST_CHECK_EQUAL(counter2, 100); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 8a9694f55b7..d99d63aac6d 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -131,7 +131,7 @@ ChainTestingSetup::ChainTestingSetup(const std::string& chainName, const std::ve // We have to run a scheduler thread to prevent ActivateBestChain // from blocking due to queue overrun. m_node.scheduler = MakeUnique(); - threadGroup.create_thread([&] { TraceThread("scheduler", [&] { m_node.scheduler->serviceQueue(); }); }); + m_node.scheduler->m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { m_node.scheduler->serviceQueue(); }); }); GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler); pblocktree.reset(new CBlockTreeDB(1 << 20, true)); @@ -150,8 +150,6 @@ ChainTestingSetup::ChainTestingSetup(const std::string& chainName, const std::ve ChainTestingSetup::~ChainTestingSetup() { if (m_node.scheduler) m_node.scheduler->stop(); - threadGroup.interrupt_all(); - threadGroup.join_all(); StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); diff --git a/src/test/util/setup_common.h b/src/test/util/setup_common.h index dff8cf825e5..331c1235cb5 100644 --- a/src/test/util/setup_common.h +++ b/src/test/util/setup_common.h @@ -17,8 +17,7 @@ #include #include - -#include +#include /** This is connected to the logger. Can be used to redirect logs to any other log */ extern const std::function G_TEST_LOG_FUN; @@ -88,7 +87,6 @@ struct BasicTestingSetup { * initialization behaviour. */ struct ChainTestingSetup : public BasicTestingSetup { - boost::thread_group threadGroup; explicit ChainTestingSetup(const std::string& chainName = CBaseChainParams::MAIN, const std::vector& extra_args = {}); ~ChainTestingSetup(); diff --git a/src/util/system.h b/src/util/system.h index 010fc5b49fc..d06c30bfa74 100644 --- a/src/util/system.h +++ b/src/util/system.h @@ -35,8 +35,6 @@ #include #include -#include // for boost::thread_interrupted - class UniValue; // Application startup time (used for uptime calculation) @@ -450,11 +448,6 @@ template void TraceThread(const char* name, Callable func) func(); LogPrintf("%s thread exit\n", name); } - catch (const boost::thread_interrupted&) - { - LogPrintf("%s thread interrupt\n", name); - throw; - } catch (const std::exception& e) { PrintExceptionContinue(&e, name); throw; diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index 262e9d4c790..6623f9ce4c2 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -67,9 +67,8 @@ EXPECTED_BOOST_INCLUDES=( boost/signals2/optional_last_value.hpp boost/signals2/signal.hpp boost/test/unit_test.hpp - boost/thread/condition_variable.hpp + boost/thread/lock_types.hpp boost/thread/shared_mutex.hpp - boost/thread/thread.hpp ) for BOOST_INCLUDE in $(git grep '^#include ' | sort -u); do