Skip to content

Commit

Permalink
Replace ExecutorObserver with native method
Browse files Browse the repository at this point in the history
Summary:
I plan to deprecate `IOThreadPoolExecutorBase::getEventBase()` because it is too coupled with the implementation of `IOThreadPoolExecutor` and does not allow alternative implementations.

Replacing `ExecutorObserver` is easy because `IOThreadPoolExecutorBase` provides `getAllEventBases()`, which is exactly what we need and actually works even in the dynamic case.

Reviewed By: stuclar

Differential Revision: D52093711

fbshipit-source-id: 0cf869b19c9338ed1ef013046847d142fbb50dbd
  • Loading branch information
ot authored and facebook-github-bot committed Dec 16, 2023
1 parent c8ba1ee commit fbe8b8d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 73 deletions.
6 changes: 1 addition & 5 deletions mcrouter/CarbonRouterInstance-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,14 @@ CarbonRouterInstance<RouterInfo>::spinUp() {
}
}

auto executorObserver = std::make_shared<ExecutorObserver>();
proxyThreads_->addObserver(executorObserver);
std::vector<folly::EventBase*> threadPoolEvbs =
executorObserver->extractEvbs();
auto threadPoolEvbs = extractEvbs(*proxyThreads_);
if (threadPoolEvbs.size() != opts_.num_proxies) {
return folly::makeUnexpected(folly::sformat(
"IOThreadPoolExecutor size does not match num_proxies sz={} proxies={} {}",
threadPoolEvbs.size(),
opts_.num_proxies,
folly::exceptionStr(std::current_exception())));
}
proxyThreads_->removeObserver(executorObserver);

