diff --git a/mcrouter/CarbonRouterInstance-inl.h b/mcrouter/CarbonRouterInstance-inl.h index 72b29a7cc..bffa1a706 100644 --- a/mcrouter/CarbonRouterInstance-inl.h +++ b/mcrouter/CarbonRouterInstance-inl.h @@ -275,10 +275,7 @@ CarbonRouterInstance::spinUp() { } } - auto executorObserver = std::make_shared(); - proxyThreads_->addObserver(executorObserver); - std::vector threadPoolEvbs = - executorObserver->extractEvbs(); + auto threadPoolEvbs = extractEvbs(*proxyThreads_); if (threadPoolEvbs.size() != opts_.num_proxies) { return folly::makeUnexpected(folly::sformat( "IOThreadPoolExecutor size does not match num_proxies sz={} proxies={} {}", @@ -286,7 +283,6 @@ CarbonRouterInstance::spinUp() { opts_.num_proxies, folly::exceptionStr(std::current_exception()))); } - proxyThreads_->removeObserver(executorObserver); if (opts_.enable_service_router && mcrouter::gSRInitHook) { try { diff --git a/mcrouter/ExecutorObserver.h b/mcrouter/ExecutorObserver.h index b3579fde7..d47f26aa7 100644 --- a/mcrouter/ExecutorObserver.h +++ b/mcrouter/ExecutorObserver.h @@ -7,31 +7,30 @@ #pragma once -#include +#include + +#include +#include namespace facebook { namespace memcache { namespace mcrouter { -class ExecutorObserver : public folly::ThreadPoolExecutor::Observer { - public: - void threadStarted( - folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override { - CHECK(!initializationComplete_); - evbs_.wlock()->push_back( - folly::IOThreadPoolExecutorBase::getEventBase(threadHandle)); - } - void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override {} - - std::vector extractEvbs() { - CHECK(!std::exchange(initializationComplete_, true)); - return evbs_.exchange({}); +/** + * Convenience method to convert the keepalives returned by getAllEventBases() + * into raw pointers, to accommodate for AsyncMcServer::Options interface. + * TODO: Change Options to accept KeepAlives instead? + */ +inline std::vector extractEvbs( + folly::IOThreadPoolExecutorBase& ex) { + auto evbKeepAlives = ex.getAllEventBases(); + std::vector ret; + ret.reserve(evbKeepAlives.size()); + for (const auto& ka : evbKeepAlives) { + ret.push_back(ka.get()); } - - private: - bool initializationComplete_{false}; - folly::Synchronized> evbs_; -}; + return ret; +} } // namespace mcrouter } // namespace memcache diff --git a/mcrouter/Server-inl.h b/mcrouter/Server-inl.h index 959e2a32d..db877d630 100644 --- a/mcrouter/Server-inl.h +++ b/mcrouter/Server-inl.h @@ -305,12 +305,9 @@ bool runServerDual( mcrouterOpts.num_proxies, std::make_shared(threadPrefix)); - // Run observer and extract event bases - auto executorObserver = std::make_shared(); - ioThreadPool->addObserver(executorObserver); - auto evbs = executorObserver->extractEvbs(); + // extract event bases + auto evbs = extractEvbs(*ioThreadPool); CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies); - ioThreadPool->removeObserver(executorObserver); // Create AsyncMcServer instance asyncMcServer = @@ -508,12 +505,9 @@ bool runServer( ioThreadPool = std::make_shared( mcrouterOpts.num_proxies, mcrouterOpts.num_proxies); - // Run observer and extract event bases - auto executorObserver = std::make_shared(); - ioThreadPool->addObserver(executorObserver); - auto evbs = executorObserver->extractEvbs(); + // extract event bases + auto evbs = extractEvbs(*ioThreadPool); CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies); - ioThreadPool->removeObserver(executorObserver); // Get EVB of main thread auto localEvb = ioThreadPool->getEventBaseManager()->getEventBase(); diff --git a/mcrouter/lib/network/test/MockMcServerDual.cpp b/mcrouter/lib/network/test/MockMcServerDual.cpp index 943b86834..98915a521 100644 --- a/mcrouter/lib/network/test/MockMcServerDual.cpp +++ b/mcrouter/lib/network/test/MockMcServerDual.cpp @@ -20,6 +20,7 @@ #include #include +#include "mcrouter/ExecutorObserver.h" #include "mcrouter/lib/network/AsyncMcServer.h" #include "mcrouter/lib/network/AsyncMcServerWorker.h" #include "mcrouter/lib/network/CarbonMessageDispatcher.h" @@ -86,33 +87,6 @@ void shutdown() { } } -class ExecutorObserver : public folly::ThreadPoolExecutor::Observer { - public: - void threadStarted( - folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override { - CHECK(!initializationComplete_); - evbs_.wlock()->push_back( - folly::IOThreadPoolExecutor::getEventBase(threadHandle)); - } - void threadPreviouslyStarted( - folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override { - CHECK(!initializationComplete_); - evbs_.wlock()->push_back( - folly::IOThreadPoolExecutor::getEventBase(threadHandle)); - } - - void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override {} - - std::vector extractEvbs() { - CHECK(!std::exchange(initializationComplete_, true)); - return evbs_.exchange({}); - } - - private: - bool initializationComplete_{false}; - folly::Synchronized> evbs_; -}; - class ShutdownSignalHandler : public folly::AsyncSignalHandler { public: explicit ShutdownSignalHandler(folly::EventBase* evb) @@ -166,12 +140,7 @@ int main(int argc, char** argv) { // Create IOThreadPoolExecutor and extract event bases std::shared_ptr ioThreadPool = std::make_shared(numThreads); - auto executorObserver = std::make_shared(); - ioThreadPool->addObserver(executorObserver); - std::vector ioThreads; - for (auto& ioEvb : executorObserver->extractEvbs()) { - ioThreads.push_back(ioEvb); - } + auto ioThreads = mcrouter::extractEvbs(*ioThreadPool); // Thrift server setup LOG(INFO) << "Configure thrift server."; diff --git a/mcrouter/test/cpp_unit_tests/McrouterClientUsage.cpp b/mcrouter/test/cpp_unit_tests/McrouterClientUsage.cpp index b5928d4f5..13f2ca939 100644 --- a/mcrouter/test/cpp_unit_tests/McrouterClientUsage.cpp +++ b/mcrouter/test/cpp_unit_tests/McrouterClientUsage.cpp @@ -39,7 +39,6 @@ using facebook::memcache::MemcacheRouterInfo; using facebook::memcache::mcrouter::CarbonRouterClient; using facebook::memcache::mcrouter::CarbonRouterInstance; using facebook::memcache::mcrouter::defaultTestOptions; -using facebook::memcache::mcrouter::ExecutorObserver; /** * This test provides an example of how to use the CarbonRouterClient API. @@ -91,11 +90,8 @@ TEST(CarbonRouterClient, basicUsageSameThreadClient) { // Explicitly control which proxy should handle requests from this client. // Currently, this is necessary when using createSameThreadClient() with more // than one thread. - // Run observer and extract event bases - auto executorObserver = std::make_shared(); - ioThreadPool->addObserver(executorObserver); - auto evbs = executorObserver->extractEvbs(); - ioThreadPool->removeObserver(executorObserver); + // extract event bases + auto evbs = facebook::memcache::mcrouter::extractEvbs(*ioThreadPool); auto& eventBase = *evbs.front(); client->setProxyIndex(0);