From f7352b3237d5b16cf7e078733b32ba158e66d086 Mon Sep 17 00:00:00 2001 From: Raven Black Date: Thu, 29 Feb 2024 12:37:43 -0500 Subject: [PATCH] QUIC hot restart part 6 - child instance pauses listening until parent is drained (#31130) QUIC hot restart part 6 - child instance pauses listening until parent is drained (#31130) --------- Signed-off-by: Raven Black --- envoy/network/BUILD | 6 ++ .../parent_drained_callback_registrar.h | 29 +++++ envoy/network/socket.h | 7 ++ envoy/server/hot_restart.h | 11 ++ .../listener_manager/listener_manager_impl.cc | 5 +- source/common/network/BUILD | 1 + source/common/network/listen_socket_impl.h | 19 +++- source/common/network/udp_listener_impl.cc | 40 ++++++- source/common/network/udp_listener_impl.h | 6 ++ source/server/BUILD | 1 + source/server/hot_restart_impl.cc | 4 + source/server/hot_restart_impl.h | 1 + source/server/hot_restart_nop_impl.h | 3 + source/server/hot_restarting_child.cc | 53 ++++++--- source/server/hot_restarting_child.h | 17 ++- test/common/network/BUILD | 1 + .../udp_listener_impl_batch_writer_test.cc | 1 + test/common/network/udp_listener_impl_test.cc | 102 ++++++++++++++++++ .../network/udp_listener_impl_test_base.h | 27 ++++- .../python/hotrestart_handoff_test.py | 2 +- test/mocks/network/BUILD | 6 ++ .../mock_parent_drained_callback_registrar.h | 18 ++++ test/mocks/server/hot_restart.h | 2 + test/server/hot_restart_impl_test.cc | 8 ++ test/server/hot_restarting_child_test.cc | 35 ++++++ 25 files changed, 376 insertions(+), 29 deletions(-) create mode 100644 envoy/network/parent_drained_callback_registrar.h create mode 100644 test/mocks/network/mock_parent_drained_callback_registrar.h diff --git a/envoy/network/BUILD b/envoy/network/BUILD index 3e7d51a07e90..230f7c065d58 100644 --- a/envoy/network/BUILD +++ b/envoy/network/BUILD @@ -58,6 +58,12 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "parent_drained_callback_registrar_interface", + hdrs = ["parent_drained_callback_registrar.h"], + deps = [":address_interface"], +) + envoy_cc_library( name = "udp_packet_writer_handler_interface", hdrs = ["udp_packet_writer_handler.h"], diff --git a/envoy/network/parent_drained_callback_registrar.h b/envoy/network/parent_drained_callback_registrar.h new file mode 100644 index 000000000000..d0ce7c9a191e --- /dev/null +++ b/envoy/network/parent_drained_callback_registrar.h @@ -0,0 +1,29 @@ +#pragma once + +#include "envoy/network/address.h" + +#include "absl/functional/any_invocable.h" + +namespace Envoy { +namespace Network { + +/** + * An interface through which a UDP listen socket, especially a QUIC socket, can + * postpone reading during hot restart until the parent instance is drained. + */ +class ParentDrainedCallbackRegistrar { +public: + /** + * @param address is the address of the listener. + * @param callback the function to call when the listener matching address is + * drained on the parent instance. + */ + virtual void registerParentDrainedCallback(const Address::InstanceConstSharedPtr& address, + absl::AnyInvocable callback) PURE; + +protected: + virtual ~ParentDrainedCallbackRegistrar() = default; +}; + +} // namespace Network +} // namespace Envoy diff --git a/envoy/network/socket.h b/envoy/network/socket.h index b83d0f213047..6091cf9fe922 100644 --- a/envoy/network/socket.h +++ b/envoy/network/socket.h @@ -542,6 +542,13 @@ class Socket { * @return the socket options stored earlier with addOption() and addOptions() calls, if any. */ virtual const OptionsSharedPtr& options() const PURE; + + /** + * @return a ParentDrainedCallbackRegistrar for UDP listen sockets during hot restart. + */ + virtual OptRef parentDrainedCallbackRegistrar() const { + return absl::nullopt; + } }; using SocketPtr = std::unique_ptr; diff --git a/envoy/server/hot_restart.h b/envoy/server/hot_restart.h index a1ce1663cde4..8e201dd65e08 100644 --- a/envoy/server/hot_restart.h +++ b/envoy/server/hot_restart.h @@ -62,6 +62,17 @@ class HotRestart { virtual void registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr address, std::shared_ptr listener_config) PURE; + + /** + * @return An interface on which registerParentDrainedCallback can be called during + * creation of a listener, or nullopt if there is no parent instance. + * + * If this is set, any UDP listener should start paused and only begin listening + * when the parent instance is drained; this allows draining QUIC listeners to + * catch their own packets and forward unrecognized packets to the child instance. + */ + virtual OptRef parentDrainedCallbackRegistrar() PURE; + /** * Initialize the parent logic of our restarter. Meant to be called after initialization of a * new child has begun. The hot restart implementation needs to be created early to deal with diff --git a/source/common/listener_manager/listener_manager_impl.cc b/source/common/listener_manager/listener_manager_impl.cc index 853087d0b1bf..930378a94b94 100644 --- a/source/common/listener_manager/listener_manager_impl.cc +++ b/source/common/listener_manager/listener_manager_impl.cc @@ -321,7 +321,10 @@ Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket( if (socket_type == Network::Socket::Type::Stream) { return std::make_shared(std::move(io_handle), address, options); } else { - return std::make_shared(std::move(io_handle), address, options); + auto socket = std::make_shared( + std::move(io_handle), address, options, + server_.hotRestart().parentDrainedCallbackRegistrar()); + return socket; } } } diff --git a/source/common/network/BUILD b/source/common/network/BUILD index c563ca21428b..53ca953779a4 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -344,6 +344,7 @@ envoy_cc_library( "//envoy/event:file_event_interface", "//envoy/network:exception_interface", "//envoy/network:listener_interface", + "//envoy/network:parent_drained_callback_registrar_interface", "//envoy/runtime:runtime_interface", "//envoy/stats:stats_interface", "//envoy/stats:stats_macros", diff --git a/source/common/network/listen_socket_impl.h b/source/common/network/listen_socket_impl.h index 1dd9a6f668d7..e17e3178637d 100644 --- a/source/common/network/listen_socket_impl.h +++ b/source/common/network/listen_socket_impl.h @@ -68,12 +68,19 @@ template class NetworkListenSocket : public ListenSocketImpl { } } - NetworkListenSocket(IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address, - const Network::Socket::OptionsSharedPtr& options) - : ListenSocketImpl(std::move(io_handle), address) { + NetworkListenSocket( + IoHandlePtr&& io_handle, const Address::InstanceConstSharedPtr& address, + const Network::Socket::OptionsSharedPtr& options, + OptRef parent_drained_callback_registrar = absl::nullopt) + : ListenSocketImpl(std::move(io_handle), address), + parent_drained_callback_registrar_(parent_drained_callback_registrar) { setListenSocketOptions(options); } + OptRef parentDrainedCallbackRegistrar() const override { + return parent_drained_callback_registrar_; + } + Socket::Type socketType() const override { return T::type; } SocketPtr duplicate() override { @@ -110,6 +117,12 @@ template class NetworkListenSocket : public ListenSocketImpl { } protected: + // Usually a socket when initialized starts listening for ready-to-read or ready-to-write events; + // for a QUIC socket during hot restart this is undesirable as the parent instance needs to + // receive all packets; in that case this interface is set, and listening won't begin until the + // callback is called. + OptRef parent_drained_callback_registrar_; + void setPrebindSocketOptions() { // On Windows, SO_REUSEADDR does not restrict subsequent bind calls when there is a listener as // on Linux and later BSD socket stacks. diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index 62c5b273db96..a184eaae7036 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -9,6 +9,7 @@ #include "envoy/common/platform.h" #include "envoy/config/core/v3/base.pb.h" #include "envoy/network/exception.h" +#include "envoy/network/parent_drained_callback_registrar.h" #include "source/common/api/os_sys_calls_impl.h" #include "source/common/common/assert.h" @@ -34,9 +35,34 @@ UdpListenerImpl::UdpListenerImpl(Event::Dispatcher& dispatcher, SocketSharedPtr : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), time_source_(time_source), // Default prefer_gro to false for downstream server traffic. config_(config, false) { + parent_drained_callback_registrar_ = socket_->parentDrainedCallbackRegistrar(); socket_->ioHandle().initializeFileEvent( dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); }, - Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Write); + Event::PlatformDefaultTriggerType, paused() ? 0 : events_when_unpaused_); + if (paused()) { + parent_drained_callback_registrar_->registerParentDrainedCallback( + socket_->connectionInfoProvider().localAddress(), + [this, &dispatcher, alive = std::weak_ptr(destruction_checker_)]() { + dispatcher.post([this, alive = std::move(alive)]() { + auto still_alive = alive.lock(); + if (still_alive != nullptr) { + unpause(); + } + }); + }); + } +} + +void UdpListenerImpl::unpause() { + // Remove the paused state so enable will actually start listening to events. + parent_drained_callback_registrar_ = absl::nullopt; + if (events_when_unpaused_ != 0) { + // Start listening to events. + enable(); + // There may have already been events while this instance was ignoring them, + // so try reading immediately. + activateRead(); + } } UdpListenerImpl::~UdpListenerImpl() { socket_->ioHandle().resetFileEvents(); } @@ -44,10 +70,18 @@ UdpListenerImpl::~UdpListenerImpl() { socket_->ioHandle().resetFileEvents(); } void UdpListenerImpl::disable() { disableEvent(); } void UdpListenerImpl::enable() { - socket_->ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write); + events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write; + if (!paused()) { + socket_->ioHandle().enableFileEvents(events_when_unpaused_); + } } -void UdpListenerImpl::disableEvent() { socket_->ioHandle().enableFileEvents(0); } +void UdpListenerImpl::disableEvent() { + events_when_unpaused_ = 0; + if (!paused()) { + socket_->ioHandle().enableFileEvents(0); + } +} void UdpListenerImpl::onSocketEvent(short flags) { ASSERT((flags & (Event::FileReadyType::Read | Event::FileReadyType::Write))); diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index 723c3c74de75..244f93f3923b 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -26,6 +26,8 @@ class UdpListenerImpl : public BaseListenerImpl, TimeSource& time_source, const envoy::config::core::v3::UdpSocketConfig& config); ~UdpListenerImpl() override; uint32_t packetsDropped() { return packets_dropped_; } + bool paused() const { return parent_drained_callback_registrar_ != absl::nullopt; } + void unpause(); // Network::Listener void disable() override; @@ -63,6 +65,10 @@ class UdpListenerImpl : public BaseListenerImpl, TimeSource& time_source_; const ResolvedUdpSocketConfig config_; + OptRef parent_drained_callback_registrar_; + // Taking a weak_ptr to this lets us detect if the listener has been destroyed. + std::shared_ptr destruction_checker_ = std::make_shared(true); + uint32_t events_when_unpaused_ = Event::FileReadyType::Read | Event::FileReadyType::Write; }; class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter { diff --git a/source/server/BUILD b/source/server/BUILD index 7ee402750e1f..f656303df03e 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -187,6 +187,7 @@ envoy_cc_library( hdrs = envoy_select_hot_restart(["hot_restarting_child.h"]), deps = [ ":hot_restarting_base", + "//envoy/network:parent_drained_callback_registrar_interface", "//source/common/stats:stat_merger_lib", ], ) diff --git a/source/server/hot_restart_impl.cc b/source/server/hot_restart_impl.cc index 417bb4a5f2f3..6e2377c9c6f0 100644 --- a/source/server/hot_restart_impl.cc +++ b/source/server/hot_restart_impl.cc @@ -124,6 +124,10 @@ void HotRestartImpl::registerUdpForwardingListener( as_child_.registerUdpForwardingListener(address, listener_config); } +OptRef HotRestartImpl::parentDrainedCallbackRegistrar() { + return as_child_; +} + void HotRestartImpl::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) { as_parent_.initialize(dispatcher, server); as_child_.initialize(dispatcher); diff --git a/source/server/hot_restart_impl.h b/source/server/hot_restart_impl.h index ace7d41321d9..9a22b6f3ec13 100644 --- a/source/server/hot_restart_impl.h +++ b/source/server/hot_restart_impl.h @@ -106,6 +106,7 @@ class HotRestartImpl : public HotRestart { void registerUdpForwardingListener( Network::Address::InstanceConstSharedPtr address, std::shared_ptr listener_config) override; + OptRef parentDrainedCallbackRegistrar() override; void initialize(Event::Dispatcher& dispatcher, Server::Instance& server) override; absl::optional sendParentAdminShutdownRequest() override; void sendParentTerminateRequest() override; diff --git a/source/server/hot_restart_nop_impl.h b/source/server/hot_restart_nop_impl.h index 99f006083937..031cf1e4613b 100644 --- a/source/server/hot_restart_nop_impl.h +++ b/source/server/hot_restart_nop_impl.h @@ -20,6 +20,9 @@ class HotRestartNopImpl : public Server::HotRestart { int duplicateParentListenSocket(const std::string&, uint32_t) override { return -1; } void registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr, std::shared_ptr) override {} + OptRef parentDrainedCallbackRegistrar() override { + return absl::nullopt; + } void initialize(Event::Dispatcher&, Server::Instance&) override {} absl::optional sendParentAdminShutdownRequest() override { return absl::nullopt; diff --git a/source/server/hot_restarting_child.cc b/source/server/hot_restarting_child.cc index 0d842a2755eb..0bb33e650db9 100644 --- a/source/server/hot_restarting_child.cc +++ b/source/server/hot_restarting_child.cc @@ -46,9 +46,12 @@ HotRestartingChild::UdpForwardingContext::getListenerForDestination( return it->second; } +// If restart_epoch is 0 there is no parent, so it's effectively already +// drained and terminated. HotRestartingChild::HotRestartingChild(int base_id, int restart_epoch, const std::string& socket_path, mode_t socket_mode) - : HotRestartingBase(base_id), restart_epoch_(restart_epoch) { + : HotRestartingBase(base_id), restart_epoch_(restart_epoch), + parent_terminated_(restart_epoch == 0), parent_drained_(restart_epoch == 0) { main_rpc_stream_.initDomainSocketAddress(&parent_address_); std::string socket_path_udp = socket_path + "_udp"; udp_forwarding_rpc_stream_.initDomainSocketAddress(&parent_address_udp_forwarding_); @@ -102,7 +105,7 @@ void HotRestartingChild::onForwardedUdpPacket(uint32_t worker_index, Network::Ud int HotRestartingChild::duplicateParentListenSocket(const std::string& address, uint32_t worker_index) { - if (restart_epoch_ == 0 || parent_terminated_) { + if (parent_terminated_) { return -1; } @@ -121,7 +124,7 @@ int HotRestartingChild::duplicateParentListenSocket(const std::string& address, } std::unique_ptr HotRestartingChild::getParentStats() { - if (restart_epoch_ == 0 || parent_terminated_) { + if (parent_terminated_) { return nullptr; } @@ -138,7 +141,7 @@ std::unique_ptr HotRestartingChild::getParentStats() { } void HotRestartingChild::drainParentListeners() { - if (restart_epoch_ == 0 || parent_terminated_) { + if (parent_terminated_) { return; } // No reply expected. @@ -154,9 +157,27 @@ void HotRestartingChild::registerUdpForwardingListener( udp_forwarding_context_.registerListener(address, listener_config); } +void HotRestartingChild::registerParentDrainedCallback( + const Network::Address::InstanceConstSharedPtr& address, absl::AnyInvocable callback) { + if (parent_drained_) { + callback(); + } else { + on_drained_actions_.emplace(address->asString(), std::move(callback)); + } +} + +void HotRestartingChild::allDrainsImplicitlyComplete() { + for (auto& drain_action : on_drained_actions_) { + // Call the callback. + std::move(drain_action.second)(); + } + on_drained_actions_.clear(); + parent_drained_ = true; +} + absl::optional HotRestartingChild::sendParentAdminShutdownRequest() { - if (restart_epoch_ == 0 || parent_terminated_) { + if (parent_terminated_) { return absl::nullopt; } @@ -176,9 +197,11 @@ HotRestartingChild::sendParentAdminShutdownRequest() { } void HotRestartingChild::sendParentTerminateRequest() { - if (restart_epoch_ == 0 || parent_terminated_) { + if (parent_terminated_) { return; } + allDrainsImplicitlyComplete(); + HotRestartMessage wrapped_request; wrapped_request.mutable_request()->mutable_terminate(); main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request); @@ -186,15 +209,17 @@ void HotRestartingChild::sendParentTerminateRequest() { // Note that the 'generation' counter needs to retain the contribution from // the parent. - stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_); + if (stat_merger_ != nullptr) { + stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_); - // Now it is safe to forget our stat transferral state. - // - // This destruction is actually important far beyond memory efficiency. The - // scope-based temporary counter logic relies on the StatMerger getting - // destroyed once hot restart's stat merging is all done. (See stat_merger.h - // for details). - stat_merger_.reset(); + // Now it is safe to forget our stat transferral state. + // + // This destruction is actually important far beyond memory efficiency. The + // scope-based temporary counter logic relies on the StatMerger getting + // destroyed once hot restart's stat merging is all done. (See stat_merger.h + // for details). + stat_merger_.reset(); + } } void HotRestartingChild::mergeParentStats(Stats::Store& stats_store, diff --git a/source/server/hot_restarting_child.h b/source/server/hot_restarting_child.h index 7f61dfc59f55..7f485511b3d4 100644 --- a/source/server/hot_restarting_child.h +++ b/source/server/hot_restarting_child.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/network/parent_drained_callback_registrar.h" #include "envoy/server/instance.h" #include "source/common/stats/stat_merger.h" @@ -11,7 +12,8 @@ namespace Server { /** * The child half of hot restarting. Issues requests and commands to the parent. */ -class HotRestartingChild : public HotRestartingBase { +class HotRestartingChild : public HotRestartingBase, + public Network::ParentDrainedCallbackRegistrar { public: // A structure to record the set of registered UDP listeners keyed on their addresses, // to support QUIC packet forwarding. @@ -42,7 +44,7 @@ class HotRestartingChild : public HotRestartingBase { HotRestartingChild(int base_id, int restart_epoch, const std::string& socket_path, mode_t socket_mode); - ~HotRestartingChild() = default; + ~HotRestartingChild() override = default; void initialize(Event::Dispatcher& dispatcher); void shutdown(); @@ -50,6 +52,9 @@ class HotRestartingChild : public HotRestartingBase { int duplicateParentListenSocket(const std::string& address, uint32_t worker_index); void registerUdpForwardingListener(Network::Address::InstanceConstSharedPtr address, std::shared_ptr listener_config); + // From Network::ParentDrainedCallbackRegistrar. + void registerParentDrainedCallback(const Network::Address::InstanceConstSharedPtr& addr, + absl::AnyInvocable action) override; std::unique_ptr getParentStats(); void drainParentListeners(); absl::optional sendParentAdminShutdownRequest(); @@ -60,15 +65,21 @@ class HotRestartingChild : public HotRestartingBase { protected: void onSocketEventUdpForwarding(); void onForwardedUdpPacket(uint32_t worker_index, Network::UdpRecvData&& data); + // When call to terminate parent is sent, or parent is already terminated, + void allDrainsImplicitlyComplete(); private: friend class HotRestartUdpForwardingTestHelper; const int restart_epoch_; - bool parent_terminated_{}; + bool parent_terminated_; + bool parent_drained_; sockaddr_un parent_address_; sockaddr_un parent_address_udp_forwarding_; std::unique_ptr stat_merger_{}; Stats::StatName hot_restart_generation_stat_name_; + // There are multiple listener instances per address that must all be reactivated + // when the parent is drained, so a multimap is used to contain them. + std::unordered_multimap> on_drained_actions_; Event::FileEventPtr socket_event_udp_forwarding_; UdpForwardingContext udp_forwarding_context_; }; diff --git a/test/common/network/BUILD b/test/common/network/BUILD index ea59472db9b6..9e4c1c2e8205 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -249,6 +249,7 @@ envoy_cc_test( "//source/common/network:utility_lib", "//source/common/stats:stats_lib", "//test/common/network:listener_impl_test_base_lib", + "//test/mocks/network:mock_parent_drained_callback_registrar", "//test/mocks/network:network_mocks", "//test/mocks/server:server_mocks", "//test/test_common:environment_lib", diff --git a/test/common/network/udp_listener_impl_batch_writer_test.cc b/test/common/network/udp_listener_impl_batch_writer_test.cc index 39b69e86c02a..35156dbd6a83 100644 --- a/test/common/network/udp_listener_impl_batch_writer_test.cc +++ b/test/common/network/udp_listener_impl_batch_writer_test.cc @@ -61,6 +61,7 @@ size_t getPacketLength(const msghdr* msg) { class UdpListenerImplBatchWriterTest : public UdpListenerImplTestBase { public: void SetUp() override { + UdpListenerImplTestBase::setup(); // Set listening socket options and set UdpGsoBatchWriter server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions()); server_socket_->addOptions(SocketOptionFactory::buildRxQueueOverFlowOptions()); diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 18810ca7467a..6df91c150355 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -16,6 +16,7 @@ #include "test/common/network/udp_listener_impl_test_base.h" #include "test/mocks/api/mocks.h" +#include "test/mocks/network/mock_parent_drained_callback_registrar.h" #include "test/mocks/network/mocks.h" #include "test/test_common/environment.h" #include "test/test_common/network_utility.h" @@ -52,6 +53,7 @@ class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl { class UdpListenerImplTest : public UdpListenerImplTestBase { public: void setup(bool prefer_gro = false) { + UdpListenerImplTestBase::setup(); ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false)); // Return the real version by default. ON_CALL(override_syscall_, supportsMmsg()) @@ -385,6 +387,106 @@ TEST_P(UdpListenerImplTest, UdpListenerEnableDisable) { dispatcher_->run(Event::Dispatcher::RunType::Block); } +class HotRestartedUdpListenerImplTest : public UdpListenerImplTest { +public: + void SetUp() override { +#ifdef WIN32 + GTEST_SKIP() << "Hot restart is not supported on Windows."; +#endif + } + void setup() { + io_handle_ = &useHotRestartSocket(registrar_); + // File event should be created listening to no events (i.e. disabled). + EXPECT_CALL(*io_handle_, createFileEvent_(_, _, _, 0)); + // Parent drained callback should be registered when the listener is created. + // We capture the callback so we can simulate "drain complete". + EXPECT_CALL(registrar_, registerParentDrainedCallback(_, _)) + .WillOnce( + [this](const Address::InstanceConstSharedPtr&, absl::AnyInvocable callback) { + parent_drained_callback_ = std::move(callback); + }); + UdpListenerImplTest::setup(); + testing::Mock::VerifyAndClearExpectations(®istrar_); + } + +protected: + MockParentDrainedCallbackRegistrar registrar_; + MockIoHandle* io_handle_; + absl::AnyInvocable parent_drained_callback_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, HotRestartedUdpListenerImplTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +/** + * During hot restart, while the parent instance is draining, a quic udp + * listener (created with a parent_drained_callback_registrar) should not + * be reading packets, regardless of enable/disable calls. + * It should begin reading packets after drain completes. + */ +TEST_P(HotRestartedUdpListenerImplTest, EnableAndDisableDuringParentDrainShouldDoNothing) { + setup(); + // Enabling and disabling listener should *not* trigger any + // event actions on the io_handle, because of listener being paused + // while draining. + EXPECT_CALL(*io_handle_, enableFileEvents(_)).Times(0); + listener_->disable(); + listener_->enable(); + testing::Mock::VerifyAndClearExpectations(io_handle_); + // Ending parent drain should cause io_handle to go into reading mode. + EXPECT_CALL(*io_handle_, + enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write)); + EXPECT_CALL(*io_handle_, activateFileEvents(Event::FileReadyType::Read)); + std::move(parent_drained_callback_)(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + testing::Mock::VerifyAndClearExpectations(io_handle_); + // Enabling and disabling once unpaused should update io_handle. + EXPECT_CALL(*io_handle_, enableFileEvents(0)); + listener_->disable(); + testing::Mock::VerifyAndClearExpectations(io_handle_); + EXPECT_CALL(*io_handle_, + enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write)); + listener_->enable(); + testing::Mock::VerifyAndClearExpectations(io_handle_); +} + +/** + * Mostly the same as EnableAndDisableDuringParentDrainShouldDoNothing, but in disabled state when + * drain ends. + */ +TEST_P(HotRestartedUdpListenerImplTest, EndingParentDrainedWhileDisabledShouldNotStartReading) { + setup(); + // Enabling and disabling listener should *not* trigger any + // event actions on the io_handle, because of listener being paused + // while draining. + EXPECT_CALL(*io_handle_, enableFileEvents(_)).Times(0); + listener_->enable(); + listener_->disable(); + testing::Mock::VerifyAndClearExpectations(io_handle_); + // Ending drain should not trigger any event changes because the last state + // of the listener was disabled. + std::move(parent_drained_callback_)(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + testing::Mock::VerifyAndClearExpectations(io_handle_); + // Enabling after unpaused should set io_handle to reading/writing. + EXPECT_CALL(*io_handle_, + enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write)); + listener_->enable(); + testing::Mock::VerifyAndClearExpectations(io_handle_); +} + +TEST_P(HotRestartedUdpListenerImplTest, + ParentDrainedCallbackAfterListenerDestroyedShouldDoNothing) { + setup(); + EXPECT_CALL(*io_handle_, enableFileEvents(_)).Times(0); + listener_ = nullptr; + // Signaling end-of-drain after the listener was destroyed should do nothing. + std::move(parent_drained_callback_)(); + dispatcher_->run(Event::Dispatcher::RunType::Block); + // At this point io_handle should be an invalid reference. +} + /** * Tests UDP listener's error callback. */ diff --git a/test/common/network/udp_listener_impl_test_base.h b/test/common/network/udp_listener_impl_test_base.h index 112f89d68dc3..9b7636e13ae8 100644 --- a/test/common/network/udp_listener_impl_test_base.h +++ b/test/common/network/udp_listener_impl_test_base.h @@ -31,13 +31,24 @@ namespace Envoy { namespace Network { class UdpListenerImplTestBase : public ListenerImplTestBase { -public: - UdpListenerImplTestBase() - : server_socket_(createServerSocket(true)), send_to_addr_(getServerLoopbackAddress()) { +protected: + MockIoHandle& + useHotRestartSocket(OptRef parent_drained_callback_registrar) { + auto io_handle = std::make_unique>(); + MockIoHandle& ret = *io_handle; + server_socket_ = createServerSocketFromExistingHandle(std::move(io_handle), + parent_drained_callback_registrar); + return ret; + } + + void setup() { + if (server_socket_ == nullptr) { + server_socket_ = createServerSocket(true); + } + send_to_addr_ = Address::InstanceConstSharedPtr(getServerLoopbackAddress()); time_system_.advanceTimeWait(std::chrono::milliseconds(100)); } -protected: Address::Instance* getServerLoopbackAddress() { if (version_ == Address::IpVersion::v4) { return new Address::Ipv4Instance( @@ -60,6 +71,14 @@ class UdpListenerImplTestBase : public ListenerImplTestBase { bind); } + SocketSharedPtr createServerSocketFromExistingHandle( + IoHandlePtr&& io_handle, + OptRef parent_drained_callback_registrar) { + return std::make_shared( + std::move(io_handle), Network::Test::getCanonicalLoopbackAddress(version_), + SocketOptionFactory::buildIpFreebindOptions(), parent_drained_callback_registrar); + } + Address::InstanceConstSharedPtr getNonDefaultSourceAddress() { // Use a self address that is unlikely to be picked by source address discovery // algorithm if not specified in recvmsg/recvmmsg. Port is not taken into diff --git a/test/integration/python/hotrestart_handoff_test.py b/test/integration/python/hotrestart_handoff_test.py index 7d975bd7dda7..dcbbb5a8ba1e 100644 --- a/test/integration/python/hotrestart_handoff_test.py +++ b/test/integration/python/hotrestart_handoff_test.py @@ -304,7 +304,7 @@ async def _wait_for_envoy_epoch(i: int): pass await asyncio.sleep(0.2) # Envoy instance with expected restart_epoch should have started up - assert expected_substring in response, f"server_info={response}" + assert expected_substring in response, f"expected_substring={expected_substring}, server_info={response}" class IntegrationTest(unittest.IsolatedAsyncioTestCase): diff --git a/test/mocks/network/BUILD b/test/mocks/network/BUILD index 82b5ef8c79f1..73c9854193a3 100644 --- a/test/mocks/network/BUILD +++ b/test/mocks/network/BUILD @@ -42,6 +42,12 @@ envoy_cc_mock( ], ) +envoy_cc_mock( + name = "mock_parent_drained_callback_registrar", + hdrs = ["mock_parent_drained_callback_registrar.h"], + deps = ["//envoy/network:parent_drained_callback_registrar_interface"], +) + envoy_cc_mock( name = "network_mocks", srcs = ["mocks.cc"], diff --git a/test/mocks/network/mock_parent_drained_callback_registrar.h b/test/mocks/network/mock_parent_drained_callback_registrar.h new file mode 100644 index 000000000000..ae82b52d31f6 --- /dev/null +++ b/test/mocks/network/mock_parent_drained_callback_registrar.h @@ -0,0 +1,18 @@ +#pragma once + +#include "envoy/network/parent_drained_callback_registrar.h" + +#include "gmock/gmock.h" + +namespace Envoy { +namespace Network { + +class MockParentDrainedCallbackRegistrar : public ParentDrainedCallbackRegistrar { +public: + MOCK_METHOD(void, registerParentDrainedCallback, + (const Address::InstanceConstSharedPtr& address, + absl::AnyInvocable callback)); +}; + +} // namespace Network +} // namespace Envoy diff --git a/test/mocks/server/hot_restart.h b/test/mocks/server/hot_restart.h index c83142692c06..99bfa3ccbb0c 100644 --- a/test/mocks/server/hot_restart.h +++ b/test/mocks/server/hot_restart.h @@ -20,6 +20,8 @@ class MockHotRestart : public HotRestart { MOCK_METHOD(void, registerUdpForwardingListener, (Network::Address::InstanceConstSharedPtr address, std::shared_ptr listener_config)); + MOCK_METHOD(OptRef, parentDrainedCallbackRegistrar, ()); + MOCK_METHOD(void, whenDrainComplete, (absl::string_view addr, absl::AnyInvocable action)); MOCK_METHOD(void, initialize, (Event::Dispatcher & dispatcher, Server::Instance& server)); MOCK_METHOD(absl::optional, sendParentAdminShutdownRequest, ()); MOCK_METHOD(void, sendParentTerminateRequest, ()); diff --git a/test/server/hot_restart_impl_test.cc b/test/server/hot_restart_impl_test.cc index 81baf88181b5..28e1427564e1 100644 --- a/test/server/hot_restart_impl_test.cc +++ b/test/server/hot_restart_impl_test.cc @@ -87,6 +87,14 @@ class HotRestartImplTest : public testing::Test { std::unique_ptr hot_restart_; }; +TEST_F(HotRestartImplTest, ParentDrainedCallbackRegistrarIsSetAndCanBeCalled) { + setup(); + OptRef registrar = + hot_restart_->parentDrainedCallbackRegistrar(); + ASSERT_TRUE(registrar.has_value()); + registrar->registerParentDrainedCallback(test_addresses_.ipv4_test_addr_, []() {}); +} + TEST_F(HotRestartImplTest, VersionString) { // Tests that the version-string will be consistent and HOT_RESTART_VERSION, // between multiple instantiations. diff --git a/test/server/hot_restarting_child_test.cc b/test/server/hot_restarting_child_test.cc index f2455034f054..d16653d6454a 100644 --- a/test/server/hot_restarting_child_test.cc +++ b/test/server/hot_restarting_child_test.cc @@ -67,6 +67,11 @@ class FakeHotRestartingParent : public HotRestartingBase { }); udp_forwarding_rpc_stream_.sendHotRestartMessage(child_address_udp_forwarding_, message); } + void expectParentTerminateMessages() { + EXPECT_CALL(os_sys_calls_, sendmsg(_, _, _)).WillOnce([](int, const msghdr* msg, int) { + return Api::SysCallSizeResult{static_cast(msg->msg_iov[0].iov_len), 0}; + }); + } Api::MockOsSysCalls& os_sys_calls_; Event::FileReadyCb udp_file_ready_callback_; sockaddr_un child_address_udp_forwarding_; @@ -100,6 +105,36 @@ class HotRestartingChildTest : public testing::Test { std::unique_ptr hot_restarting_child_; }; +TEST_F(HotRestartingChildTest, ParentDrainedCallbacksAreCalled) { + auto test_listener_addr = Network::Utility::resolveUrl("udp://127.0.0.1:1234"); + auto test_listener_addr2 = Network::Utility::resolveUrl("udp://127.0.0.1:1235"); + testing::MockFunction callback1; + testing::MockFunction callback2; + hot_restarting_child_->registerParentDrainedCallback(test_listener_addr, + callback1.AsStdFunction()); + hot_restarting_child_->registerParentDrainedCallback(test_listener_addr2, + callback2.AsStdFunction()); + EXPECT_CALL(callback1, Call()); + EXPECT_CALL(callback2, Call()); + fake_parent_->expectParentTerminateMessages(); + hot_restarting_child_->sendParentTerminateRequest(); +} + +TEST_F(HotRestartingChildTest, ParentDrainedCallbacksAreCalledImmediatelyWhenAlreadyDrained) { + auto test_listener_addr = Network::Utility::resolveUrl("udp://127.0.0.1:1234"); + auto test_listener_addr2 = Network::Utility::resolveUrl("udp://127.0.0.1:1235"); + testing::MockFunction callback1; + testing::MockFunction callback2; + fake_parent_->expectParentTerminateMessages(); + hot_restarting_child_->sendParentTerminateRequest(); + EXPECT_CALL(callback1, Call()); + EXPECT_CALL(callback2, Call()); + hot_restarting_child_->registerParentDrainedCallback(test_listener_addr, + callback1.AsStdFunction()); + hot_restarting_child_->registerParentDrainedCallback(test_listener_addr2, + callback2.AsStdFunction()); +} + TEST_F(HotRestartingChildTest, LogsErrorOnReplyMessageInUdpStream) { envoy::HotRestartMessage msg; msg.mutable_reply();