if (opts_.enable_service_router && mcrouter::gSRInitHook) {
try {
Expand Down
37 changes: 18 additions & 19 deletions mcrouter/ExecutorObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,30 @@

#pragma once

#include <folly/executors/ThreadPoolExecutor.h>
#include <vector>

#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/io/async/EventBase.h>

namespace facebook {
namespace memcache {
namespace mcrouter {

class ExecutorObserver : public folly::ThreadPoolExecutor::Observer {
public:
void threadStarted(
folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override {
CHECK(!initializationComplete_);
evbs_.wlock()->push_back(
folly::IOThreadPoolExecutorBase::getEventBase(threadHandle));
}
void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override {}

std::vector<folly::EventBase*> extractEvbs() {
CHECK(!std::exchange(initializationComplete_, true));
return evbs_.exchange({});
/**
* Convenience method to convert the keepalives returned by getAllEventBases()
* into raw pointers, to accommodate for AsyncMcServer::Options interface.
* TODO: Change Options to accept KeepAlives instead?
*/
inline std::vector<folly::EventBase*> extractEvbs(
folly::IOThreadPoolExecutorBase& ex) {
auto evbKeepAlives = ex.getAllEventBases();
std::vector<folly::EventBase*> ret;
ret.reserve(evbKeepAlives.size());
for (const auto& ka : evbKeepAlives) {
ret.push_back(ka.get());
}

private:
bool initializationComplete_{false};
folly::Synchronized<std::vector<folly::EventBase*>> evbs_;
};
return ret;
}

} // namespace mcrouter
} // namespace memcache
Expand Down
14 changes: 4 additions & 10 deletions mcrouter/Server-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,9 @@ bool runServerDual(
mcrouterOpts.num_proxies,
std::make_shared<folly::NamedThreadFactory>(threadPrefix));

// Run observer and extract event bases
auto executorObserver = std::make_shared<ExecutorObserver>();
ioThreadPool->addObserver(executorObserver);
auto evbs = executorObserver->extractEvbs();
// extract event bases
auto evbs = extractEvbs(*ioThreadPool);
CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies);
ioThreadPool->removeObserver(executorObserver);

// Create AsyncMcServer instance
asyncMcServer =
Expand Down Expand Up @@ -508,12 +505,9 @@ bool runServer(
ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(
mcrouterOpts.num_proxies, mcrouterOpts.num_proxies);

// Run observer and extract event bases
auto executorObserver = std::make_shared<ExecutorObserver>();
ioThreadPool->addObserver(executorObserver);
auto evbs = executorObserver->extractEvbs();
// extract event bases
auto evbs = extractEvbs(*ioThreadPool);
CHECK_EQ(evbs.size(), mcrouterOpts.num_proxies);
ioThreadPool->removeObserver(executorObserver);

// Get EVB of main thread
auto localEvb = ioThreadPool->getEventBaseManager()->getEventBase();
Expand Down
35 changes: 2 additions & 33 deletions mcrouter/lib/network/test/MockMcServerDual.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/logging/Init.h>
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include "mcrouter/ExecutorObserver.h"
#include "mcrouter/lib/network/AsyncMcServer.h"
#include "mcrouter/lib/network/AsyncMcServerWorker.h"
#include "mcrouter/lib/network/CarbonMessageDispatcher.h"
Expand Down Expand Up @@ -86,33 +87,6 @@ void shutdown() {
}
}

class ExecutorObserver : public folly::ThreadPoolExecutor::Observer {
public:
void threadStarted(
folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override {
CHECK(!initializationComplete_);
evbs_.wlock()->push_back(
folly::IOThreadPoolExecutor::getEventBase(threadHandle));
}
void threadPreviouslyStarted(
folly::ThreadPoolExecutor::ThreadHandle* threadHandle) override {
CHECK(!initializationComplete_);
evbs_.wlock()->push_back(
folly::IOThreadPoolExecutor::getEventBase(threadHandle));
}

void threadStopped(folly::ThreadPoolExecutor::ThreadHandle*) override {}

std::vector<folly::EventBase*> extractEvbs() {
CHECK(!std::exchange(initializationComplete_, true));
return evbs_.exchange({});
}

private:
bool initializationComplete_{false};
folly::Synchronized<std::vector<folly::EventBase*>> evbs_;
};

class ShutdownSignalHandler : public folly::AsyncSignalHandler {
public:
explicit ShutdownSignalHandler(folly::EventBase* evb)
Expand Down Expand Up @@ -166,12 +140,7 @@ int main(int argc, char** argv) {
// Create IOThreadPoolExecutor and extract event bases
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool =
std::make_shared<folly::IOThreadPoolExecutor>(numThreads);
auto executorObserver = std::make_shared<ExecutorObserver>();
ioThreadPool->addObserver(executorObserver);
std::vector<folly::EventBase*> ioThreads;
for (auto& ioEvb : executorObserver->extractEvbs()) {
ioThreads.push_back(ioEvb);
}
auto ioThreads = mcrouter::extractEvbs(*ioThreadPool);

// Thrift server setup
LOG(INFO) << "Configure thrift server.";
Expand Down
8 changes: 2 additions & 6 deletions mcrouter/test/cpp_unit_tests/McrouterClientUsage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ using facebook::memcache::MemcacheRouterInfo;
using facebook::memcache::mcrouter::CarbonRouterClient;
using facebook::memcache::mcrouter::CarbonRouterInstance;
using facebook::memcache::mcrouter::defaultTestOptions;
using facebook::memcache::mcrouter::ExecutorObserver;

/**
* This test provides an example of how to use the CarbonRouterClient API.
Expand Down Expand Up @@ -91,11 +90,8 @@ TEST(CarbonRouterClient, basicUsageSameThreadClient) {
// Explicitly control which proxy should handle requests from this client.
// Currently, this is necessary when using createSameThreadClient() with more
// than one thread.
// Run observer and extract event bases
auto executorObserver = std::make_shared<ExecutorObserver>();
ioThreadPool->addObserver(executorObserver);
auto evbs = executorObserver->extractEvbs();
ioThreadPool->removeObserver(executorObserver);
// extract event bases
auto evbs = facebook::memcache::mcrouter::extractEvbs(*ioThreadPool);

auto& eventBase = *evbs.front();
client->setProxyIndex(0);
Expand Down

0 comments on commit fbe8b8d

Please sign in to comment.