Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orderly cm/cds/rds initialization #482

Merged
merged 1 commit into from
Feb 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions include/envoy/init/init.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "envoy/common/pure.h"

namespace Init {

/**
* A single initialization target.
*/
class Target {
public:
virtual ~Target() {}

/**
* Called when the target should begin its own initialization.
* @param callback supplies the callback to invoke when the target has completed its
* initialization.
*/
virtual void initialize(std::function<void()> callback) PURE;
};

/**
* A manager that initializes multiple targets.
*/
class Manager {
public:
virtual ~Manager() {}

/**
* Register a target to be initialized in the future. The manager will call initialize() on
* each target at some point in the future.
*/
virtual void registerTarget(Target& target) PURE;
};

} // Init
11 changes: 11 additions & 0 deletions include/envoy/server/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/access_log/access_log.h"
#include "envoy/api/api.h"
#include "envoy/init/init.h"
#include "envoy/local_info/local_info.h"
#include "envoy/ratelimit/ratelimit.h"
#include "envoy/runtime/runtime.h"
Expand Down Expand Up @@ -103,6 +104,16 @@ class Instance {
*/
virtual HotRestart& hotRestart() PURE;

/**
* @return the server's init manager. This can be used for extensions that need to initialize
* after cluster manager init but before the server starts listening. All extensions
* should register themselves during configuration load. initialize() will be called on
* each registered target after cluster manager init but before the server starts
* listening. Once all targets have initialized and invoked their callbacks, the server
* will start listening.
*/
virtual Init::Manager& initManager() PURE;

/**
* @return the server's CLI options.
*/
Expand Down
30 changes: 21 additions & 9 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@

namespace Router {

RouteConfigProviderPtr
RouteConfigProviderUtil::create(const Json::Object& config, Runtime::Loader& runtime,
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
const std::string& stat_prefix, ThreadLocal::Instance& tls) {
RouteConfigProviderPtr RouteConfigProviderUtil::create(
const Json::Object& config, Runtime::Loader& runtime, Upstream::ClusterManager& cm,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope, const std::string& stat_prefix,
ThreadLocal::Instance& tls, Init::Manager& init_manager) {
bool has_rds = config.hasObject("rds");
bool has_route_config = config.hasObject("route_config");
if (!(has_rds ^ has_route_config)) {
Expand All @@ -23,13 +22,11 @@ RouteConfigProviderUtil::create(const Json::Object& config, Runtime::Loader& run
return RouteConfigProviderPtr{
new StaticRouteConfigProviderImpl(*config.getObject("route_config"), runtime, cm)};
} else {
// TODO: Ordered initialization of RDS: 1) CDS/clusters, 2) RDS, 3) start listening. This
// will be done in a follow up where we will add a formal init handler in the server.
Json::ObjectPtr rds_config = config.getObject("rds");
rds_config->validateSchema(Json::Schema::RDS_CONFIGURATION_SCHEMA);
std::unique_ptr<RdsRouteConfigProviderImpl> provider{new RdsRouteConfigProviderImpl(
*rds_config, runtime, cm, dispatcher, random, local_info, scope, stat_prefix, tls)};
provider->initialize();
provider->registerInitTarget(init_manager);
return std::move(provider);
}
}
Expand All @@ -55,6 +52,10 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
throw EnvoyException("rds: setting --service-cluster and --service-node are required");
}

if (!cm.get(remote_cluster_name_)) {
throw EnvoyException(fmt::format("rds: unknown remote cluster '{}'", remote_cluster_name_));
}

ConfigPtr initial_config(new NullConfigImpl());
tls_.set(tls_slot_, [initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectPtr {
return ThreadLocal::ThreadLocalObjectPtr{new ThreadLocalConfig(initial_config)};
Expand Down Expand Up @@ -94,6 +95,13 @@ void RdsRouteConfigProviderImpl::parseResponse(const Http::Message& response) {
stats_.update_success_.inc();
}

void RdsRouteConfigProviderImpl::onFetchComplete() {
if (initialize_callback_) {
initialize_callback_();
initialize_callback_ = nullptr;
}
}

void RdsRouteConfigProviderImpl::onFetchFailure(EnvoyException* e) {
stats_.update_failure_.inc();
if (e) {
Expand All @@ -103,4 +111,8 @@ void RdsRouteConfigProviderImpl::onFetchFailure(EnvoyException* e) {
}
}

void RdsRouteConfigProviderImpl::registerInitTarget(Init::Manager& init_manager) {
init_manager.registerTarget(*this);
}

} // Router
29 changes: 20 additions & 9 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "envoy/init/init.h"
#include "envoy/json/json_object.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/rds.h"
Expand All @@ -23,7 +24,8 @@ class RouteConfigProviderUtil {
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
const std::string& stat_prefix, ThreadLocal::Instance& tls);
const std::string& stat_prefix, ThreadLocal::Instance& tls,
Init::Manager& init_manager);
};

/**
Expand Down Expand Up @@ -64,24 +66,23 @@ struct RdsStats {
* the RDS API.
*/
class RdsRouteConfigProviderImpl : public RouteConfigProvider,
public Init::Target,
Http::RestApiFetcher,
Logger::Loggable<Logger::Id::router> {
public:
RdsRouteConfigProviderImpl(const Json::Object& config, Runtime::Loader& runtime,
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
const std::string& stat_prefix, ThreadLocal::Instance& tls);

void initialize() { RestApiFetcher::initialize(); }
// Init::Target
void initialize(std::function<void()> callback) override {
initialize_callback_ = callback;
RestApiFetcher::initialize();
}

// Router::RouteConfigProvider
Router::ConfigPtr config() override;

// Http::RestApiFetcher
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override {}
void onFetchComplete() override;
void onFetchFailure(EnvoyException* e) override;

private:
Expand All @@ -94,6 +95,13 @@ class RdsRouteConfigProviderImpl : public RouteConfigProvider,
ConfigPtr config_;
};

RdsRouteConfigProviderImpl(const Json::Object& config, Runtime::Loader& runtime,
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random,
const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
const std::string& stat_prefix, ThreadLocal::Instance& tls);
void registerInitTarget(Init::Manager& init_manager);

Runtime::Loader& runtime_;
const LocalInfo::LocalInfo& local_info_;
ThreadLocal::Instance& tls_;
Expand All @@ -102,6 +110,9 @@ class RdsRouteConfigProviderImpl : public RouteConfigProvider,
bool initialized_{};
uint64_t last_config_hash_{};
RdsStats stats_;
std::function<void()> initialize_callback_;

friend class RouteConfigProviderUtil;
};

} // Router
3 changes: 2 additions & 1 deletion source/server/config/network/http_connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ HttpConnectionManagerConfig::HttpConnectionManagerConfig(const Json::Object& con

route_config_provider_ = Router::RouteConfigProviderUtil::create(
config, server.runtime(), server.clusterManager(), server.dispatcher(), server.random(),
server.localInfo(), server.stats(), stats_prefix_, server.threadLocal());
server.localInfo(), server.stats(), stats_prefix_, server.threadLocal(),
server.initManager());

if (config.hasObject("use_remote_address")) {
use_remote_address_ = config.getBoolean("use_remote_address");
Expand Down
69 changes: 50 additions & 19 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,32 @@

namespace Server {

void InitManagerImpl::initialize(std::function<void()> callback) {
ASSERT(state_ == State::NotInitialized);
if (targets_.empty()) {
callback();
state_ = State::Initialized;
} else {
callback_ = callback;
state_ = State::Initializing;
for (auto target : targets_) {
target->initialize([this, target]() -> void {
ASSERT(std::find(targets_.begin(), targets_.end(), target) != targets_.end());
targets_.remove(target);
if (targets_.empty()) {
state_ = State::Initialized;
callback_();
}
});
}
}
}

void InitManagerImpl::registerTarget(Init::Target& target) {
ASSERT(state_ == State::NotInitialized);
targets_.push_back(&target);
}

InstanceImpl::InstanceImpl(Options& options, TestHooks& hooks, HotRestart& restarter,
Stats::StoreRoot& store, Thread::BasicLockable& access_log_lock,
ComponentFactory& component_factory,
Expand Down Expand Up @@ -50,6 +76,7 @@ InstanceImpl::InstanceImpl(Options& options, TestHooks& hooks, HotRestart& resta
initialize(options, hooks, component_factory);
} catch (const EnvoyException& e) {
log().critical("error initializing configuration '{}': {}", options.configPath(), e.what());
thread_local_.shutdownThread();
exit(1);
}
}
Expand Down Expand Up @@ -212,28 +239,32 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,

// Register for cluster manager init notification. We don't start serving worker traffic until
// upstream clusters are initialized which may involve running the event loop. Note however that
// if there are only static clusters this will fire immediately.
// this can fire immediately if all clusters have already initialized.
clusterManager().setInitializedCb([this, &hooks]() -> void {
log().warn("all clusters initialized. starting workers");
for (const WorkerPtr& worker : workers_) {
try {
worker->initializeConfiguration(*config_, socket_map_);
} catch (const Network::CreateListenerException& e) {
// It is possible that we fail to start listening on a port, even though we were able to
// bind to it above. This happens when there is a race between two applications to listen
// on the same port. In general if we can't initialize the worker configuration just print
// the error and exit cleanly without crashing.
log().critical("shutting down due to error initializing worker configuration: {}",
e.what());
shutdown();
}
log().warn("all clusters initialized. initializing init manager");
init_manager_.initialize([this, &hooks]() -> void { startWorkers(hooks); });
});
}

void InstanceImpl::startWorkers(TestHooks& hooks) {
log().warn("all dependencies initialized. starting workers");
for (const WorkerPtr& worker : workers_) {
try {
worker->initializeConfiguration(*config_, socket_map_);
} catch (const Network::CreateListenerException& e) {
// It is possible that we fail to start listening on a port, even though we were able to
// bind to it above. This happens when there is a race between two applications to listen
// on the same port. In general if we can't initialize the worker configuration just print
// the error and exit cleanly without crashing.
log().critical("shutting down due to error initializing worker configuration: {}", e.what());
shutdown();
}
}

// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
hooks.onServerInitialized();
});
// At this point we are ready to take traffic and all listening ports are up. Notify our parent
// if applicable that they can stop listening and drain.
restarter_.drainParentListeners();
hooks.onServerInitialized();
}

Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Expand Down
21 changes: 21 additions & 0 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ class InstanceUtil : Logger::Loggable<Logger::Id::main> {
static Runtime::LoaderPtr createRuntime(Instance& server, Server::Configuration::Initial& config);
};

/**
* Implementation of Init::Manager for use during post cluster manager init / pre listening.
*/
class InitManagerImpl : public Init::Manager {
public:
void initialize(std::function<void()> callback);

// Init::Manager
void registerTarget(Init::Target& target) override;

private:
enum class State { NotInitialized, Initializing, Initialized };

std::list<Init::Target*> targets_;
State state_{State::NotInitialized};
std::function<void()> callback_;
};

/**
* This is the actual full standalone server which stiches together various common components.
*/
Expand Down Expand Up @@ -95,6 +113,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
int getListenSocketFd(uint32_t port) override;
void getParentStats(HotRestart::GetParentStatsInfo& info) override;
HotRestart& hotRestart() override { return restarter_; }
Init::Manager& initManager() override { return init_manager_; }
Runtime::RandomGenerator& random() override { return random_generator_; }
RateLimit::ClientPtr
rateLimitClient(const Optional<std::chrono::milliseconds>& timeout) override {
Expand All @@ -118,6 +137,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
void initializeStatSinks();
void loadServerFlags(const Optional<std::string>& flags_path);
uint64_t numConnections();
void startWorkers(TestHooks& hooks);

Options& options_;
HotRestart& restarter_;
Expand All @@ -143,6 +163,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const LocalInfo::LocalInfo& local_info_;
DrainManagerPtr drain_manager_;
AccessLog::AccessLogManagerImpl access_log_manager_;
InitManagerImpl init_manager_;
};

} // Server
2 changes: 2 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ add_executable(envoy-test
mocks/filesystem/mocks.cc
mocks/grpc/mocks.cc
mocks/http/mocks.cc
mocks/init/mocks.cc
mocks/local_info/mocks.cc
mocks/network/mocks.cc
mocks/ratelimit/mocks.cc
Expand All @@ -150,6 +151,7 @@ add_executable(envoy-test
server/http/admin_test.cc
server/http/health_check_test.cc
server/options_impl_test.cc
server/server_test.cc
test_common/printers.cc
test_common/utility.cc)

Expand Down
Loading