diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ca4ed33629df..d21b2f5121d3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,6 +32,7 @@ maximize the chances of your PR being merged. deprecation window. We make no guarantees about code or deployments that rely on undocumented behavior. * All deprecations/breaking changes will be clearly listed in the release notes. +* See [DEPRECATED.md](DEPRECATED.md) # Release cadence diff --git a/DEPRECATED.md b/DEPRECATED.md new file mode 100644 index 000000000000..41cf5d3e076d --- /dev/null +++ b/DEPRECATED.md @@ -0,0 +1,10 @@ +# DEPRECATED + +As of release 1.3.0, Envoy will follow a +[Breaking Change Policy](https://github.com/lyft/envoy/blob/master//CONTRIBUTING.md#breaking-change-policy). + +The following features have been DEPRECATED and will be removed in the specified release cycle. + +* Version 1.4.0 + * Config option `statsd_local_udp_port` has been deprecated and has been replaced with + `statsd_udp_ip_address`. diff --git a/docs/configuration/overview/overview.rst b/docs/configuration/overview/overview.rst index 011af57ea863..ee7add401a32 100644 --- a/docs/configuration/overview/overview.rst +++ b/docs/configuration/overview/overview.rst @@ -16,6 +16,7 @@ specify miscellaneous configuration. "cluster_manager": "{...}", "flags_path": "...", "statsd_local_udp_port": "...", + "statsd_udp_ip_address": "...", "statsd_tcp_cluster_name": "...", "stats_flush_interval_ms": "...", "watchdog_miss_timeout_ms": "...", @@ -45,10 +46,16 @@ flags_path *(optional, string)* The file system path to search for :ref:`startup flag files `. -statsd_local_udp_port +statsd_local_udp_port (Warning: DEPRECATED and will be removed in 1.4.0) *(optional, integer)* The UDP port of a locally running statsd compliant listener. If specified, :ref:`statistics ` will be flushed to this port. +statsd_udp_ip_address + *(optional, string)* The UDP address of a running statsd compliant listener. If specified, + :ref:`statistics ` will be flushed to this address. IPv4 addresses should + have format host:port (ex: 127.0.0.1:855). IPv6 addresses should have URL format [host]:port + (ex: [::1]:855). + statsd_tcp_cluster_name *(optional, string)* The name of a cluster manager cluster that is running a TCP statsd compliant listener. If specified, Envoy will connect to this cluster to flush :ref:`statistics diff --git a/include/envoy/server/configuration.h b/include/envoy/server/configuration.h index 4f1dd8bcabcb..09ec8d85e55d 100644 --- a/include/envoy/server/configuration.h +++ b/include/envoy/server/configuration.h @@ -130,11 +130,17 @@ class Main { */ virtual Optional statsdTcpClusterName() PURE; + // TODO(hennna): DEPRECATED - will be removed in 1.4.0. /** * @return Optional the optional local UDP statsd port to write to. */ virtual Optional statsdUdpPort() PURE; + /** + * @return Optional the optional UDP statsd address to write to. + */ + virtual Optional statsdUdpIpAddress() PURE; + /** * @return std::chrono::milliseconds the time interval between flushing to configured stat sinks. * The server latches counters. diff --git a/source/common/json/config_schemas.cc b/source/common/json/config_schemas.cc index c427fd80611b..dc90186bbc0f 100644 --- a/source/common/json/config_schemas.cc +++ b/source/common/json/config_schemas.cc @@ -1014,6 +1014,7 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF( "cluster_manager" : {"type" : "object"}, "flags_path" : {"type" : "string"}, "statsd_local_udp_port" : {"type" : "integer"}, + "statsd_udp_ip_address" : {"type" : "string"}, "statsd_tcp_cluster_name" : {"type" : "string"}, "stats_flush_interval_ms" : {"type" : "integer"}, "tracing" : { diff --git a/source/common/stats/statsd.cc b/source/common/stats/statsd.cc index 97556ceac825..05ba4ac7ffba 100644 --- a/source/common/stats/statsd.cc +++ b/source/common/stats/statsd.cc @@ -19,8 +19,7 @@ namespace Envoy { namespace Stats { namespace Statsd { -Writer::Writer(uint32_t port) { - Network::Address::InstanceConstSharedPtr address(new Network::Address::Ipv4Instance(port)); +Writer::Writer(Network::Address::InstanceConstSharedPtr address) { fd_ = address->socket(Network::Address::SocketType::Datagram); ASSERT(fd_ != -1); @@ -29,7 +28,14 @@ Writer::Writer(uint32_t port) { UNREFERENCED_PARAMETER(rc); } -Writer::~Writer() { close(fd_); } +Writer::~Writer() { ASSERT(shutdown_); } + +void Writer::shutdown() { + shutdown_ = true; + if (fd_ != -1) { + RELEASE_ASSERT(close(fd_) == 0); + } +} void Writer::writeCounter(const std::string& name, uint64_t increment) { std::string message(fmt::format("envoy.{}:{}|c", name, increment)); @@ -47,19 +53,30 @@ void Writer::writeTimer(const std::string& name, const std::chrono::milliseconds } void Writer::send(const std::string& message) { + if (shutdown_) { + return; + } ::send(fd_, message.c_str(), message.size(), MSG_DONTWAIT); } +UdpStatsdSink::UdpStatsdSink(ThreadLocal::Instance& tls, + Network::Address::InstanceConstSharedPtr address) + : tls_(tls), tls_slot_(tls.allocateSlot()), server_address_(address) { + tls.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(this->server_address_); + }); +} + void UdpStatsdSink::flushCounter(const std::string& name, uint64_t delta) { - writer().writeCounter(name, delta); + tls_.getTyped(tls_slot_).writeCounter(name, delta); } void UdpStatsdSink::flushGauge(const std::string& name, uint64_t value) { - writer().writeGauge(name, value); + tls_.getTyped(tls_slot_).writeGauge(name, value); } void UdpStatsdSink::onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) { - writer().writeTimer(name, ms); + tls_.getTyped(tls_slot_).writeTimer(name, ms); } TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info, diff --git a/source/common/stats/statsd.h b/source/common/stats/statsd.h index 41baaf66425a..ccba6e899718 100644 --- a/source/common/stats/statsd.h +++ b/source/common/stats/statsd.h @@ -17,27 +17,31 @@ namespace Statsd { /** * This is a simple UDP localhost writer for statsd messages. */ -class Writer { +class Writer : public ThreadLocal::ThreadLocalObject { public: - Writer(uint32_t port); + Writer(Network::Address::InstanceConstSharedPtr address); ~Writer(); void writeCounter(const std::string& name, uint64_t increment); void writeGauge(const std::string& name, uint64_t value); void writeTimer(const std::string& name, const std::chrono::milliseconds& ms); + void shutdown() override; + // Called in unit test to validate address. + int getFdForTests() const { return fd_; }; private: void send(const std::string& message); int fd_; + bool shutdown_{}; }; /** - * Implementation of Sink that writes to a local UDP statsd port. + * Implementation of Sink that writes to a UDP statsd address. */ class UdpStatsdSink : public Sink { public: - UdpStatsdSink(uint32_t port) : port_(port) {} + UdpStatsdSink(ThreadLocal::Instance& tls, Network::Address::InstanceConstSharedPtr address); // Stats::Sink void flushCounter(const std::string& name, uint64_t delta) override; @@ -47,14 +51,13 @@ class UdpStatsdSink : public Sink { onTimespanComplete(name, std::chrono::milliseconds(value)); } void onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) override; + // Called in unit test to validate writer construction and address. + int getFdForTests() { return tls_.getTyped(tls_slot_).getFdForTests(); } private: - Writer& writer() { - static thread_local Statsd::Writer writer_(port_); - return writer_; - } - - uint32_t port_; + ThreadLocal::Instance& tls_; + const uint32_t tls_slot_; + Network::Address::InstanceConstSharedPtr server_address_; }; /** diff --git a/source/server/BUILD b/source/server/BUILD index ee2d3b634349..0b34696f39f1 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -138,6 +138,7 @@ envoy_cc_library( "//source/common/common:version_lib", "//source/common/json:config_schemas_lib", "//source/common/memory:stats_lib", + "//source/common/network:address_lib", "//source/common/network:utility_lib", "//source/common/runtime:runtime_lib", "//source/common/ssl:context_lib", diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index ecce1aa5424f..dc80d1633fea 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -56,10 +56,20 @@ void MainImpl::initialize(const Json::Object& json) { Server::Configuration::ListenerPtr{new ListenerConfig(*this, *listeners[i])}); } + if (json.hasObject("statsd_local_udp_port") && json.hasObject("statsd_udp_ip_address")) { + throw EnvoyException("statsd_local_udp_port and statsd_udp_ip_address " + "are mutually exclusive."); + } + + // TODO(hennna): DEPRECATED - statsd_local_udp_port will be removed in 1.4.0. if (json.hasObject("statsd_local_udp_port")) { statsd_udp_port_.value(json.getInteger("statsd_local_udp_port")); } + if (json.hasObject("statsd_udp_ip_address")) { + statsd_udp_ip_address_.value(json.getString("statsd_udp_ip_address")); + } + if (json.hasObject("statsd_tcp_cluster_name")) { statsd_tcp_cluster_name_.value(json.getString("statsd_tcp_cluster_name")); } diff --git a/source/server/configuration_impl.h b/source/server/configuration_impl.h index 9ae0bf38444c..bc531655c1d1 100644 --- a/source/server/configuration_impl.h +++ b/source/server/configuration_impl.h @@ -177,7 +177,9 @@ class MainImpl : Logger::Loggable, public Main { const std::list& listeners() override; RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; } Optional statsdTcpClusterName() override { return statsd_tcp_cluster_name_; } + // TODO(hennna): DEPRECATED - statsdUdpPort() will be removed in 1.4.0 Optional statsdUdpPort() override { return statsd_udp_port_; } + Optional statsdUdpIpAddress() override { return statsd_udp_ip_address_; } std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; } std::chrono::milliseconds wdMissTimeout() const override { return watchdog_miss_timeout_; } std::chrono::milliseconds wdMegaMissTimeout() const override { @@ -263,6 +265,7 @@ class MainImpl : Logger::Loggable, public Main { std::list listeners_; Optional statsd_tcp_cluster_name_; Optional statsd_udp_port_; + Optional statsd_udp_ip_address_; RateLimit::ClientFactoryPtr ratelimit_client_factory_; std::chrono::milliseconds stats_flush_interval_; std::chrono::milliseconds watchdog_miss_timeout_; diff --git a/source/server/server.cc b/source/server/server.cc index c6ec2352b060..46499372ff9e 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -19,6 +19,7 @@ #include "common/common/version.h" #include "common/json/config_schemas.h" #include "common/memory/stats.h" +#include "common/network/address_impl.h" #include "common/network/utility.h" #include "common/runtime/runtime_impl.h" #include "common/stats/statsd.h" @@ -317,9 +318,20 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server, } void InstanceImpl::initializeStatSinks() { - if (config_->statsdUdpPort().valid()) { + if (config_->statsdUdpIpAddress().valid()) { + log().info("statsd UDP ip address: {}", config_->statsdUdpIpAddress().value()); + stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink( + thread_local_, + Network::Utility::parseInternetAddressAndPort(config_->statsdUdpIpAddress().value()))); + stats_store_.addSink(*stat_sinks_.back()); + } else if (config_->statsdUdpPort().valid()) { + // TODO(hennna): DEPRECATED - statsdUdpPort will be removed in 1.4.0. + log().warn("statsd_local_udp_port has been DEPRECATED and will be removed in 1.4.0. " + "Consider setting statsd_udp_ip_address instead."); log().info("statsd UDP port: {}", config_->statsdUdpPort().value()); - stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(config_->statsdUdpPort().value())); + Network::Address::InstanceConstSharedPtr address( + new Network::Address::Ipv4Instance(config_->statsdUdpPort().value())); + stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(thread_local_, address)); stats_store_.addSink(*stat_sinks_.back()); } diff --git a/test/common/stats/BUILD b/test/common/stats/BUILD index 863a4477338e..67abbbdcdaff 100644 --- a/test/common/stats/BUILD +++ b/test/common/stats/BUILD @@ -31,6 +31,19 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "udp_statsd_test", + srcs = ["udp_statsd_test.cc"], + deps = [ + "//source/common/network:address_lib", + "//source/common/network:utility_lib", + "//source/common/stats:statsd_lib", + "//test/mocks/thread_local:thread_local_mocks", + "//test/test_common:environment_lib", + "//test/test_common:network_utility_lib", + ], +) + envoy_cc_test( name = "thread_local_store_test", srcs = ["thread_local_store_test.cc"], diff --git a/test/common/stats/udp_statsd_test.cc b/test/common/stats/udp_statsd_test.cc new file mode 100644 index 000000000000..7b6f574735dd --- /dev/null +++ b/test/common/stats/udp_statsd_test.cc @@ -0,0 +1,51 @@ +#include + +#include "common/network/address_impl.h" +#include "common/network/utility.h" +#include "common/stats/statsd.h" + +#include "test/mocks/thread_local/mocks.h" +#include "test/test_common/environment.h" +#include "test/test_common/network_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "spdlog/spdlog.h" + +namespace Envoy { +using testing::NiceMock; + +namespace Stats { +namespace Statsd { + +class UdpStatsdSinkTest : public testing::TestWithParam {}; +INSTANTIATE_TEST_CASE_P(IpVersions, UdpStatsdSinkTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +TEST_P(UdpStatsdSinkTest, InitWithIpAddress) { + NiceMock tls_; + // UDP statsd server address. + Network::Address::InstanceConstSharedPtr server_address = + Network::Utility::parseInternetAddressAndPort( + fmt::format("{}:8125", Network::Test::getLoopbackAddressUrlString(GetParam()))); + UdpStatsdSink sink(tls_, server_address); + int fd = sink.getFdForTests(); + EXPECT_NE(fd, -1); + + // Check that fd has not changed. + sink.flushCounter("test_counter", 1); + sink.flushGauge("test_gauge", 1); + sink.onTimespanComplete("test_counter", std::chrono::milliseconds(5)); + EXPECT_EQ(fd, sink.getFdForTests()); + + if (GetParam() == Network::Address::IpVersion::v4) { + EXPECT_EQ("127.0.0.1:8125", Network::Address::peerAddressFromFd(fd)->asString()); + } else { + EXPECT_EQ("[::1]:8125", Network::Address::peerAddressFromFd(fd)->asString()); + } + tls_.shutdownThread(); +} + +} // Statsd +} // Stats +} // Envoy diff --git a/test/config/integration/server.json b/test/config/integration/server.json index d9b9b332e4e4..dbb5447d284b 100644 --- a/test/config/integration/server.json +++ b/test/config/integration/server.json @@ -232,7 +232,7 @@ "admin": { "access_log_path": "/dev/null", "profile_path": "{{ test_tmpdir }}/envoy.prof", "address": "tcp://127.0.0.1:0" }, "flags_path": "/invalid_flags", - "statsd_local_udp_port": 8125, + "statsd_udp_ip_address": "127.0.0.1:8125", "statsd_tcp_cluster_name": "statsd", "tracing": { "http": { diff --git a/test/mocks/server/mocks.h b/test/mocks/server/mocks.h index 7995ae5e9522..89c31ab3691b 100644 --- a/test/mocks/server/mocks.h +++ b/test/mocks/server/mocks.h @@ -167,6 +167,7 @@ class MockMain : public Main { MOCK_METHOD0(rateLimitClientFactory, RateLimit::ClientFactory&()); MOCK_METHOD0(statsdTcpClusterName, Optional()); MOCK_METHOD0(statsdUdpPort, Optional()); + MOCK_METHOD0(statsdUdpIpAddress, Optional()); MOCK_METHOD0(statsFlushInterval, std::chrono::milliseconds()); MOCK_CONST_METHOD0(wdMissTimeout, std::chrono::milliseconds()); MOCK_CONST_METHOD0(wdMegaMissTimeout, std::chrono::milliseconds());