From d08c59cfd09dc2aecf113638b8d32d24b82851a8 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Tue, 29 Oct 2019 11:37:40 -0700 Subject: [PATCH 1/5] udp_proxy: scaffolding This is the first commit in a series to support UDP proxying. There are quite a few TODOs in the code before this feature will be considered a MVP. Part of https://github.com/envoyproxy/envoy/issues/492 Signed-off-by: Matt Klein --- CODEOWNERS | 1 + .../network/tcp_proxy/v2/tcp_proxy.proto | 1 - .../network/tcp_proxy/v3alpha/tcp_proxy.proto | 1 - .../config/filter/udp/udp_proxy/v2alpha/BUILD | 7 + .../udp/udp_proxy/v2alpha/udp_proxy.proto | 22 +++ include/envoy/network/filter.h | 3 +- include/envoy/network/listener.h | 30 +-- source/common/network/BUILD | 1 + source/common/network/udp_listener_impl.cc | 56 ++---- source/common/network/utility.cc | 81 ++++++-- source/common/network/utility.h | 33 +++- source/extensions/extensions_build_config.bzl | 6 + source/extensions/filters/udp/udp_proxy/BUILD | 36 ++++ .../filters/udp/udp_proxy/config.cc | 15 ++ .../extensions/filters/udp/udp_proxy/config.h | 45 +++++ .../filters/udp/udp_proxy/udp_proxy_filter.cc | 85 ++++++++ .../filters/udp/udp_proxy/udp_proxy_filter.h | 117 +++++++++++ .../quiche/active_quic_listener.cc | 5 +- .../quiche/envoy_quic_packet_writer.cc | 3 +- source/server/BUILD | 1 + source/server/configuration_impl.cc | 4 +- source/server/configuration_impl.h | 2 +- source/server/http/admin.h | 6 +- source/server/listener_impl.cc | 6 +- source/server/listener_impl.h | 2 +- source/server/server.cc | 5 + test/common/network/udp_listener_impl_test.cc | 14 +- test/config/utility.cc | 3 +- test/extensions/filters/udp/udp_proxy/BUILD | 22 +++ .../udp_proxy/udp_proxy_integration_test.cc | 182 ++++++++++++++++++ test/integration/BUILD | 15 -- test/integration/autonomous_upstream.cc | 6 +- test/integration/autonomous_upstream.h | 2 +- test/integration/fake_upstream.cc | 70 +++++-- test/integration/fake_upstream.h | 40 +++- test/integration/filters/BUILD | 16 -- .../filters/udp_listener_echo_filter.cc | 66 ------- test/integration/integration.cc | 4 +- test/integration/integration.h | 3 + test/integration/udp_echo_integration_test.cc | 126 ------------ test/mocks/network/mocks.cc | 1 - test/mocks/network/mocks.h | 2 +- 42 files changed, 787 insertions(+), 359 deletions(-) create mode 100644 api/envoy/config/filter/udp/udp_proxy/v2alpha/BUILD create mode 100644 api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto create mode 100644 source/extensions/filters/udp/udp_proxy/BUILD create mode 100644 source/extensions/filters/udp/udp_proxy/config.cc create mode 100644 source/extensions/filters/udp/udp_proxy/config.h create mode 100644 source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc create mode 100644 source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h create mode 100644 test/extensions/filters/udp/udp_proxy/BUILD create mode 100644 test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc delete mode 100644 test/integration/filters/udp_listener_echo_filter.cc delete mode 100644 test/integration/udp_echo_integration_test.cc diff --git a/CODEOWNERS b/CODEOWNERS index 6d4c8744dcb9..678b6158890f 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -90,3 +90,4 @@ extensions/filters/common/original_src @snowp @klarose /*/extensions/filters/network/ext_authz @gsagula @dio /*/extensions/filters/network/tcp_proxy @alyssawilk @zuercher /*/extensions/filters/network/echo @htuch @alyssawilk +/*/extensions/filters/udp/udp_proxy @mattklein123 @danzh2010 diff --git a/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto b/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto index 889a9a0c9871..d67c4c23c9d4 100644 --- a/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto +++ b/api/envoy/config/filter/network/tcp_proxy/v2/tcp_proxy.proto @@ -104,7 +104,6 @@ message TcpProxy { option (validate.required) = true; // The upstream cluster to connect to. - // string cluster = 2; // Multiple upstream clusters can be specified for a given route. The diff --git a/api/envoy/config/filter/network/tcp_proxy/v3alpha/tcp_proxy.proto b/api/envoy/config/filter/network/tcp_proxy/v3alpha/tcp_proxy.proto index c8becda9d243..a9d327a974ff 100644 --- a/api/envoy/config/filter/network/tcp_proxy/v3alpha/tcp_proxy.proto +++ b/api/envoy/config/filter/network/tcp_proxy/v3alpha/tcp_proxy.proto @@ -52,7 +52,6 @@ message TcpProxy { option (validate.required) = true; // The upstream cluster to connect to. - // string cluster = 2; // Multiple upstream clusters can be specified for a given route. The diff --git a/api/envoy/config/filter/udp/udp_proxy/v2alpha/BUILD b/api/envoy/config/filter/udp/udp_proxy/v2alpha/BUILD new file mode 100644 index 000000000000..5dc095ade27a --- /dev/null +++ b/api/envoy/config/filter/udp/udp_proxy/v2alpha/BUILD @@ -0,0 +1,7 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package() diff --git a/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto new file mode 100644 index 000000000000..5f2db4a91814 --- /dev/null +++ b/api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package envoy.config.filter.udp.udp_proxy.v2alpha; + +option java_outer_classname = "UdpProxyProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.config.filter.udp.udp_proxy.v2alpha"; + +import "google/protobuf/duration.proto"; + +import "validate/validate.proto"; + +// TODO(mattklein123): docs + +message UdpProxyConfig { + oneof cluster_specifier { + option (validate.required) = true; + + // The upstream cluster to connect to. + string cluster = 1 [(validate.rules).string = {min_bytes: 1}]; + } +} diff --git a/include/envoy/network/filter.h b/include/envoy/network/filter.h index e446b1bf7a2c..0019ccedcec3 100644 --- a/include/envoy/network/filter.h +++ b/include/envoy/network/filter.h @@ -436,9 +436,8 @@ class FilterChainFactory { * * @param udp_listener supplies the listener to create the chain on. * @param callbacks supplies the callbacks needed to create a filter. - * @return true if filter chain was created successfully. Otherwise false. */ - virtual bool createUdpListenerFilterChain(UdpListenerFilterManager& udp_listener, + virtual void createUdpListenerFilterChain(UdpListenerFilterManager& udp_listener, UdpReadFilterCallbacks& callbacks) PURE; }; diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index fbb0ff7c17f9..07978eb944b4 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -127,22 +127,28 @@ class ListenerCallbacks { /** * Utility struct that encapsulates the information from a udp socket's * recvfrom/recvmmsg call. - * - * TODO(conqerAtapple): Maybe this belongs inside the UdpListenerCallbacks - * class. */ struct UdpRecvData { - Address::InstanceConstSharedPtr local_address_; - Address::InstanceConstSharedPtr peer_address_; // TODO(conquerAtapple): Fix ownership semantics. + struct LocalPeerAddresses { + bool operator==(const LocalPeerAddresses& rhs) const { + // TODO(mattklein123): Implement a hash directly on Address that does not use strings. + return local_->asStringView() == rhs.local_->asStringView() && + peer_->asStringView() == rhs.peer_->asStringView(); + } + + template friend H AbslHashValue(H h, const LocalPeerAddresses& addresses) { + // TODO(mattklein123): Implement a hash directly on Address that does not use strings. + return H::combine(std::move(h), addresses.local_->asStringView(), + addresses.peer_->asStringView()); + } + + Address::InstanceConstSharedPtr local_; + Address::InstanceConstSharedPtr peer_; + }; + + LocalPeerAddresses addresses_; Buffer::InstancePtr buffer_; MonotonicTime receive_time_; - - // TODO(conquerAtapple): - // Add UdpReader here so that the callback handler can - // then use the reader to do multiple reads(recvmmsg) once the OS notifies it - // has data. We could also just return a `ReaderFactory` that returns either a - // `recvfrom` reader (with peer information) or a `read/recvmmsg` reader. This - // is still being flushed out (Jan, 2019). }; /** diff --git a/source/common/network/BUILD b/source/common/network/BUILD index aaa04b36f4c9..4b558ffaea7e 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -283,6 +283,7 @@ envoy_cc_library( deps = [ ":address_lib", "//include/envoy/network:connection_interface", + "//include/envoy/network:listener_interface", "//include/envoy/stats:stats_interface", "//source/common/api:os_sys_calls_lib", "//source/common/buffer:buffer_lib", diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index a89bc20f5930..da6c4db1f599 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -68,50 +68,23 @@ void UdpListenerImpl::onSocketEvent(short flags) { void UdpListenerImpl::handleReadCallback() { ENVOY_UDP_LOG(trace, "handleReadCallback"); - do { - uint32_t old_packets_dropped = packets_dropped_; - MonotonicTime receive_time = time_source_.monotonicTime(); - Api::IoCallUint64Result result = - Utility::readFromSocket(socket_, *this, receive_time, &packets_dropped_); - - if (!result.ok()) { - // No more to read or encountered a system error. - if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { - ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result.err_->getErrorCode()), - result.err_->getErrorDetails()); - cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, - result.err_->getErrorCode()); - } - // Stop reading. - return; - } - - if (result.rc_ == 0) { - // TODO(conqerAtapple): Is zero length packet interesting? If so add stats - // for it. Otherwise remove the warning log below. - ENVOY_UDP_LOG(trace, "received 0-length packet"); - } - - if (packets_dropped_ != old_packets_dropped) { - // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller - // value. So as long as this count differs from previously recorded value, - // more packets are dropped by kernel. - uint32_t delta = (packets_dropped_ > old_packets_dropped) - ? (packets_dropped_ - old_packets_dropped) - : (packets_dropped_ + - (std::numeric_limits::max() - old_packets_dropped) + 1); - // TODO(danzh) add stats for this. - ENVOY_UDP_LOG(debug, "Kernel dropped {} more packets. Consider increase receive buffer size.", - delta); - } - } while (true); + const Api::IoCallUint64Result result = Utility::readAllDatagramsFromSocket( + socket_.ioHandle(), *socket_.localAddress(), *this, time_source_, packets_dropped_); + // We should always get a failed result, even if no further data. + ASSERT(!result.ok()); + if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { + ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result.err_->getErrorCode()), + result.err_->getErrorDetails()); + cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result.err_->getErrorCode()); + } } void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_address, Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer, MonotonicTime receive_time) { - UdpRecvData recvData{std::move(local_address), std::move(peer_address), std::move(buffer), - receive_time}; + ASSERT(local_address != nullptr); + UdpRecvData recvData{ + {std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time}; cb_.onData(recvData); } @@ -129,11 +102,8 @@ const Address::InstanceConstSharedPtr& UdpListenerImpl::localAddress() const { Api::IoCallUint64Result UdpListenerImpl::send(const UdpSendData& send_data) { ENVOY_UDP_LOG(trace, "send"); Buffer::Instance& buffer = send_data.buffer_; - uint64_t num_slices = buffer.getRawSlices(nullptr, 0); - STACK_ARRAY(slices, Buffer::RawSlice, num_slices); - buffer.getRawSlices(slices.begin(), num_slices); Api::IoCallUint64Result send_result = Utility::writeToSocket( - socket_, slices.begin(), num_slices, send_data.local_ip_, send_data.peer_address_); + socket_.ioHandle(), buffer, send_data.local_ip_, send_data.peer_address_); // The send_result normalizes the rc_ value to 0 in error conditions. // The drain call is hence 'safe' in success and failure cases. diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 4c9034cbfb0c..e63b61aa61b8 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -15,6 +15,7 @@ #include "common/common/assert.h" #include "common/common/cleanup.h" #include "common/common/fmt.h" +#include "common/common/stack_array.h" #include "common/common/utility.h" #include "common/network/address_impl.h" #include "common/network/io_socket_error_impl.h" @@ -230,14 +231,13 @@ Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersio return ret; } -bool Utility::isLocalConnection(const Network::ConnectionSocket& socket) { +bool Utility::isLocalConnection(const ConnectionSocket& socket) { // These are local: // - Pipes // - Sockets to a loopback address // - Sockets where the local and remote address (ignoring port) are the same const auto& remote_address = socket.remoteAddress(); - if (remote_address->type() == Envoy::Network::Address::Type::Pipe || - isLoopbackAddress(*remote_address)) { + if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) { return true; } const auto local_ip = socket.localAddress()->ip(); @@ -329,9 +329,9 @@ const std::string& Utility::getIpv6CidrCatchAllAddress() { Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address, uint32_t port) { switch (address.ip()->version()) { - case Network::Address::IpVersion::v4: + case Address::IpVersion::v4: return std::make_shared(address.ip()->addressAsString(), port); - case Network::Address::IpVersion::v6: + case Address::IpVersion::v6: return std::make_shared(address.ip()->addressAsString(), port); } NOT_REACHED_GCOVR_EXCL_LINE; @@ -413,7 +413,7 @@ bool Utility::portInRangeList(const Address::Instance& address, const std::list< return false; } - for (const Network::PortRange& p : list) { + for (const PortRange& p : list) { if (p.contains(address.ip()->port())) { return true; } @@ -450,9 +450,9 @@ Address::InstanceConstSharedPtr Utility::protobufAddressToAddress(const envoy::api::v2::core::Address& proto_address) { switch (proto_address.address_case()) { case envoy::api::v2::core::Address::kSocketAddress: - return Network::Utility::parseInternetAddress(proto_address.socket_address().address(), - proto_address.socket_address().port_value(), - !proto_address.socket_address().ipv4_compat()); + return Utility::parseInternetAddress(proto_address.socket_address().address(), + proto_address.socket_address().port_value(), + !proto_address.socket_address().ipv4_compat()); case envoy::api::v2::core::Address::kPipe: return std::make_shared(proto_address.pipe().path()); default: @@ -493,13 +493,22 @@ Utility::protobufAddressSocketType(const envoy::api::v2::core::Address& proto_ad } } -Api::IoCallUint64Result Utility::writeToSocket(Network::Socket& socket, Buffer::RawSlice* slices, +Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer, + const Address::Ip* local_ip, + const Address::Instance& peer_address) { + const uint64_t num_slices = buffer.getRawSlices(nullptr, 0); + STACK_ARRAY(slices, Buffer::RawSlice, num_slices); + buffer.getRawSlices(slices.begin(), num_slices); + return writeToSocket(handle, slices.begin(), num_slices, local_ip, peer_address); +} + +Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices, uint64_t num_slices, const Address::Ip* local_ip, const Address::Instance& peer_address) { Api::IoCallUint64Result send_result( - /*rc=*/0, /*err=*/Api::IoErrorPtr(nullptr, Network::IoSocketError::deleteIoError)); + /*rc=*/0, /*err=*/Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError)); do { - send_result = socket.ioHandle().sendmsg(slices, num_slices, 0, local_ip, peer_address); + send_result = handle.sendmsg(slices, num_slices, 0, local_ip, peer_address); } while (!send_result.ok() && // Send again if interrupted. send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt); @@ -514,7 +523,8 @@ Api::IoCallUint64Result Utility::writeToSocket(Network::Socket& socket, Buffer:: return send_result; } -Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket, +Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, + const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time, uint32_t* packets_dropped) { @@ -525,14 +535,12 @@ Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket, IoHandle::RecvMsgOutput output(packets_dropped); Api::IoCallUint64Result result = - socket.ioHandle().recvmsg(&slice, num_slices, socket.localAddress()->ip()->port(), output); + handle.recvmsg(&slice, num_slices, local_address.ip()->port(), output); if (!result.ok()) { return result; } - RELEASE_ASSERT(output.local_address_ != nullptr, "fail to get local address from IP header"); - // Adjust used memory length. slice.len_ = std::min(slice.len_, static_cast(result.rc_)); buffer->commit(&slice, 1); @@ -541,13 +549,13 @@ Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket, RELEASE_ASSERT(output.peer_address_ != nullptr, fmt::format("Unable to get remote address for fd: {}, local address: {} ", - socket.ioHandle().fd(), socket.localAddress()->asString())); + handle.fd(), local_address.asString())); // Unix domain sockets are not supported RELEASE_ASSERT(output.peer_address_->type() == Address::Type::Ip, fmt::format("Unsupported remote address: {} local address: {}, receive size: " "{}", - output.peer_address_->asString(), socket.localAddress()->asString(), + output.peer_address_->asString(), local_address.asString(), result.rc_)); udp_packet_processor.processPacket(std::move(output.local_address_), std::move(output.peer_address_), std::move(buffer), @@ -555,5 +563,42 @@ Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket, return result; } +Api::IoCallUint64Result +Utility::readAllDatagramsFromSocket(IoHandle& handle, const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, + TimeSource& time_source, uint32_t& packets_dropped) { + do { + const uint32_t old_packets_dropped = packets_dropped; + const MonotonicTime receive_time = time_source.monotonicTime(); + Api::IoCallUint64Result result = Utility::readFromSocket( + handle, local_address, udp_packet_processor, receive_time, &packets_dropped); + + if (!result.ok()) { + // No more to read or encountered a system error. + return result; + } + + if (result.rc_ == 0) { + // TODO(conqerAtapple): Is zero length packet interesting? If so add stats + // for it. Otherwise remove the warning log below. + ENVOY_LOG_MISC(trace, "received 0-length packet"); + } + + if (packets_dropped != old_packets_dropped) { + // The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller + // value. So as long as this count differs from previously recorded value, + // more packets are dropped by kernel. + const uint32_t delta = + (packets_dropped > old_packets_dropped) + ? (packets_dropped - old_packets_dropped) + : (packets_dropped + (std::numeric_limits::max() - old_packets_dropped) + + 1); + // TODO(danzh) add stats for this. + ENVOY_LOG_MISC( + debug, "Kernel dropped {} more packets. Consider increase receive buffer size.", delta); + } + } while (true); +} + } // namespace Network } // namespace Envoy diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 34443c219f1b..65eab62d9f30 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -7,6 +7,7 @@ #include "envoy/api/v2/core/address.pb.h" #include "envoy/common/platform.h" #include "envoy/network/connection.h" +#include "envoy/network/listener.h" #include "absl/strings/string_view.h" @@ -167,7 +168,7 @@ class Utility { * Determine whether this is a local connection. * @return bool the address is a local connection. */ - static bool isLocalConnection(const Network::ConnectionSocket& socket); + static bool isLocalConnection(const ConnectionSocket& socket); /** * Determine whether this is an internal (RFC1918) address. @@ -290,31 +291,49 @@ class Utility { /** * Send a packet via given UDP socket with specific source address. - * @param socket is the UDP socket used to send. + * @param handle is the UDP socket used to send. * @param slices points to the buffers containing the packet. * @param num_slices is the number of buffers. * @param local_ip is the source address to be used to send. * @param peer_address is the destination address to send to. */ - static Api::IoCallUint64Result writeToSocket(Network::Socket& socket, Buffer::RawSlice* slices, + static Api::IoCallUint64Result writeToSocket(IoHandle& handle, Buffer::RawSlice* slices, uint64_t num_slices, const Address::Ip* local_ip, const Address::Instance& peer_address); + static Api::IoCallUint64Result writeToSocket(IoHandle& handle, const Buffer::Instance& buffer, + const Address::Ip* local_ip, + const Address::Instance& peer_address); /** - * Read a packet from given UDP socket and pass the packet to given - * UdpPacketProcessor. - * @param socket is the UDP socket to read from. + * Read a packet from a given UDP socket and pass the packet to given UdpPacketProcessor. + * @param handle is the UDP socket to read from. + * @param local_address is the socket's local address used to populate port. * @param udp_packet_processor is the callback to receive the packet. * @param receive_time is the timestamp passed to udp_packet_processor for the * receive time of the packet. * @param packets_dropped is the output parameter for number of packets dropped in kernel. If the * caller is not interested in it, nullptr can be passed in. */ - static Api::IoCallUint64Result readFromSocket(Network::Socket& socket, + static Api::IoCallUint64Result readFromSocket(IoHandle& handle, + const Address::Instance& local_address, UdpPacketProcessor& udp_packet_processor, MonotonicTime receive_time, uint32_t* packets_dropped); + /** + * Read all available packets from a given UDP socket and pass the packet to a given + * UdpPacketProcessor. + * @param handle is the UDP socket to read from. + * @param local_address is the socket's local address used to populate port. + * @param udp_packet_processor is the callback to receive the packets. + * @param time_source is the time source used to generate the time stamp of the received packets. + * @param packets_dropped is the output parameter for number of packets dropped in kernel. + */ + static Api::IoCallUint64Result + readAllDatagramsFromSocket(IoHandle& handle, const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, TimeSource& time_source, + uint32_t& packets_dropped); + private: static void throwWithMalformedIp(const std::string& ip_address); diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index adecb1c08de8..5557d6b3ce51 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -93,6 +93,12 @@ EXTENSIONS = { "envoy.filters.network.sni_cluster": "//source/extensions/filters/network/sni_cluster:config", "envoy.filters.network.zookeeper_proxy": "//source/extensions/filters/network/zookeeper_proxy:config", + # + # UDP filters fixfifx include in loaded extensions + # + + "envoy.filters.udp.udp_proxy": "//source/extensions/filters/udp/udp_proxy:config", + # # Resource monitors # diff --git a/source/extensions/filters/udp/udp_proxy/BUILD b/source/extensions/filters/udp/udp_proxy/BUILD new file mode 100644 index 000000000000..49f9377023dd --- /dev/null +++ b/source/extensions/filters/udp/udp_proxy/BUILD @@ -0,0 +1,36 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "udp_proxy_filter_lib", + srcs = ["udp_proxy_filter.cc"], + hdrs = ["udp_proxy_filter.h"], + deps = [ + "//include/envoy/event:file_event_interface", + "//include/envoy/event:timer_interface", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listener_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/network:utility_lib", + "@envoy_api//envoy/config/filter/udp/udp_proxy/v2alpha:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":udp_proxy_filter_lib", + "//include/envoy/registry", + "//include/envoy/server:filter_config_interface", + "@envoy_api//envoy/config/filter/udp/udp_proxy/v2alpha:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/udp/udp_proxy/config.cc b/source/extensions/filters/udp/udp_proxy/config.cc new file mode 100644 index 000000000000..43324d8ced9a --- /dev/null +++ b/source/extensions/filters/udp/udp_proxy/config.cc @@ -0,0 +1,15 @@ +#include "extensions/filters/udp/udp_proxy/config.h" + +namespace Envoy { +namespace Extensions { +namespace UdpFilters { +namespace UdpProxy { + +static Registry::RegisterFactory + register_; + +} +} // namespace UdpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/udp/udp_proxy/config.h b/source/extensions/filters/udp/udp_proxy/config.h new file mode 100644 index 000000000000..dbdb7ce63997 --- /dev/null +++ b/source/extensions/filters/udp/udp_proxy/config.h @@ -0,0 +1,45 @@ +#pragma once + +#include "envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.pb.h" +#include "envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.pb.validate.h" +#include "envoy/server/filter_config.h" + +#include "extensions/filters/udp/udp_proxy/udp_proxy_filter.h" + +namespace Envoy { +namespace Extensions { +namespace UdpFilters { +namespace UdpProxy { + +/** + * Config registration for the UDP proxy filter. @see NamedUdpListenerFilterConfigFactory. + */ +class UdpProxyFilterConfigFactory + : public Server::Configuration::NamedUdpListenerFilterConfigFactory { +public: + // NamedUdpListenerFilterConfigFactory + Network::UdpListenerFilterFactoryCb + createFilterFactoryFromProto(const Protobuf::Message& config, + Server::Configuration::ListenerFactoryContext& context) override { + auto shared_config = std::make_shared( + context.clusterManager(), context.timeSource(), + MessageUtil::downcastAndValidate< + const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig&>( + config, context.messageValidationVisitor())); + return [shared_config](Network::UdpListenerFilterManager& filter_manager, + Network::UdpReadFilterCallbacks& callbacks) -> void { + filter_manager.addReadFilter(std::make_unique(callbacks, shared_config)); + }; + } + + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } + + std::string name() override { return "envoy.filters.udp.udp_proxy"; } +}; + +} // namespace UdpProxy +} // namespace UdpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc new file mode 100644 index 000000000000..4f1df27b13e7 --- /dev/null +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -0,0 +1,85 @@ +#include "extensions/filters/udp/udp_proxy/udp_proxy_filter.h" + +#include "envoy/network/listener.h" + +namespace Envoy { +namespace Extensions { +namespace UdpFilters { +namespace UdpProxy { + +// TODO(mattklein123): Logging +// TODO(mattklein123): Stats + +void UdpProxyFilter::onData(Network::UdpRecvData& data) { + const auto active_session_it = sessions_.find(data.addresses_); + ActiveSession* active_session; + if (active_session_it == sessions_.end()) { + // TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via + // cluster manager callbacks. + Upstream::ThreadLocalCluster* cluster = config_->getCluster(); + // TODO(mattklein123): Handle the case where the cluster does not exist. + ASSERT(cluster != nullptr); + + // TODO(mattklein123): Pass a context and support hash based routing. + Upstream::HostConstSharedPtr host = cluster->loadBalancer().chooseHost(nullptr); + // TODO(mattklein123): Handle the case where the host does not exist. + ASSERT(host != nullptr); + + auto new_session = std::make_unique(*this, std::move(data.addresses_), host); + active_session = new_session.get(); + sessions_.emplace(std::move(new_session)); + } else { + // TODO(mattklein123): Handle the host going away going away or failing health checks. + active_session = active_session_it->get(); + } + + active_session->write(*data.buffer_); +} + +UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, + Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host) + : parent_(parent), addresses_(std::move(addresses)), host_(host), + io_handle_(host->address()->socket(Network::Address::SocketType::Datagram)), + socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent( + io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge, + Event::FileReadyType::Read)) { + // TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction + // does not work well right now for client sockets. It's too heavy weight and is aimed at listener + // sockets. We need to figure out how to either refactor Socket into something that works better + // for this use case or allow the socket option abstractions to work directly against an IO + // handle. +} + +void UdpProxyFilter::ActiveSession::onReadReady() { + // TODO(mattklein123): Refresh idle timer. + uint32_t packets_dropped = 0; + const Api::IoCallUint64Result result = Network::Utility::readAllDatagramsFromSocket( + *io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped); + // We should always get a failed result, even if no further data. + ASSERT(!result.ok()); + // TODO(mattklein123): Increment stat on failure. + ASSERT(result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again); +} + +void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { + // TODO(mattklein123): Refresh idle timer. + Api::IoCallUint64Result rc = Network::Utility::writeToSocket( + *io_handle_, buffer, addresses_.local_->ip(), *host_->address()); + // TODO(mattklein123): Increment stat on failure. + ASSERT(rc.ok()); +} + +void UdpProxyFilter::ActiveSession::processPacket(Network::Address::InstanceConstSharedPtr, + Network::Address::InstanceConstSharedPtr, + Buffer::InstancePtr buffer, MonotonicTime) { + Network::UdpSendData data{addresses_.local_->ip(), *addresses_.peer_, *buffer}; + const Api::IoCallUint64Result rc = parent_.read_callbacks_->udpListener().send(data); + // TODO(mattklein123): Increment stat on failure. + ASSERT(rc.ok()); +} + +} // namespace UdpProxy +} // namespace UdpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h new file mode 100644 index 000000000000..fb16fdfb74bf --- /dev/null +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -0,0 +1,117 @@ +#pragma once + +#include "envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.pb.h" +#include "envoy/event/file_event.h" +#include "envoy/event/timer.h" +#include "envoy/network/filter.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/network/utility.h" + +#include "absl/container/flat_hash_set.h" + +namespace Envoy { +namespace Extensions { +namespace UdpFilters { +namespace UdpProxy { + +class UdpProxyFilterConfig { +public: + UdpProxyFilterConfig(Upstream::ClusterManager& cluster_manager, TimeSource& time_source, + const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig& config) + : cluster_manager_(cluster_manager), time_source_(time_source), config_(config) {} + + Upstream::ThreadLocalCluster* getCluster() const { + return cluster_manager_.get(config_.cluster()); + } + TimeSource& timeSource() const { return time_source_; } + +private: + Upstream::ClusterManager& cluster_manager_; + TimeSource& time_source_; + const envoy::config::filter::udp::udp_proxy::v2alpha::UdpProxyConfig config_; +}; + +using UdpProxyFilterConfigSharedPtr = std::shared_ptr; + +class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable { +public: + UdpProxyFilter(Network::UdpReadFilterCallbacks& callbacks, + const UdpProxyFilterConfigSharedPtr& config) + : UdpListenerReadFilter(callbacks), config_(config) {} + + // Network::UdpListenerReadFilter + void onData(Network::UdpRecvData& data) override; + +private: + class ActiveSession : public Network::UdpPacketProcessor { + public: + ActiveSession(UdpProxyFilter& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, + const Upstream::HostConstSharedPtr& host); + const Network::UdpRecvData::LocalPeerAddresses& addresses() { return addresses_; } + void write(const Buffer::Instance& buffer); + + private: + void onReadReady(); + + // Network::UdpPacketProcessor + void processPacket(Network::Address::InstanceConstSharedPtr local_address, + Network::Address::InstanceConstSharedPtr peer_address, + Buffer::InstancePtr buffer, MonotonicTime receive_time) override; + uint64_t maxPacketSize() const override { + // TODO(mattklein123): Support configurable/jumbo frames when proxying to upstream. + // Eventually we will want to support some type of PROXY header when doing L4 QUIC + // forwarding. + return Network::MAX_UDP_PACKET_SIZE; + } + + UdpProxyFilter& parent_; + const Network::UdpRecvData::LocalPeerAddresses addresses_; + const Upstream::HostConstSharedPtr host_; + const Network::IoHandlePtr io_handle_; + const Event::FileEventPtr socket_event_; + }; + + using ActiveSessionPtr = std::unique_ptr; + + struct HeterogeneousActiveSessionHash { + // Specifying is_transparent indicates to the library infrastructure that + // type-conversions should not be applied when calling find(), but instead + // pass the actual types of the contained and searched-for objects directly to + // these functors. See + // https://en.cppreference.com/w/cpp/utility/functional/less_void for an + // official reference, and https://abseil.io/tips/144 for a description of + // using it in the context of absl. + using is_transparent = void; // NOLINT(readability-identifier-naming) + + size_t operator()(const Network::UdpRecvData::LocalPeerAddresses& value) const { + return absl::Hash()(value); + } + size_t operator()(const ActiveSessionPtr& value) const { + return absl::Hash()(value->addresses()); + } + }; + + struct HeterogeneousActiveSessionEqual { + // See description for HeterogeneousActiveSessionHash::is_transparent. + using is_transparent = void; // NOLINT(readability-identifier-naming) + + bool operator()(const ActiveSessionPtr& lhs, + const Network::UdpRecvData::LocalPeerAddresses& rhs) const { + return lhs->addresses() == rhs; + } + bool operator()(const ActiveSessionPtr& lhs, const ActiveSessionPtr& rhs) const { + return lhs->addresses() == rhs->addresses(); + } + }; + + const UdpProxyFilterConfigSharedPtr config_; + absl::flat_hash_set + sessions_; +}; + +} // namespace UdpProxy +} // namespace UdpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.cc b/source/extensions/quic_listeners/quiche/active_quic_listener.cc index 57be66ad6e38..10e27344fe73 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.cc +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.cc @@ -58,9 +58,10 @@ void ActiveQuicListener::onListenerShutdown() { } void ActiveQuicListener::onData(Network::UdpRecvData& data) { - quic::QuicSocketAddress peer_address(envoyAddressInstanceToQuicSocketAddress(data.peer_address_)); + quic::QuicSocketAddress peer_address( + envoyAddressInstanceToQuicSocketAddress(data.addresses_.peer_)); quic::QuicSocketAddress self_address( - envoyAddressInstanceToQuicSocketAddress(data.local_address_)); + envoyAddressInstanceToQuicSocketAddress(data.addresses_.local_)); quic::QuicTime timestamp = quic::QuicTime::Zero() + quic::QuicTime::Delta::FromMilliseconds(std::chrono::duration_cast( diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_packet_writer.cc b/source/extensions/quic_listeners/quiche/envoy_quic_packet_writer.cc index 0fca1ea83772..88816a34d059 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_packet_writer.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_packet_writer.cc @@ -26,7 +26,8 @@ quic::WriteResult EnvoyQuicPacketWriter::WritePacket(const char* buffer, size_t Network::Address::InstanceConstSharedPtr remote_addr = quicAddressToEnvoyAddressInstance(peer_address); Api::IoCallUint64Result result = Network::Utility::writeToSocket( - socket_, &slice, 1, local_addr == nullptr ? nullptr : local_addr->ip(), *remote_addr); + socket_.ioHandle(), &slice, 1, local_addr == nullptr ? nullptr : local_addr->ip(), + *remote_addr); if (result.ok()) { return {quic::WRITE_STATUS_OK, static_cast(result.rc_)}; } diff --git a/source/server/BUILD b/source/server/BUILD index ee4de6e782df..cec023ba3eda 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -377,6 +377,7 @@ envoy_cc_library( "abseil_optional", ], deps = [ + ":active_raw_udp_listener_config", ":configuration_lib", ":connection_handler_lib", ":guarddog_lib", diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index da83272b5392..d0b6315cbda2 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -45,14 +45,12 @@ bool FilterChainUtility::buildFilterChain( return true; } -bool FilterChainUtility::buildUdpFilterChain( +void FilterChainUtility::buildUdpFilterChain( Network::UdpListenerFilterManager& filter_manager, Network::UdpReadFilterCallbacks& callbacks, const std::vector& factories) { for (const Network::UdpListenerFilterFactoryCb& factory : factories) { factory(filter_manager, callbacks); } - - return true; } void MainImpl::initialize(const envoy::config::bootstrap::v2::Bootstrap& bootstrap, diff --git a/source/server/configuration_impl.h b/source/server/configuration_impl.h index 4aa4852eb0ca..032afe3ccdd9 100644 --- a/source/server/configuration_impl.h +++ b/source/server/configuration_impl.h @@ -81,7 +81,7 @@ class FilterChainUtility { * Given a UdpListenerFilterManager and a list of factories, create a new filter chain. Chain * creation will exit early if any filters immediately close the connection. */ - static bool + static void buildUdpFilterChain(Network::UdpListenerFilterManager& filter_manager, Network::UdpReadFilterCallbacks& callbacks, const std::vector& factories); diff --git a/source/server/http/admin.h b/source/server/http/admin.h index 3ace506ef6ef..11f71fde3960 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -90,10 +90,8 @@ class AdminImpl : public Admin, createNetworkFilterChain(Network::Connection& connection, const std::vector& filter_factories) override; bool createListenerFilterChain(Network::ListenerFilterManager&) override { return true; } - bool createUdpListenerFilterChain(Network::UdpListenerFilterManager&, - Network::UdpReadFilterCallbacks&) override { - return true; - } + void createUdpListenerFilterChain(Network::UdpListenerFilterManager&, + Network::UdpReadFilterCallbacks&) override {} // Http::FilterChainFactory void createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) override; diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index bec30d337b54..547538ba9856 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -255,10 +255,10 @@ bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& man return Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_); } -bool ListenerImpl::createUdpListenerFilterChain(Network::UdpListenerFilterManager& manager, +void ListenerImpl::createUdpListenerFilterChain(Network::UdpListenerFilterManager& manager, Network::UdpReadFilterCallbacks& callbacks) { - return Configuration::FilterChainUtility::buildUdpFilterChain(manager, callbacks, - udp_listener_filter_factories_); + Configuration::FilterChainUtility::buildUdpFilterChain(manager, callbacks, + udp_listener_filter_factories_); } bool ListenerImpl::drainClose() const { diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 21e06155aa42..e58ee7b3d391 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -143,7 +143,7 @@ class ListenerImpl : public Network::ListenerConfig, bool createNetworkFilterChain(Network::Connection& connection, const std::vector& factories) override; bool createListenerFilterChain(Network::ListenerFilterManager& manager) override; - bool createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, + void createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, Network::UdpReadFilterCallbacks& callbacks) override; SystemTime last_updated_; diff --git a/source/server/server.cc b/source/server/server.cc index 5161b456aad6..e35647be965b 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -260,6 +260,9 @@ void InstanceImpl::initialize(const Options& options, ENVOY_LOG( info, " filters.network: {}", Registry::FactoryRegistry::allFactoryNames()); + ENVOY_LOG(info, " filters.udp: {}", + Registry::FactoryRegistry< + Configuration::NamedUdpListenerFilterConfigFactory>::allFactoryNames()); ENVOY_LOG(info, " stat_sinks: {}", Registry::FactoryRegistry::allFactoryNames()); ENVOY_LOG(info, " tracers: {}", @@ -270,6 +273,8 @@ void InstanceImpl::initialize(const Options& options, ENVOY_LOG(info, " transport_sockets.upstream: {}", Registry::FactoryRegistry< Configuration::UpstreamTransportSocketConfigFactory>::allFactoryNames()); + ENVOY_LOG(info, " udp_listeners: {}", + Registry::FactoryRegistry::allFactoryNames()); // Enable the selected buffer implementation (old libevent evbuffer version or new native // version) early in the initialization, before any buffers can be created. diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc index 1cfc09bc00ff..f063b17d0b91 100644 --- a/test/common/network/udp_listener_impl_test.cc +++ b/test/common/network/udp_listener_impl_test.cc @@ -73,17 +73,17 @@ class UdpListenerImplTest : public ListenerImplTestBase { // Validates receive data, source/destination address and received time. void validateRecvCallbackParams(const UdpRecvData& data) { - ASSERT_NE(data.local_address_, nullptr); + ASSERT_NE(data.addresses_.local_, nullptr); - ASSERT_NE(data.peer_address_, nullptr); - ASSERT_NE(data.peer_address_->ip(), nullptr); + ASSERT_NE(data.addresses_.peer_, nullptr); + ASSERT_NE(data.addresses_.peer_->ip(), nullptr); - EXPECT_EQ(data.local_address_->asString(), send_to_addr_->asString()); + EXPECT_EQ(data.addresses_.local_->asString(), send_to_addr_->asString()); - EXPECT_EQ(data.peer_address_->ip()->addressAsString(), + EXPECT_EQ(data.addresses_.peer_->ip()->addressAsString(), client_socket_->localAddress()->ip()->addressAsString()); - EXPECT_EQ(*data.local_address_, *send_to_addr_); + EXPECT_EQ(*data.addresses_.local_, *send_to_addr_); EXPECT_EQ(time_system_.monotonicTime(), data.receive_time_); // Advance time so that next onData() should have different received time. time_system_.sleep(std::chrono::milliseconds(100)); @@ -196,7 +196,7 @@ TEST_P(UdpListenerImplTest, UdpEcho) { .WillOnce(Invoke([&](const UdpRecvData& data) -> void { validateRecvCallbackParams(data); - test_peer_address = data.peer_address_; + test_peer_address = data.addresses_.peer_; const std::string data_str = data.buffer_->toString(); EXPECT_EQ(data_str, first); diff --git a/test/config/utility.cc b/test/config/utility.cc index 29717682558e..17804e22de83 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -572,8 +572,7 @@ void ConfigHelper::addFilter(const std::string& config) { loadHttpConnectionManager(hcm_config); auto* filter_list_back = hcm_config.add_http_filters(); - const std::string json = Json::Factory::loadFromYamlString(config)->asJsonString(); - TestUtility::loadFromJson(json, *filter_list_back); + TestUtility::loadFromYaml(config, *filter_list_back); // Now move it to the front. for (int i = hcm_config.http_filters_size() - 1; i > 0; --i) { diff --git a/test/extensions/filters/udp/udp_proxy/BUILD b/test/extensions/filters/udp/udp_proxy/BUILD new file mode 100644 index 000000000000..e636786d9bfa --- /dev/null +++ b/test/extensions/filters/udp/udp_proxy/BUILD @@ -0,0 +1,22 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +envoy_package() + +envoy_extension_cc_test( + name = "udp_proxy_integration_test", + srcs = ["udp_proxy_integration_test.cc"], + extension_name = "envoy.filters.udp.udp_proxy", + deps = [ + "//source/extensions/filters/udp/udp_proxy:config", + "//test/integration:integration_lib", + ], +) diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc new file mode 100644 index 000000000000..62a40bc2131a --- /dev/null +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -0,0 +1,182 @@ +#include "test/integration/integration.h" + +namespace Envoy { +namespace { + +/** + * A synchronous UDP client used for testing. + */ +class UdpSyncClient { +public: + UdpSyncClient(Event::TestTimeSystem& time_system, Network::Address::IpVersion version) + : time_system_(time_system), + socket_(std::make_unique( + Network::Test::getCanonicalLoopbackAddress(version), nullptr, true)) { + // TODO(mattklein123): Right now all sockets are non-blocking. Move this non-blocking + // modification black to the abstraction layer so it will work for multiple platforms. + RELEASE_ASSERT(fcntl(socket_->ioHandle().fd(), F_SETFL, 0) != -1, ""); + } + + void write(const std::string& buffer, const Network::Address::Instance& peer) { + const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer), + nullptr, peer); + ASSERT_EQ(rc.rc_, buffer.length()); + } + + void recv(Network::UdpRecvData& datagram) { + SyncPacketProcessor processor(datagram); + const auto rc = + Network::Utility::readFromSocket(socket_->ioHandle(), *socket_->localAddress(), processor, + time_system_.monotonicTime(), nullptr); + ASSERT_TRUE(rc.ok()); + } + +private: + struct SyncPacketProcessor : public Network::UdpPacketProcessor { + SyncPacketProcessor(Network::UdpRecvData& data) : data_(data) {} + + void processPacket(Network::Address::InstanceConstSharedPtr local_address, + Network::Address::InstanceConstSharedPtr peer_address, + Buffer::InstancePtr buffer, MonotonicTime receive_time) override { + data_.addresses_.local_ = std::move(local_address); + data_.addresses_.peer_ = std::move(peer_address); + data_.buffer_ = std::move(buffer); + data_.receive_time_ = receive_time; + } + uint64_t maxPacketSize() const override { return Network::MAX_UDP_PACKET_SIZE; } + + Network::UdpRecvData& data_; + }; + + Event::TestTimeSystem& time_system_; + const Network::SocketPtr socket_; +}; + +class UdpProxyIntegrationTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + UdpProxyIntegrationTest() : BaseIntegrationTest(GetParam(), configToUse()) {} + + static std::string configToUse() { + return ConfigHelper::BASE_UDP_LISTENER_CONFIG + R"EOF( + listener_filters: + name: envoy.filters.udp.udp_proxy + typed_config: + '@type': type.googleapis.com/envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig + cluster: cluster_0 + )EOF"; + } + + /** + * Initializer for an individual test. + */ + void SetUp() override { + udp_fake_upstream_ = true; + BaseIntegrationTest::initialize(); + } + + /** + * Destructor for an individual test. + */ + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + void requestResponseWithListenerAddress(const Network::Address::Instance& listener_address) { + // Send datagram to be proxied. + UdpSyncClient client(timeSystem(), version_); + client.write("hello", listener_address); + + // Wait for the upstream datagram. + Network::UdpRecvData request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello", request_datagram.buffer_->toString()); + + // Respond from the upstream. + fake_upstreams_[0]->sendUdpDatagram("world", *request_datagram.addresses_.peer_); + Network::UdpRecvData response_datagram; + client.recv(response_datagram); + EXPECT_EQ("world", response_datagram.buffer_->toString()); + EXPECT_EQ(listener_address.asString(), response_datagram.addresses_.peer_->asString()); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, UdpProxyIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +TEST_P(UdpProxyIntegrationTest, HelloWorldOnLoopback) { + const uint32_t port = lookupPort("listener_0"); + auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + requestResponseWithListenerAddress(*listener_address); +} + +TEST_P(UdpProxyIntegrationTest, HelloWorldOnNonLocalAddress) { + const uint32_t port = lookupPort("listener_0"); + Network::Address::InstanceConstSharedPtr listener_address; + if (version_ == Network::Address::IpVersion::v4) { + // Kernel regards any 127.x.x.x as local address. + listener_address.reset(new Network::Address::Ipv4Instance( +#ifndef __APPLE__ + "127.0.0.3", +#else + "127.0.0.1", +#endif + port)); + } else { + // IPv6 doesn't allow any non-local source address for sendmsg. And the only + // local address guaranteed in tests in loopback. Unfortunately, even if it's not + // specified, kernel will pick this address as source address. So this test + // only checks if IoSocketHandle::sendmsg() sets up CMSG_DATA correctly, + // i.e. cmsg_len is big enough when that code path is executed. + listener_address.reset(new Network::Address::Ipv6Instance("::1", port)); + } + + requestResponseWithListenerAddress(*listener_address); +} + +TEST_P(UdpProxyIntegrationTest, MultipleClients) { + const uint32_t port = lookupPort("listener_0"); + auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + + UdpSyncClient client1(timeSystem(), version_); + client1.write("client1_hello", *listener_address); + + UdpSyncClient client2(timeSystem(), version_); + client2.write("client2_hello", *listener_address); + client2.write("client2_hello_2", *listener_address); + + Network::UdpRecvData client1_request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(client1_request_datagram)); + EXPECT_EQ("client1_hello", client1_request_datagram.buffer_->toString()); + + Network::UdpRecvData client2_request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(client2_request_datagram)); + EXPECT_EQ("client2_hello", client2_request_datagram.buffer_->toString()); + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(client2_request_datagram)); + EXPECT_EQ("client2_hello_2", client2_request_datagram.buffer_->toString()); + + // We should not be getting datagrams from the same peer. + EXPECT_NE(*client1_request_datagram.addresses_.peer_, *client2_request_datagram.addresses_.peer_); + + // Send two datagrams back to client 2. + fake_upstreams_[0]->sendUdpDatagram("client2_world", *client2_request_datagram.addresses_.peer_); + fake_upstreams_[0]->sendUdpDatagram("client2_world_2", + *client2_request_datagram.addresses_.peer_); + Network::UdpRecvData response_datagram; + client2.recv(response_datagram); + EXPECT_EQ("client2_world", response_datagram.buffer_->toString()); + client2.recv(response_datagram); + EXPECT_EQ("client2_world_2", response_datagram.buffer_->toString()); + + // Send 1 datagram back to client 1. + fake_upstreams_[0]->sendUdpDatagram("client1_world", *client1_request_datagram.addresses_.peer_); + client1.recv(response_datagram); + EXPECT_EQ("client1_world", response_datagram.buffer_->toString()); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index 34500b029715..d55af3d393cd 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -560,21 +560,6 @@ envoy_cc_test( ], ) -envoy_cc_test( - name = "udp_echo_integration_test", - srcs = [ - "udp_echo_integration_test.cc", - ], - deps = [ - ":integration_lib", - "//source/common/network:listen_socket_lib", - "//source/server:active_raw_udp_listener_config", - "//test/integration/filters:udp_listener_echo_filter_lib", - "//test/server:utility_lib", - "//test/test_common:utility_lib", - ], -) - envoy_cc_test( name = "stats_integration_test", srcs = ["stats_integration_test.cc"], diff --git a/test/integration/autonomous_upstream.cc b/test/integration/autonomous_upstream.cc index bd00fd176b1e..dfecdc5f66d6 100644 --- a/test/integration/autonomous_upstream.cc +++ b/test/integration/autonomous_upstream.cc @@ -92,10 +92,8 @@ bool AutonomousUpstream::createNetworkFilterChain(Network::Connection& connectio bool AutonomousUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; } -bool AutonomousUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager&, - Network::UdpReadFilterCallbacks&) { - return true; -} +void AutonomousUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager&, + Network::UdpReadFilterCallbacks&) {} void AutonomousUpstream::setLastRequestHeaders(const Http::HeaderMap& headers) { Thread::LockGuard lock(headers_lock_); diff --git a/test/integration/autonomous_upstream.h b/test/integration/autonomous_upstream.h index 6783b6c5a8f9..15110798b8f9 100644 --- a/test/integration/autonomous_upstream.h +++ b/test/integration/autonomous_upstream.h @@ -65,7 +65,7 @@ class AutonomousUpstream : public FakeUpstream { createNetworkFilterChain(Network::Connection& connection, const std::vector& filter_factories) override; bool createListenerFilterChain(Network::ListenerFilterManager& listener) override; - bool createUdpListenerFilterChain(Network::UdpListenerFilterManager& listener, + void createUdpListenerFilterChain(Network::UdpListenerFilterManager& listener, Network::UdpReadFilterCallbacks& callbacks) override; void setLastRequestHeaders(const Http::HeaderMap& headers); diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index f3884981a1f2..6ef985a1bb5d 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -16,6 +16,7 @@ #include "common/network/address_impl.h" #include "common/network/listen_socket_impl.h" #include "common/network/raw_buffer_socket.h" +#include "common/network/socket_option_factory.h" #include "common/network/utility.h" #include "server/connection_handler_impl.h" @@ -357,7 +358,7 @@ FakeUpstream::FakeUpstream(const std::string& uds_path, FakeHttpConnection::Type static Network::SocketPtr makeTcpListenSocket(const Network::Address::InstanceConstSharedPtr& address) { - return Network::SocketPtr{new Network::TcpListenSocket(address, nullptr, true)}; + return std::make_unique(address, nullptr, true); } static Network::SocketPtr makeTcpListenSocket(uint32_t port, Network::Address::IpVersion version) { @@ -365,14 +366,25 @@ static Network::SocketPtr makeTcpListenSocket(uint32_t port, Network::Address::I Network::Utility::parseInternetAddress(Network::Test::getAnyAddressString(version), port)); } +static Network::SocketPtr +makeUdpListenSocket(const Network::Address::InstanceConstSharedPtr& address) { + auto socket = std::make_unique(address, nullptr, true); + // TODO(mattklein123): These options are set in multiple locations. We should centralize them for + // UDP listeners. + socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions()); + socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions()); + return socket; +} + FakeUpstream::FakeUpstream(const Network::Address::InstanceConstSharedPtr& address, FakeHttpConnection::Type type, Event::TestTimeSystem& time_system, - bool enable_half_close) - : FakeUpstream(Network::Test::createRawBufferSocketFactory(), makeTcpListenSocket(address), + bool enable_half_close, bool udp_fake_upstream) + : FakeUpstream(Network::Test::createRawBufferSocketFactory(), + udp_fake_upstream ? makeUdpListenSocket(address) : makeTcpListenSocket(address), type, time_system, enable_half_close) { - ENVOY_LOG(info, "starting fake server on socket {}:{}. Address version is {}", + ENVOY_LOG(info, "starting fake server on socket {}:{}. Address version is {}. UDP={}", address->ip()->addressAsString(), address->ip()->port(), - Network::Test::addressVersionAsString(address->ip()->version())); + Network::Test::addressVersionAsString(address->ip()->version()), udp_fake_upstream); } FakeUpstream::FakeUpstream(uint32_t port, FakeHttpConnection::Type type, @@ -381,7 +393,7 @@ FakeUpstream::FakeUpstream(uint32_t port, FakeHttpConnection::Type type, : FakeUpstream(Network::Test::createRawBufferSocketFactory(), makeTcpListenSocket(port, version), type, time_system, enable_half_close) { ENVOY_LOG(info, "starting fake server on port {}. Address version is {}", - this->localAddress()->ip()->port(), Network::Test::addressVersionAsString(version)); + localAddress()->ip()->port(), Network::Test::addressVersionAsString(version)); } FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, @@ -390,7 +402,7 @@ FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket : FakeUpstream(std::move(transport_socket_factory), makeTcpListenSocket(port, version), type, time_system, false) { ENVOY_LOG(info, "starting fake SSL server on port {}. Address version is {}", - this->localAddress()->ip()->port(), Network::Test::addressVersionAsString(version)); + localAddress()->ip()->port(), Network::Test::addressVersionAsString(version)); } FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, @@ -426,15 +438,15 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, auto connection_wrapper = std::make_unique(connection, allow_unexpected_disconnects_); connection_wrapper->moveIntoListBack(std::move(connection_wrapper), new_connections_); - new_connection_event_.notifyOne(); + upstream_event_.notifyOne(); return true; } bool FakeUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; } -bool FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager&, - Network::UdpReadFilterCallbacks&) { - return true; +void FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, + Network::UdpReadFilterCallbacks& callbacks) { + udp_listener.addReadFilter(std::make_unique(*this, callbacks)); } void FakeUpstream::threadRoutine() { @@ -462,7 +474,7 @@ AssertionResult FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_di if (time_system.monotonicTime() >= end_time) { return AssertionFailure() << "Timed out waiting for new connection."; } - time_system_.waitFor(lock_, new_connection_event_, 5ms); + time_system_.waitFor(lock_, upstream_event_, 5ms); if (new_connections_.empty()) { // Run the client dispatcher since we may need to process window updates, etc. client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); @@ -495,7 +507,7 @@ FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher, FakeUpstream& upstream = *it; Thread::ReleasableLockGuard lock(upstream.lock_); if (upstream.new_connections_.empty()) { - time_system.waitFor(upstream.lock_, upstream.new_connection_event_, 5ms); + time_system.waitFor(upstream.lock_, upstream.upstream_event_, 5ms); } if (upstream.new_connections_.empty()) { @@ -522,7 +534,7 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect Thread::LockGuard lock(lock_); if (new_connections_.empty()) { ENVOY_LOG(debug, "waiting for raw connection"); - time_system_.waitFor(lock_, new_connection_event_, + time_system_.waitFor(lock_, upstream_event_, timeout); // Safe since CondVar::waitFor won't throw. } @@ -545,6 +557,36 @@ SharedConnectionWrapper& FakeUpstream::consumeConnection() { return connection_wrapper->shared_connection(); } +testing::AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill, + std::chrono::milliseconds timeout) { + Thread::LockGuard lock(lock_); + auto end_time = time_system_.monotonicTime() + timeout; + while (received_datagrams_.empty()) { + if (time_system_.monotonicTime() >= end_time) { + return AssertionFailure() << "Timed out waiting for UDP datagram."; + } + time_system_.waitFor(lock_, upstream_event_, 5ms); // Safe since CondVar::waitFor won't throw. + } + data_to_fill = std::move(received_datagrams_.front()); + received_datagrams_.pop_front(); + return AssertionSuccess(); +} + +void FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { + Thread::LockGuard lock(lock_); + received_datagrams_.emplace_back(std::move(data)); + upstream_event_.notifyOne(); +} + +void FakeUpstream::sendUdpDatagram(const std::string& buffer, + const Network::Address::Instance& peer) { + dispatcher_->post([this, buffer, &peer] { + const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer), + nullptr, peer); + EXPECT_TRUE(rc.rc_ == buffer.length()); + }); +} + AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data, milliseconds timeout) { Thread::LockGuard lock(lock_); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index a7ab4696804a..3e1022aaeec0 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -29,6 +29,8 @@ #include "common/network/listen_socket_impl.h" #include "common/stats/isolated_store_impl.h" +#include "server/active_raw_udp_listener_config.h" + #include "test/test_common/printers.h" #include "test/test_common/test_time_system.h" #include "test/test_common/utility.h" @@ -526,7 +528,7 @@ class FakeUpstream : Logger::Loggable, // Creates a fake upstream bound to the specified |address|. FakeUpstream(const Network::Address::InstanceConstSharedPtr& address, FakeHttpConnection::Type type, Event::TestTimeSystem& time_system, - bool enable_half_close = false); + bool enable_half_close = false, bool udp_fake_upstream = false); // Creates a fake upstream bound to INADDR_ANY and the specified |port|. FakeUpstream(uint32_t port, FakeHttpConnection::Type type, Network::Address::IpVersion version, @@ -560,6 +562,15 @@ class FakeUpstream : Logger::Loggable, FakeHttpConnectionPtr& connection, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + // Waits for 1 UDP datagram to be received. + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForUdpDatagram(Network::UdpRecvData& data_to_fill, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + // Send a UDP datagram on the fake upstream thread. + void sendUdpDatagram(const std::string& buffer, const Network::Address::Instance& peer); + // Network::FilterChainManager const Network::FilterChain* findFilterChain(const Network::ConnectionSocket&) const override { return filter_chain_.get(); @@ -570,7 +581,7 @@ class FakeUpstream : Logger::Loggable, createNetworkFilterChain(Network::Connection& connection, const std::vector& filter_factories) override; bool createListenerFilterChain(Network::ListenerFilterManager& listener) override; - bool createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, + void createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, Network::UdpReadFilterCallbacks& callbacks) override; void set_allow_unexpected_disconnects(bool value) { allow_unexpected_disconnects_ = value; } @@ -589,9 +600,23 @@ class FakeUpstream : Logger::Loggable, Network::SocketPtr&& connection, FakeHttpConnection::Type type, Event::TestTimeSystem& time_system, bool enable_half_close); + class FakeUpstreamUdpFilter : public Network::UdpListenerReadFilter { + public: + FakeUpstreamUdpFilter(FakeUpstream& parent, Network::UdpReadFilterCallbacks& callbacks) + : UdpListenerReadFilter(callbacks), parent_(parent) {} + + // Network::UdpListenerReadFilter + void onData(Network::UdpRecvData& data) override { parent_.onRecvDatagram(data); } + + private: + FakeUpstream& parent_; + }; + class FakeListener : public Network::ListenerConfig { public: - FakeListener(FakeUpstream& parent) : parent_(parent), name_("fake_upstream") {} + FakeListener(FakeUpstream& parent) + : parent_(parent), name_("fake_upstream"), + udp_listener_factory_(std::make_unique()) {} private: // Network::ListenerConfig @@ -609,7 +634,9 @@ class FakeUpstream : Logger::Loggable, Stats::Scope& listenerScope() override { return parent_.stats_store_; } uint64_t listenerTag() const override { return 0; } const std::string& name() const override { return name_; } - Network::ActiveUdpListenerFactory* udpListenerFactory() override { return nullptr; } + Network::ActiveUdpListenerFactory* udpListenerFactory() override { + return udp_listener_factory_.get(); + } Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } envoy::api::v2::core::TrafficDirection direction() const override { return envoy::api::v2::core::TrafficDirection::UNSPECIFIED; @@ -618,10 +645,12 @@ class FakeUpstream : Logger::Loggable, FakeUpstream& parent_; const std::string name_; Network::NopConnectionBalancerImpl connection_balancer_; + const Network::ActiveUdpListenerFactoryPtr udp_listener_factory_; }; void threadRoutine(); SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); + void onRecvDatagram(Network::UdpRecvData& data); Network::SocketPtr socket_; ConditionalInitializer server_initialized_; @@ -629,7 +658,7 @@ class FakeUpstream : Logger::Loggable, // main test thread. Thread::MutexBasicLockable lock_; Thread::ThreadPtr thread_; - Thread::CondVar new_connection_event_; + Thread::CondVar upstream_event_; Api::ApiPtr api_; Event::TestTimeSystem& time_system_; Event::DispatcherPtr dispatcher_; @@ -644,6 +673,7 @@ class FakeUpstream : Logger::Loggable, const bool enable_half_close_; FakeListener listener_; const Network::FilterChainSharedPtr filter_chain_; + std::list received_datagrams_ ABSL_GUARDED_BY(lock_); }; using FakeUpstreamPtr = std::unique_ptr; diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index b06b673294d6..21a41e73228b 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -246,22 +246,6 @@ envoy_cc_test_library( ], ) -envoy_cc_test_library( - name = "udp_listener_echo_filter_lib", - srcs = [ - "udp_listener_echo_filter.cc", - ], - deps = [ - "//include/envoy/buffer:buffer_interface", - "//include/envoy/network:filter_interface", - "//include/envoy/network:listener_interface", - "//include/envoy/registry", - "//include/envoy/server:filter_config_interface", - "//source/common/common:minimal_logger_lib", - "//source/common/network:connection_lib", - ], -) - envoy_cc_test_library( name = "metadata_stop_all_filter_config_lib", srcs = [ diff --git a/test/integration/filters/udp_listener_echo_filter.cc b/test/integration/filters/udp_listener_echo_filter.cc deleted file mode 100644 index d8a46c3f6117..000000000000 --- a/test/integration/filters/udp_listener_echo_filter.cc +++ /dev/null @@ -1,66 +0,0 @@ -#include - -#include "envoy/buffer/buffer.h" -#include "envoy/network/connection.h" -#include "envoy/network/filter.h" -#include "envoy/network/listener.h" -#include "envoy/registry/registry.h" -#include "envoy/server/filter_config.h" - -#include "common/common/assert.h" -#include "common/common/logger.h" - -namespace Envoy { - -class UdpListenerEchoFilter : public Network::UdpListenerReadFilter, - Logger::Loggable { -public: - UdpListenerEchoFilter(Network::UdpReadFilterCallbacks& callbacks); - - // Network::UdpListenerReadFilter - void onData(Network::UdpRecvData& data) override; -}; - -UdpListenerEchoFilter::UdpListenerEchoFilter(Network::UdpReadFilterCallbacks& callbacks) - : Network::UdpListenerReadFilter(callbacks) {} - -void UdpListenerEchoFilter::onData(Network::UdpRecvData& data) { - ENVOY_LOG(trace, "UdpEchoFilter: Got {} bytes from {} on {}", data.buffer_->length(), - data.peer_address_->asString(), data.local_address_->asString()); - - // send back the received data - Network::UdpSendData send_data{data.local_address_->ip(), *data.peer_address_, *data.buffer_}; - auto send_result = read_callbacks_->udpListener().send(send_data); - - ASSERT(send_result.ok()); -} - -/** - * Config registration for the echo filter. @see NamedNetworkFilterConfigFactory. - */ -class UdpListenerEchoFilterConfigFactory - : public Server::Configuration::NamedUdpListenerFilterConfigFactory { -public: - // NamedUdpListenerFilterConfigFactory - Network::UdpListenerFilterFactoryCb - createFilterFactoryFromProto(const Protobuf::Message&, - Server::Configuration::ListenerFactoryContext&) override { - return [](Network::UdpListenerFilterManager& filter_manager, - Network::UdpReadFilterCallbacks& callbacks) -> void { - filter_manager.addReadFilter(std::make_unique(callbacks)); - }; - } - - ProtobufTypes::MessagePtr createEmptyConfigProto() override { - return ProtobufTypes::MessagePtr{new Envoy::ProtobufWkt::Empty()}; - } - - std::string name() override { return "envoy.listener.udpecho"; } -}; - -// perform static registration -static Registry::RegisterFactory - register_; - -} // namespace Envoy diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 8cd497617378..2cca82052c7d 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -297,8 +297,8 @@ void BaseIntegrationTest::createUpstreams() { fake_upstreams_.emplace_back( new AutonomousUpstream(endpoint, upstream_protocol_, *time_system_)); } else { - fake_upstreams_.emplace_back( - new FakeUpstream(endpoint, upstream_protocol_, *time_system_, enable_half_close_)); + fake_upstreams_.emplace_back(new FakeUpstream(endpoint, upstream_protocol_, *time_system_, + enable_half_close_, udp_fake_upstream_)); } } } diff --git a/test/integration/integration.h b/test/integration/integration.h index 3cdabb86abfb..99a8b5ba945a 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -368,6 +368,9 @@ class BaseIntegrationTest : Logger::Loggable { bool enable_half_close_{false}; + // Whether the default created fake upstreams are UDP listeners. + bool udp_fake_upstream_{false}; + // True if test will use a fixed RNG value. bool deterministic_{}; diff --git a/test/integration/udp_echo_integration_test.cc b/test/integration/udp_echo_integration_test.cc deleted file mode 100644 index 34c0f91f985a..000000000000 --- a/test/integration/udp_echo_integration_test.cc +++ /dev/null @@ -1,126 +0,0 @@ -#include "common/network/address_impl.h" -#include "common/network/listen_socket_impl.h" - -#include "test/integration/integration.h" -#include "test/integration/utility.h" - -namespace Envoy { - -std::string udp_echo_config; - -class UdpEchoIntegrationTest : public testing::TestWithParam, - public BaseIntegrationTest { -public: - UdpEchoIntegrationTest() : BaseIntegrationTest(GetParam(), udp_echo_config) {} - - // Called once by the gtest framework before any UdpEchoIntegrationTests are run. - static void SetUpTestSuite() { - udp_echo_config = ConfigHelper::BASE_UDP_LISTENER_CONFIG + R"EOF( - listener_filters: - name: envoy.listener.udpecho - )EOF"; - } - - /** - * Initializer for an individual test. - */ - void SetUp() override { BaseIntegrationTest::initialize(); } - - /** - * Destructor for an individual test. - */ - void TearDown() override { - test_server_.reset(); - fake_upstreams_.clear(); - } - - void - requestResponseWithListenerAddress(Network::Address::InstanceConstSharedPtr& listener_address) { - using NetworkSocketTraitType = - Network::NetworkSocketTrait; - - // Setup client socket. - Network::SocketPtr client_socket = - std::make_unique>( - Network::Test::getCanonicalLoopbackAddress(version_), nullptr, true); - ASSERT_NE(client_socket, nullptr); - - const std::string request("hello world"); - const void* void_pointer = static_cast(request.c_str()); - Buffer::RawSlice slice{const_cast(void_pointer), request.length()}; - auto send_rc = client_socket->ioHandle().sendto(slice, 0, *listener_address); - ASSERT_EQ(send_rc.rc_, request.length()); - - auto& os_sys_calls = Api::OsSysCallsSingleton::get(); - sockaddr_storage peer_addr; - socklen_t addr_len = sizeof(sockaddr_storage); - - const uint64_t bytes_to_read = request.length(); - auto recv_buf = std::make_unique(bytes_to_read + 1); - uint64_t bytes_read = 0; - - int retry = 0; - do { - Api::SysCallSizeResult result = - os_sys_calls.recvfrom(client_socket->ioHandle().fd(), recv_buf.get(), bytes_to_read, 0, - reinterpret_cast(&peer_addr), &addr_len); - if (result.rc_ >= 0) { - bytes_read = result.rc_; - Network::Address::InstanceConstSharedPtr peer_address = - Network::Address::addressFromSockAddr(peer_addr, addr_len, false); - // Expect to receive from the same peer address as it sent to. - EXPECT_EQ(listener_address->asString(), peer_address->asString()); - } else if (retry == 10 || result.errno_ != EAGAIN) { - break; - } - - if (bytes_read >= bytes_to_read) { - break; - } - ASSERT(bytes_read == 0); - // Retry after 10ms - timeSystem().sleep(std::chrono::milliseconds(10)); - retry++; - } while (true); - - recv_buf[bytes_to_read] = '\0'; - EXPECT_EQ(recv_buf.get(), request); - } -}; - -INSTANTIATE_TEST_SUITE_P(IpVersions, UdpEchoIntegrationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); - -TEST_P(UdpEchoIntegrationTest, HelloWorldOnLoopback) { - uint32_t port = lookupPort("listener_0"); - auto listener_address = Network::Utility::resolveUrl( - fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); - requestResponseWithListenerAddress(listener_address); -} - -TEST_P(UdpEchoIntegrationTest, HelloWorldOnNonLocalAddress) { - uint32_t port = lookupPort("listener_0"); - Network::Address::InstanceConstSharedPtr listener_address; - if (version_ == Network::Address::IpVersion::v4) { - // Kernel regards any 127.x.x.x as local address. - listener_address.reset(new Network::Address::Ipv4Instance( -#ifndef __APPLE__ - "127.0.0.3", -#else - "127.0.0.1", -#endif - port)); - } else { - // IPv6 doesn't allow any non-local source address for sendmsg. And the only - // local address guaranteed in tests in loopback. Unfortunately, even if it's not - // specified, kernel will pick this address as source address. So this test - // only checks if IoSocketHandle::sendmsg() sets up CMSG_DATA correctly, - // i.e. cmsg_len is big enough when that code path is executed. - listener_address.reset(new Network::Address::Ipv6Instance("::1", port)); - } - - requestResponseWithListenerAddress(listener_address); -} - -} // namespace Envoy diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 7e3799968be6..2e24d5588512 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -116,7 +116,6 @@ MockFilterChainManager::~MockFilterChainManager() = default; MockFilterChainFactory::MockFilterChainFactory() { ON_CALL(*this, createListenerFilterChain(_)).WillByDefault(Return(true)); - ON_CALL(*this, createUdpListenerFilterChain(_, _)).WillByDefault(Return(true)); } MockFilterChainFactory::~MockFilterChainFactory() = default; diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index ee7b392e40be..24e115a48b73 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -206,7 +206,7 @@ class MockFilterChainFactory : public FilterChainFactory { const std::vector& filter_factories)); MOCK_METHOD1(createListenerFilterChain, bool(ListenerFilterManager& listener)); MOCK_METHOD2(createUdpListenerFilterChain, - bool(UdpListenerFilterManager& listener, UdpReadFilterCallbacks& callbacks)); + void(UdpListenerFilterManager& listener, UdpReadFilterCallbacks& callbacks)); }; class MockListenSocket : public Socket { From 58754e9eae129ce00d49b9fd2c98d7d65ad976de Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Tue, 5 Nov 2019 09:58:34 -0800 Subject: [PATCH 2/5] fix Signed-off-by: Matt Klein --- source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 4f1df27b13e7..6a9dada45516 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -14,6 +14,7 @@ void UdpProxyFilter::onData(Network::UdpRecvData& data) { const auto active_session_it = sessions_.find(data.addresses_); ActiveSession* active_session; if (active_session_it == sessions_.end()) { + // TODO(mattklein123): Session circuit breaker. // TODO(mattklein123): Instead of looking up the cluster each time, keep track of it via // cluster manager callbacks. Upstream::ThreadLocalCluster* cluster = config_->getCluster(); @@ -64,8 +65,8 @@ void UdpProxyFilter::ActiveSession::onReadReady() { void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { // TODO(mattklein123): Refresh idle timer. - Api::IoCallUint64Result rc = Network::Utility::writeToSocket( - *io_handle_, buffer, addresses_.local_->ip(), *host_->address()); + Api::IoCallUint64Result rc = + Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address()); // TODO(mattklein123): Increment stat on failure. ASSERT(rc.ok()); } From 8bb99ea43a3cefb328a7eb666125f092d9afb1fd Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 7 Nov 2019 09:27:50 -0800 Subject: [PATCH 3/5] fix Signed-off-by: Matt Klein --- source/common/network/udp_listener_impl.cc | 3 ++- source/extensions/extensions_build_config.bzl | 6 +++--- source/extensions/filters/udp/udp_proxy/config.h | 2 +- test/extensions/filters/udp/udp_proxy/BUILD | 2 +- .../filters/udp/udp_proxy/udp_proxy_integration_test.cc | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index da6c4db1f599..7217313422b4 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -70,7 +70,8 @@ void UdpListenerImpl::handleReadCallback() { ENVOY_UDP_LOG(trace, "handleReadCallback"); const Api::IoCallUint64Result result = Utility::readAllDatagramsFromSocket( socket_.ioHandle(), *socket_.localAddress(), *this, time_source_, packets_dropped_); - // We should always get a failed result, even if no further data. + // readAllDatagramsFromSocket should/will always read until we get an error, though that error may + // simply be IoErrorCode::Again. ASSERT(!result.ok()); if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result.err_->getErrorCode()), diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 621f24374b39..c38ec7afa3db 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -97,7 +97,7 @@ EXTENSIONS = { # UDP filters # - "envoy.filters.udp.udp_proxy": "//source/extensions/filters/udp/udp_proxy:config", + "envoy.filters.udp_listener.udp_proxy": "//source/extensions/filters/udp/udp_proxy:config", # # Resource monitors @@ -148,8 +148,8 @@ EXTENSIONS = { "envoy.transport_sockets.tls": "//source/extensions/transport_sockets/tls:config", # Retry host predicates - "envoy.retry_host_predicates.previous_hosts": "//source/extensions/retry/host/previous_hosts:config", - "envoy.retry_host_predicates.omit_canary_hosts": "//source/extensions/retry/host/omit_canary_hosts:config", + "envoy.retry_host_predicates.previous_hosts": "//source/extensions/retry/host/previous_hosts:config", + "envoy.retry_host_predicates.omit_canary_hosts": "//source/extensions/retry/host/omit_canary_hosts:config", # Retry priorities "envoy.retry_priorities.previous_priorities": "//source/extensions/retry/priority/previous_priorities:config", diff --git a/source/extensions/filters/udp/udp_proxy/config.h b/source/extensions/filters/udp/udp_proxy/config.h index dbdb7ce63997..df69432eef07 100644 --- a/source/extensions/filters/udp/udp_proxy/config.h +++ b/source/extensions/filters/udp/udp_proxy/config.h @@ -36,7 +36,7 @@ class UdpProxyFilterConfigFactory return std::make_unique(); } - std::string name() override { return "envoy.filters.udp.udp_proxy"; } + std::string name() override { return "envoy.filters.udp_listener.udp_proxy"; } }; } // namespace UdpProxy diff --git a/test/extensions/filters/udp/udp_proxy/BUILD b/test/extensions/filters/udp/udp_proxy/BUILD index e636786d9bfa..0a2c103376b9 100644 --- a/test/extensions/filters/udp/udp_proxy/BUILD +++ b/test/extensions/filters/udp/udp_proxy/BUILD @@ -14,7 +14,7 @@ envoy_package() envoy_extension_cc_test( name = "udp_proxy_integration_test", srcs = ["udp_proxy_integration_test.cc"], - extension_name = "envoy.filters.udp.udp_proxy", + extension_name = "envoy.filters.udp_listener.udp_proxy", deps = [ "//source/extensions/filters/udp/udp_proxy:config", "//test/integration:integration_lib", diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc index 62a40bc2131a..2ab583cd6b45 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_integration_test.cc @@ -60,7 +60,7 @@ class UdpProxyIntegrationTest : public testing::TestWithParam Date: Fri, 8 Nov 2019 11:39:31 -0800 Subject: [PATCH 4/5] comments Signed-off-by: Matt Klein --- source/common/network/udp_listener_impl.cc | 16 +++--- source/common/network/utility.cc | 10 ++-- source/common/network/utility.h | 19 ++++--- .../filters/udp/udp_proxy/udp_proxy_filter.cc | 20 +++++-- .../filters/udp/udp_proxy/udp_proxy_filter.h | 11 ++++ .../udp_proxy/udp_proxy_integration_test.cc | 53 ++++++++++++++++--- 6 files changed, 100 insertions(+), 29 deletions(-) diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index 7217313422b4..81573904382b 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -68,21 +68,21 @@ void UdpListenerImpl::onSocketEvent(short flags) { void UdpListenerImpl::handleReadCallback() { ENVOY_UDP_LOG(trace, "handleReadCallback"); - const Api::IoCallUint64Result result = Utility::readAllDatagramsFromSocket( + const Api::IoErrorPtr result = Utility::readPacketsFromSocket( socket_.ioHandle(), *socket_.localAddress(), *this, time_source_, packets_dropped_); - // readAllDatagramsFromSocket should/will always read until we get an error, though that error may - // simply be IoErrorCode::Again. - ASSERT(!result.ok()); - if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { - ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result.err_->getErrorCode()), - result.err_->getErrorDetails()); - cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result.err_->getErrorCode()); + // TODO(mattklein123): Handle no error when we limit the number of packets read. + if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) { + ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast(result->getErrorCode()), + result->getErrorDetails()); + cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result->getErrorCode()); } } void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_address, Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer, MonotonicTime receive_time) { + // UDP listeners are always configured with the socket option that allows pulling the local + // address. This should never be null. ASSERT(local_address != nullptr); UdpRecvData recvData{ {std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time}; diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index e63b61aa61b8..4392de6e9792 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -563,10 +563,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, return result; } -Api::IoCallUint64Result -Utility::readAllDatagramsFromSocket(IoHandle& handle, const Address::Instance& local_address, - UdpPacketProcessor& udp_packet_processor, - TimeSource& time_source, uint32_t& packets_dropped) { +Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle, + const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, + TimeSource& time_source, uint32_t& packets_dropped) { do { const uint32_t old_packets_dropped = packets_dropped; const MonotonicTime receive_time = time_source.monotonicTime(); @@ -575,7 +575,7 @@ Utility::readAllDatagramsFromSocket(IoHandle& handle, const Address::Instance& l if (!result.ok()) { // No more to read or encountered a system error. - return result; + return std::move(result.err_); } if (result.rc_ == 0) { diff --git a/source/common/network/utility.h b/source/common/network/utility.h index 65eab62d9f30..bb690417ce29 100644 --- a/source/common/network/utility.h +++ b/source/common/network/utility.h @@ -321,18 +321,25 @@ class Utility { uint32_t* packets_dropped); /** - * Read all available packets from a given UDP socket and pass the packet to a given + * Read available packets from a given UDP socket and pass the packet to a given * UdpPacketProcessor. * @param handle is the UDP socket to read from. * @param local_address is the socket's local address used to populate port. * @param udp_packet_processor is the callback to receive the packets. * @param time_source is the time source used to generate the time stamp of the received packets. * @param packets_dropped is the output parameter for number of packets dropped in kernel. - */ - static Api::IoCallUint64Result - readAllDatagramsFromSocket(IoHandle& handle, const Address::Instance& local_address, - UdpPacketProcessor& udp_packet_processor, TimeSource& time_source, - uint32_t& packets_dropped); + * + * TODO(mattklein123): Allow the number of packets read to be limited for fairness. Currently + * this function will always return an error, even if EAGAIN. In the future + * we can return no error if we limited the number of packets read and have + * to fake another read event. + * TODO(mattklein123): Can we potentially share this with the TCP stack somehow? Similar code + * exists there. + */ + static Api::IoErrorPtr readPacketsFromSocket(IoHandle& handle, + const Address::Instance& local_address, + UdpPacketProcessor& udp_packet_processor, + TimeSource& time_source, uint32_t& packets_dropped); private: static void throwWithMalformedIp(const std::string& ip_address); diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 6a9dada45516..315277b7253d 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -41,10 +41,15 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, Network::UdpRecvData::LocalPeerAddresses&& addresses, const Upstream::HostConstSharedPtr& host) : parent_(parent), addresses_(std::move(addresses)), host_(host), + // NOTE: The socket call can only fail due to memory/fd exhaustion. No local ephemeral port + // is bound until the first packet is sent to the upstream host. io_handle_(host->address()->socket(Network::Address::SocketType::Datagram)), socket_event_(parent.read_callbacks_->udpListener().dispatcher().createFileEvent( io_handle_->fd(), [this](uint32_t) { onReadReady(); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read)) { + ENVOY_LOG(debug, "creating new session: downstream={} local={}", addresses_.peer_->asStringView(), + addresses_.local_->asStringView()); + // TODO(mattklein123): Enable dropped packets socket option. In general the Socket abstraction // does not work well right now for client sockets. It's too heavy weight and is aimed at listener // sockets. We need to figure out how to either refactor Socket into something that works better @@ -55,16 +60,23 @@ UdpProxyFilter::ActiveSession::ActiveSession(UdpProxyFilter& parent, void UdpProxyFilter::ActiveSession::onReadReady() { // TODO(mattklein123): Refresh idle timer. uint32_t packets_dropped = 0; - const Api::IoCallUint64Result result = Network::Utility::readAllDatagramsFromSocket( + const Api::IoErrorPtr result = Network::Utility::readPacketsFromSocket( *io_handle_, *addresses_.local_, *this, parent_.config_->timeSource(), packets_dropped); - // We should always get a failed result, even if no further data. - ASSERT(!result.ok()); + // TODO(mattklein123): Handle no error when we limit the number of packets read. // TODO(mattklein123): Increment stat on failure. - ASSERT(result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again); + ASSERT(result->getErrorCode() == Api::IoError::IoErrorCode::Again); } void UdpProxyFilter::ActiveSession::write(const Buffer::Instance& buffer) { + ENVOY_LOG(trace, "writing {} byte datagram: downstream={} local={} upstream={}", buffer.length(), + addresses_.peer_->asStringView(), addresses_.local_->asStringView(), + host_->address()->asStringView()); + // TODO(mattklein123): Refresh idle timer. + // NOTE: On the first write, a local ephemeral port is bound, and thus this write can fail due to + // port exhaustion. + // NOTE: We do not specify the local IP to use for the sendmsg call. We allow the OS to select + // the right IP based on outbound routing rules. Api::IoCallUint64Result rc = Network::Utility::writeToSocket(*io_handle_, buffer, nullptr, *host_->address()); // TODO(mattklein123): Increment stat on failure. diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index fb16fdfb74bf..9ee9188bb460 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -44,6 +44,14 @@ class UdpProxyFilter : public Network::UdpListenerReadFilter, Logger::Loggable 1) { + setDeterministic(); + setUpstreamCount(upstream_count); + config_helper_.addConfigModifier([upstream_count]( + envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + for (uint32_t i = 1; i < upstream_count; i++) { + auto* new_host = bootstrap.mutable_static_resources()->mutable_clusters(0)->add_hosts(); + new_host->MergeFrom(bootstrap.static_resources().clusters(0).hosts(0)); + } + }); + } BaseIntegrationTest::initialize(); } @@ -106,14 +114,19 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, UdpProxyIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); +// Basic loopback test. TEST_P(UdpProxyIntegrationTest, HelloWorldOnLoopback) { + setup(1); const uint32_t port = lookupPort("listener_0"); - auto listener_address = Network::Utility::resolveUrl( + const auto listener_address = Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); requestResponseWithListenerAddress(*listener_address); } +// Verifies calling sendmsg with a non-local address. Note that this test is only fully complete for +// IPv4. See the comment below for more details. TEST_P(UdpProxyIntegrationTest, HelloWorldOnNonLocalAddress) { + setup(1); const uint32_t port = lookupPort("listener_0"); Network::Address::InstanceConstSharedPtr listener_address; if (version_ == Network::Address::IpVersion::v4) { @@ -137,9 +150,11 @@ TEST_P(UdpProxyIntegrationTest, HelloWorldOnNonLocalAddress) { requestResponseWithListenerAddress(*listener_address); } +// Make sure multiple clients are routed correctly to a single upstream host. TEST_P(UdpProxyIntegrationTest, MultipleClients) { + setup(1); const uint32_t port = lookupPort("listener_0"); - auto listener_address = Network::Utility::resolveUrl( + const auto listener_address = Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); UdpSyncClient client1(timeSystem(), version_); @@ -178,5 +193,31 @@ TEST_P(UdpProxyIntegrationTest, MultipleClients) { EXPECT_EQ("client1_world", response_datagram.buffer_->toString()); } +// Make sure sessions correctly forward to the same upstream host when there are multiple upstream +// hosts. +TEST_P(UdpProxyIntegrationTest, MultipleUpstreams) { + setup(2); + const uint32_t port = lookupPort("listener_0"); + const auto listener_address = Network::Utility::resolveUrl( + fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + + UdpSyncClient client(timeSystem(), version_); + client.write("hello1", *listener_address); + client.write("hello2", *listener_address); + Network::UdpRecvData request_datagram; + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello1", request_datagram.buffer_->toString()); + ASSERT_TRUE(fake_upstreams_[0]->waitForUdpDatagram(request_datagram)); + EXPECT_EQ("hello2", request_datagram.buffer_->toString()); + + fake_upstreams_[0]->sendUdpDatagram("world1", *request_datagram.addresses_.peer_); + fake_upstreams_[0]->sendUdpDatagram("world2", *request_datagram.addresses_.peer_); + Network::UdpRecvData response_datagram; + client.recv(response_datagram); + EXPECT_EQ("world1", response_datagram.buffer_->toString()); + client.recv(response_datagram); + EXPECT_EQ("world2", response_datagram.buffer_->toString()); +} + } // namespace } // namespace Envoy From bb20fbd96bc2db14aa92b1ea66664629fbc51457 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Mon, 11 Nov 2019 14:10:04 -0800 Subject: [PATCH 5/5] fix Signed-off-by: Matt Klein --- source/extensions/filters/udp/udp_proxy/BUILD | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/extensions/filters/udp/udp_proxy/BUILD b/source/extensions/filters/udp/udp_proxy/BUILD index 49f9377023dd..ca7466ea964b 100644 --- a/source/extensions/filters/udp/udp_proxy/BUILD +++ b/source/extensions/filters/udp/udp_proxy/BUILD @@ -2,6 +2,7 @@ licenses(["notice"]) # Apache 2 load( "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", "envoy_cc_library", "envoy_package", ) @@ -23,10 +24,12 @@ envoy_cc_library( ], ) -envoy_cc_library( +envoy_cc_extension( name = "config", srcs = ["config.cc"], hdrs = ["config.h"], + security_posture = "robust_to_untrusted_downstream", + status = "wip", deps = [ ":udp_proxy_filter_lib", "//include/envoy/registry",