Skip to content

Commit

Permalink
QUIC hot restart part 6 - child instance pauses listening until parent is drained (envoyproxy#31130)
Browse files Browse the repository at this point in the history

QUIC hot restart part 6 - child instance pauses listening until parent is drained (envoyproxy#31130)

---------

Signed-off-by: Raven Black <[email protected]>
  • Loading branch information
ravenblackx authored Feb 29, 2024
1 parent 88cc302 commit f7352b3
Show file tree
Hide file tree
Showing 25 changed files with 376 additions and 29 deletions.
6 changes: 6 additions & 0 deletions envoy/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "parent_drained_callback_registrar_interface",
hdrs = ["parent_drained_callback_registrar.h"],
deps = [":address_interface"],
)

envoy_cc_library(
name = "udp_packet_writer_handler_interface",
hdrs = ["udp_packet_writer_handler.h"],
Expand Down
29 changes: 29 additions & 0 deletions envoy/network/parent_drained_callback_registrar.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "envoy/network/address.h"

#include "absl/functional/any_invocable.h"

namespace Envoy {
namespace Network {

/**
 * Registration interface used by UDP listen sockets (QUIC sockets in particular) that need to
 * hold off reading during a hot restart until the parent instance has been drained.
 */
class ParentDrainedCallbackRegistrar {
public:
  /**
   * Registers a callback to be run once the parent's matching listener is drained.
   * @param address the address of the listener.
   * @param callback invoked when the listener matching address is drained on the parent
   *        instance.
   */
  virtual void registerParentDrainedCallback(const Address::InstanceConstSharedPtr& address,
                                             absl::AnyInvocable<void()> callback) PURE;

protected:
  // Protected: the interface is not meant to be destroyed through a pointer to this base.
  virtual ~ParentDrainedCallbackRegistrar() = default;
};

} // namespace Network
} // namespace Envoy
7 changes: 7 additions & 0 deletions envoy/network/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,13 @@ class Socket {
* @return the socket options stored earlier with addOption() and addOptions() calls, if any.
*/
virtual const OptionsSharedPtr& options() const PURE;

/**
* @return a ParentDrainedCallbackRegistrar for UDP listen sockets during hot restart.
* This base implementation returns absl::nullopt; socket types that carry a registrar
* override it. The elaborated `class ParentDrainedCallbackRegistrar` specifier names the
* type without requiring its header here.
*/
virtual OptRef<class ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() const {
return absl::nullopt;
}
};

using SocketPtr = std::unique_ptr<Socket>;
Expand Down
11 changes: 11 additions & 0 deletions envoy/server/hot_restart.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ class HotRestart {
virtual void
registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr address,
std::shared_ptr<Network::UdpListenerConfig> listener_config) PURE;

/**
* @return An interface on which registerParentDrainedCallback can be called during
* creation of a listener, or nullopt if there is no parent instance.
*
* If this is set, any UDP listener should start paused and only begin listening
* when the parent instance is drained; this allows draining QUIC listeners to
* catch their own packets and forward unrecognized packets to the child instance.
*/
virtual OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() PURE;

/**
* Initialize the parent logic of our restarter. Meant to be called after initialization of a
* new child has begun. The hot restart implementation needs to be created early to deal with
Expand Down
5 changes: 4 additions & 1 deletion source/common/listener_manager/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,10 @@ Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(
if (socket_type == Network::Socket::Type::Stream) {
return std::make_shared<Network::TcpListenSocket>(std::move(io_handle), address, options);
} else {
return std::make_shared<Network::UdpListenSocket>(std::move(io_handle), address, options);
auto socket = std::make_shared<Network::UdpListenSocket>(
std::move(io_handle), address, options,
server_.hotRestart().parentDrainedCallbackRegistrar());
return socket;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ envoy_cc_library(
"//envoy/event:file_event_interface",
"//envoy/network:exception_interface",
"//envoy/network:listener_interface",
"//envoy/network:parent_drained_callback_registrar_interface",
"//envoy/runtime:runtime_interface",
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
Expand Down
19 changes: 16 additions & 3 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,19 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}
}

NetworkListenSocket(IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options)
: ListenSocketImpl(std::move(io_handle), address) {
NetworkListenSocket(
IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options,
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar = absl::nullopt)
: ListenSocketImpl(std::move(io_handle), address),
parent_drained_callback_registrar_(parent_drained_callback_registrar) {
setListenSocketOptions(options);
}

// Returns the registrar handed to the constructor, or absl::nullopt when the constructor's
// default argument was used.
OptRef<ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() const override {
return parent_drained_callback_registrar_;
}

Socket::Type socketType() const override { return T::type; }

SocketPtr duplicate() override {
Expand Down Expand Up @@ -110,6 +117,12 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}

protected:
// Usually a socket when initialized starts listening for ready-to-read or ready-to-write events;
// for a QUIC socket during hot restart this is undesirable as the parent instance needs to
// receive all packets; in that case this interface is set, and listening won't begin until the
// callback is called.
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar_;

void setPrebindSocketOptions() {
// On Windows, SO_REUSEADDR does not restrict subsequent bind calls when there is a listener as
// on Linux and later BSD socket stacks.
Expand Down
40 changes: 37 additions & 3 deletions source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/common/platform.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/network/exception.h"
#include "envoy/network/parent_drained_callback_registrar.h"

#include "source/common/api/os_sys_calls_impl.h"
#include "source/common/common/assert.h"
Expand All @@ -34,20 +35,53 @@ UdpListenerImpl::UdpListenerImpl(Event::Dispatcher& dispatcher, SocketSharedPtr
: BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), time_source_(time_source),
// Default prefer_gro to false for downstream server traffic.
config_(config, false) {
parent_drained_callback_registrar_ = socket_->parentDrainedCallbackRegistrar();
socket_->ioHandle().initializeFileEvent(
dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Write);
Event::PlatformDefaultTriggerType, paused() ? 0 : events_when_unpaused_);
if (paused()) {
parent_drained_callback_registrar_->registerParentDrainedCallback(
socket_->connectionInfoProvider().localAddress(),
[this, &dispatcher, alive = std::weak_ptr<void>(destruction_checker_)]() {
dispatcher.post([this, alive = std::move(alive)]() {
auto still_alive = alive.lock();
if (still_alive != nullptr) {
unpause();
}
});
});
}
}
// Leaves the paused state and, unless the listener was disabled while paused, starts
// listening for socket events.
void UdpListenerImpl::unpause() {
  // paused() is defined by the presence of the registrar, so clearing it is what lets
  // enable() below actually arm the file events.
  parent_drained_callback_registrar_ = absl::nullopt;
  if (events_when_unpaused_ == 0) {
    // No events were requested while paused; nothing to start.
    return;
  }
  // Start listening to events.
  enable();
  // Events may already have occurred while this instance was ignoring them, so attempt a
  // read right away.
  activateRead();
}
UdpListenerImpl::~UdpListenerImpl() { socket_->ioHandle().resetFileEvents(); }
void UdpListenerImpl::disable() { disableEvent(); }
void UdpListenerImpl::enable() {
socket_->ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write;
if (!paused()) {
socket_->ioHandle().enableFileEvents(events_when_unpaused_);
}
}
void UdpListenerImpl::disableEvent() { socket_->ioHandle().enableFileEvents(0); }
void UdpListenerImpl::disableEvent() {
events_when_unpaused_ = 0;
if (!paused()) {
socket_->ioHandle().enableFileEvents(0);
}
}
void UdpListenerImpl::onSocketEvent(short flags) {
ASSERT((flags & (Event::FileReadyType::Read | Event::FileReadyType::Write)));
Expand Down
6 changes: 6 additions & 0 deletions source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class UdpListenerImpl : public BaseListenerImpl,
TimeSource& time_source, const envoy::config::core::v3::UdpSocketConfig& config);
~UdpListenerImpl() override;
uint32_t packetsDropped() { return packets_dropped_; }
// True while a parent-drained registrar is still held, i.e. until unpause() clears it.
// While paused, enable() and disableEvent() only record the wanted events in
// events_when_unpaused_ rather than touching the socket's file events.
bool paused() const { return parent_drained_callback_registrar_ != absl::nullopt; }
void unpause();

// Network::Listener
void disable() override;
Expand Down Expand Up @@ -63,6 +65,10 @@ class UdpListenerImpl : public BaseListenerImpl,

TimeSource& time_source_;
const ResolvedUdpSocketConfig config_;
OptRef<ParentDrainedCallbackRegistrar> parent_drained_callback_registrar_;
// Taking a weak_ptr to this lets us detect if the listener has been destroyed.
std::shared_ptr<bool> destruction_checker_ = std::make_shared<bool>(true);
uint32_t events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write;
};

class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter {
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ envoy_cc_library(
hdrs = envoy_select_hot_restart(["hot_restarting_child.h"]),
deps = [
":hot_restarting_base",
"//envoy/network:parent_drained_callback_registrar_interface",
"//source/common/stats:stat_merger_lib",
],
)
Expand Down
4 changes: 4 additions & 0 deletions source/server/hot_restart_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ void HotRestartImpl::registerUdpForwardingListener(
as_child_.registerUdpForwardingListener(address, listener_config);
}

// Exposes the child-side hot restart object (as_child_) as the registrar.
// NOTE(review): this always returns a registrar, whereas the HotRestart interface comment
// says nullopt is returned when there is no parent instance. Presumably the no-parent case
// is covered by as_child_ running callbacks immediately when already drained - confirm.
OptRef<Network::ParentDrainedCallbackRegistrar> HotRestartImpl::parentDrainedCallbackRegistrar() {
return as_child_;
}

void HotRestartImpl::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) {
as_parent_.initialize(dispatcher, server);
as_child_.initialize(dispatcher);
Expand Down
1 change: 1 addition & 0 deletions source/server/hot_restart_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class HotRestartImpl : public HotRestart {
void registerUdpForwardingListener(
Network::Address::InstanceConstSharedPtr address,
std::shared_ptr<Network::UdpListenerConfig> listener_config) override;
OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() override;
void initialize(Event::Dispatcher& dispatcher, Server::Instance& server) override;
absl::optional<AdminShutdownResponse> sendParentAdminShutdownRequest() override;
void sendParentTerminateRequest() override;
Expand Down
3 changes: 3 additions & 0 deletions source/server/hot_restart_nop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class HotRestartNopImpl : public Server::HotRestart {
int duplicateParentListenSocket(const std::string&, uint32_t) override { return -1; }
void registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr,
std::shared_ptr<Network::UdpListenerConfig>) override {}
// The no-op hot restart implementation has no registrar to offer, so this always returns
// absl::nullopt.
OptRef<Network::ParentDrainedCallbackRegistrar> parentDrainedCallbackRegistrar() override {
return absl::nullopt;
}
void initialize(Event::Dispatcher&, Server::Instance&) override {}
absl::optional<AdminShutdownResponse> sendParentAdminShutdownRequest() override {
return absl::nullopt;
Expand Down
53 changes: 39 additions & 14 deletions source/server/hot_restarting_child.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ HotRestartingChild::UdpForwardingContext::getListenerForDestination(
return it->second;
}

// If restart_epoch is 0 there is no parent, so it's effectively already
// drained and terminated.
HotRestartingChild::HotRestartingChild(int base_id, int restart_epoch,
const std::string& socket_path, mode_t socket_mode)
: HotRestartingBase(base_id), restart_epoch_(restart_epoch) {
: HotRestartingBase(base_id), restart_epoch_(restart_epoch),
parent_terminated_(restart_epoch == 0), parent_drained_(restart_epoch == 0) {
main_rpc_stream_.initDomainSocketAddress(&parent_address_);
std::string socket_path_udp = socket_path + "_udp";
udp_forwarding_rpc_stream_.initDomainSocketAddress(&parent_address_udp_forwarding_);
Expand Down Expand Up @@ -102,7 +105,7 @@ void HotRestartingChild::onForwardedUdpPacket(uint32_t worker_index, Network::Ud

int HotRestartingChild::duplicateParentListenSocket(const std::string& address,
uint32_t worker_index) {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return -1;
}

Expand All @@ -121,7 +124,7 @@ int HotRestartingChild::duplicateParentListenSocket(const std::string& address,
}

std::unique_ptr<HotRestartMessage> HotRestartingChild::getParentStats() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return nullptr;
}

Expand All @@ -138,7 +141,7 @@ std::unique_ptr<HotRestartMessage> HotRestartingChild::getParentStats() {
}

void HotRestartingChild::drainParentListeners() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return;
}
// No reply expected.
Expand All @@ -154,9 +157,27 @@ void HotRestartingChild::registerUdpForwardingListener(
udp_forwarding_context_.registerListener(address, listener_config);
}

// Runs `callback` immediately if the parent is already drained; otherwise stores it, keyed
// by the address string, to be run by allDrainsImplicitlyComplete().
void HotRestartingChild::registerParentDrainedCallback(
    const Network::Address::InstanceConstSharedPtr& address, absl::AnyInvocable<void()> callback) {
  if (!parent_drained_) {
    // Parent has not drained yet: hold on to the callback for later.
    on_drained_actions_.emplace(address->asString(), std::move(callback));
    return;
  }
  // Nothing to wait for.
  callback();
}

void HotRestartingChild::allDrainsImplicitlyComplete() {
for (auto& drain_action : on_drained_actions_) {
// Call the callback.
std::move(drain_action.second)();
}
on_drained_actions_.clear();
parent_drained_ = true;
}

absl::optional<HotRestart::AdminShutdownResponse>
HotRestartingChild::sendParentAdminShutdownRequest() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return absl::nullopt;
}

Expand All @@ -176,25 +197,29 @@ HotRestartingChild::sendParentAdminShutdownRequest() {
}

void HotRestartingChild::sendParentTerminateRequest() {
if (restart_epoch_ == 0 || parent_terminated_) {
if (parent_terminated_) {
return;
}
allDrainsImplicitlyComplete();

HotRestartMessage wrapped_request;
wrapped_request.mutable_request()->mutable_terminate();
main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
parent_terminated_ = true;

// Note that the 'generation' counter needs to retain the contribution from
// the parent.
stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_);
if (stat_merger_ != nullptr) {
stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_);

// Now it is safe to forget our stat transferral state.
//
// This destruction is actually important far beyond memory efficiency. The
// scope-based temporary counter logic relies on the StatMerger getting
// destroyed once hot restart's stat merging is all done. (See stat_merger.h
// for details).
stat_merger_.reset();
// Now it is safe to forget our stat transferral state.
//
// This destruction is actually important far beyond memory efficiency. The
// scope-based temporary counter logic relies on the StatMerger getting
// destroyed once hot restart's stat merging is all done. (See stat_merger.h
// for details).
stat_merger_.reset();
}
}

void HotRestartingChild::mergeParentStats(Stats::Store& stats_store,
Expand Down
Loading

0 comments on commit f7352b3

Please sign in to comment.