Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config/stats: add udp statds address as config option #1019

Merged
merged 8 commits into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ maximize the chances of your PR being merged.
deprecation window. We make no guarantees about code or deployments that rely on undocumented
behavior.
* All deprecations/breaking changes will be clearly listed in the release notes.
* See [DEPRECATED.md](DEPRECATED.md)

# Release cadence

Expand Down
10 changes: 10 additions & 0 deletions DEPRECATED.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# DEPRECATED

As of release 1.3.0, Envoy will follow a
[Breaking Change Policy](https://github.com/lyft/envoy/blob/master//CONTRIBUTING.md#breaking-change-policy).

The following features have been DEPRECATED and will be removed in the specified release cycle.

* Version 1.4.0
* Config option `statsd_local_udp_port` has been deprecated and has been replaced with
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @hennna I'll update this with 3a466c3
once you merge it

`statsd_udp_ip_address`.
9 changes: 8 additions & 1 deletion docs/configuration/overview/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ specify miscellaneous configuration.
"cluster_manager": "{...}",
"flags_path": "...",
"statsd_local_udp_port": "...",
"statsd_udp_ip_address": "...",
"statsd_tcp_cluster_name": "...",
"stats_flush_interval_ms": "...",
"watchdog_miss_timeout_ms": "...",
Expand Down Expand Up @@ -45,10 +46,16 @@ flags_path
*(optional, string)* The file system path to search for :ref:`startup flag files
<operations_file_system_flags>`.

statsd_local_udp_port
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we keep this until it's removed from the code? (and add deprecation warning)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Will add it back.

statsd_local_udp_port (Warning: DEPRECATED and will be removed in 1.4.0)
*(optional, integer)* The UDP port of a locally running statsd compliant listener. If specified,
:ref:`statistics <arch_overview_statistics>` will be flushed to this port.

statsd_udp_ip_address
*(optional, string)* The UDP address of a running statsd compliant listener. If specified,
:ref:`statistics <arch_overview_statistics>` will be flushed to this address. IPv4 addresses should
have format host:port (ex: 127.0.0.1:855). IPv6 addresses should have URL format [host]:port
(ex: [::1]:855).

statsd_tcp_cluster_name
*(optional, string)* The name of a cluster manager cluster that is running a TCP statsd compliant
listener. If specified, Envoy will connect to this cluster to flush :ref:`statistics
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,17 @@ class Main {
*/
virtual Optional<std::string> statsdTcpClusterName() PURE;

// TODO(hennna): DEPRECATED - will be removed in 1.4.0.
/**
* @return Optional<uint32_t> the optional local UDP statsd port to write to.
*/
virtual Optional<uint32_t> statsdUdpPort() PURE;

/**
* @return Optional<std::string> the optional UDP statsd address to write to.
*/
virtual Optional<std::string> statsdUdpIpAddress() PURE;

/**
* @return std::chrono::milliseconds the time interval between flushing to configured stat sinks.
* The server latches counters.
Expand Down
1 change: 1 addition & 0 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ const std::string Json::Schema::TOP_LEVEL_CONFIG_SCHEMA(R"EOF(
"cluster_manager" : {"type" : "object"},
"flags_path" : {"type" : "string"},
"statsd_local_udp_port" : {"type" : "integer"},
"statsd_udp_ip_address" : {"type" : "string"},
"statsd_tcp_cluster_name" : {"type" : "string"},
"stats_flush_interval_ms" : {"type" : "integer"},
"tracing" : {
Expand Down
29 changes: 23 additions & 6 deletions source/common/stats/statsd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ namespace Envoy {
namespace Stats {
namespace Statsd {

Writer::Writer(uint32_t port) {
Network::Address::InstanceConstSharedPtr address(new Network::Address::Ipv4Instance(port));
Writer::Writer(Network::Address::InstanceConstSharedPtr address) {
fd_ = address->socket(Network::Address::SocketType::Datagram);
ASSERT(fd_ != -1);

Expand All @@ -29,7 +28,14 @@ Writer::Writer(uint32_t port) {
UNREFERENCED_PARAMETER(rc);
}

Writer::~Writer() { close(fd_); }
Writer::~Writer() { ASSERT(shutdown_); }

void Writer::shutdown() {
shutdown_ = true;
if (fd_ != -1) {
RELEASE_ASSERT(close(fd_) == 0);
}
}

void Writer::writeCounter(const std::string& name, uint64_t increment) {
std::string message(fmt::format("envoy.{}:{}|c", name, increment));
Expand All @@ -47,19 +53,30 @@ void Writer::writeTimer(const std::string& name, const std::chrono::milliseconds
}

void Writer::send(const std::string& message) {
if (shutdown_) {
return;
}
::send(fd_, message.c_str(), message.size(), MSG_DONTWAIT);
}

UdpStatsdSink::UdpStatsdSink(ThreadLocal::Instance& tls,
Network::Address::InstanceConstSharedPtr address)
: tls_(tls), tls_slot_(tls.allocateSlot()), server_address_(address) {
tls.set(tls_slot_, [this](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<Writer>(this->server_address_);
});
}

void UdpStatsdSink::flushCounter(const std::string& name, uint64_t delta) {
writer().writeCounter(name, delta);
tls_.getTyped<Writer>(tls_slot_).writeCounter(name, delta);
}

void UdpStatsdSink::flushGauge(const std::string& name, uint64_t value) {
writer().writeGauge(name, value);
tls_.getTyped<Writer>(tls_slot_).writeGauge(name, value);
}

void UdpStatsdSink::onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) {
writer().writeTimer(name, ms);
tls_.getTyped<Writer>(tls_slot_).writeTimer(name, ms);
}

TcpStatsdSink::TcpStatsdSink(const LocalInfo::LocalInfo& local_info,
Expand Down
23 changes: 13 additions & 10 deletions source/common/stats/statsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,31 @@ namespace Statsd {
/**
* This is a simple UDP localhost writer for statsd messages.
*/
class Writer {
class Writer : public ThreadLocal::ThreadLocalObject {
public:
Writer(uint32_t port);
Writer(Network::Address::InstanceConstSharedPtr address);
~Writer();

void writeCounter(const std::string& name, uint64_t increment);
void writeGauge(const std::string& name, uint64_t value);
void writeTimer(const std::string& name, const std::chrono::milliseconds& ms);
void shutdown() override;
// Called in unit test to validate address.
int getFdForTests() const { return fd_; };

private:
void send(const std::string& message);

int fd_;
bool shutdown_{};
};

/**
* Implementation of Sink that writes to a local UDP statsd port.
* Implementation of Sink that writes to a UDP statsd address.
*/
class UdpStatsdSink : public Sink {
public:
UdpStatsdSink(uint32_t port) : port_(port) {}
UdpStatsdSink(ThreadLocal::Instance& tls, Network::Address::InstanceConstSharedPtr address);

// Stats::Sink
void flushCounter(const std::string& name, uint64_t delta) override;
Expand All @@ -47,14 +51,13 @@ class UdpStatsdSink : public Sink {
onTimespanComplete(name, std::chrono::milliseconds(value));
}
void onTimespanComplete(const std::string& name, std::chrono::milliseconds ms) override;
// Called in unit test to validate writer construction and address.
int getFdForTests() { return tls_.getTyped<Writer>(tls_slot_).getFdForTests(); }

private:
Writer& writer() {
static thread_local Statsd::Writer writer_(port_);
return writer_;
}

uint32_t port_;
ThreadLocal::Instance& tls_;
const uint32_t tls_slot_;
Network::Address::InstanceConstSharedPtr server_address_;
};

/**
Expand Down
1 change: 1 addition & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ envoy_cc_library(
"//source/common/common:version_lib",
"//source/common/json:config_schemas_lib",
"//source/common/memory:stats_lib",
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/runtime:runtime_lib",
"//source/common/ssl:context_lib",
Expand Down
10 changes: 10 additions & 0 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,20 @@ void MainImpl::initialize(const Json::Object& json) {
Server::Configuration::ListenerPtr{new ListenerConfig(*this, *listeners[i])});
}

if (json.hasObject("statsd_local_udp_port") && json.hasObject("statsd_udp_ip_address")) {
throw EnvoyException("statsd_local_udp_port and statsd_udp_ip_address "
"are mutually exclusive.");
}

// TODO(hennna): DEPRECATED - statsd_local_udp_port will be removed in 1.4.0.
if (json.hasObject("statsd_local_udp_port")) {
statsd_udp_port_.value(json.getInteger("statsd_local_udp_port"));
}

if (json.hasObject("statsd_udp_ip_address")) {
statsd_udp_ip_address_.value(json.getString("statsd_udp_ip_address"));
}

if (json.hasObject("statsd_tcp_cluster_name")) {
statsd_tcp_cluster_name_.value(json.getString("statsd_tcp_cluster_name"));
}
Expand Down
3 changes: 3 additions & 0 deletions source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
const std::list<ListenerPtr>& listeners() override;
RateLimit::ClientFactory& rateLimitClientFactory() override { return *ratelimit_client_factory_; }
Optional<std::string> statsdTcpClusterName() override { return statsd_tcp_cluster_name_; }
// TODO(hennna): DEPRECATED - statsdUdpPort() will be removed in 1.4.0
Optional<uint32_t> statsdUdpPort() override { return statsd_udp_port_; }
Optional<std::string> statsdUdpIpAddress() override { return statsd_udp_ip_address_; }
std::chrono::milliseconds statsFlushInterval() override { return stats_flush_interval_; }
std::chrono::milliseconds wdMissTimeout() const override { return watchdog_miss_timeout_; }
std::chrono::milliseconds wdMegaMissTimeout() const override {
Expand Down Expand Up @@ -263,6 +265,7 @@ class MainImpl : Logger::Loggable<Logger::Id::config>, public Main {
std::list<Server::Configuration::ListenerPtr> listeners_;
Optional<std::string> statsd_tcp_cluster_name_;
Optional<uint32_t> statsd_udp_port_;
Optional<std::string> statsd_udp_ip_address_;
RateLimit::ClientFactoryPtr ratelimit_client_factory_;
std::chrono::milliseconds stats_flush_interval_;
std::chrono::milliseconds watchdog_miss_timeout_;
Expand Down
16 changes: 14 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/common/version.h"
#include "common/json/config_schemas.h"
#include "common/memory/stats.h"
#include "common/network/address_impl.h"
#include "common/network/utility.h"
#include "common/runtime/runtime_impl.h"
#include "common/stats/statsd.h"
Expand Down Expand Up @@ -317,9 +318,20 @@ Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
}

void InstanceImpl::initializeStatSinks() {
if (config_->statsdUdpPort().valid()) {
if (config_->statsdUdpIpAddress().valid()) {
log().info("statsd UDP ip address: {}", config_->statsdUdpIpAddress().value());
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(
thread_local_,
Network::Utility::parseInternetAddressAndPort(config_->statsdUdpIpAddress().value())));
stats_store_.addSink(*stat_sinks_.back());
} else if (config_->statsdUdpPort().valid()) {
// TODO(hennna): DEPRECATED - statsdUdpPort will be removed in 1.4.0.
log().warn("statsd_local_udp_port has been DEPRECATED and will be removed in 1.4.0. "
"Consider setting statsd_udp_ip_address instead.");
log().info("statsd UDP port: {}", config_->statsdUdpPort().value());
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(config_->statsdUdpPort().value()));
Network::Address::InstanceConstSharedPtr address(
new Network::Address::Ipv4Instance(config_->statsdUdpPort().value()));
stat_sinks_.emplace_back(new Stats::Statsd::UdpStatsdSink(thread_local_, address));
stats_store_.addSink(*stat_sinks_.back());
}

Expand Down
13 changes: 13 additions & 0 deletions test/common/stats/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "udp_statsd_test",
srcs = ["udp_statsd_test.cc"],
deps = [
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/stats:statsd_lib",
"//test/mocks/thread_local:thread_local_mocks",
"//test/test_common:environment_lib",
"//test/test_common:network_utility_lib",
],
)

envoy_cc_test(
name = "thread_local_store_test",
srcs = ["thread_local_store_test.cc"],
Expand Down
51 changes: 51 additions & 0 deletions test/common/stats/udp_statsd_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <chrono>

#include "common/network/address_impl.h"
#include "common/network/utility.h"
#include "common/stats/statsd.h"

#include "test/mocks/thread_local/mocks.h"
#include "test/test_common/environment.h"
#include "test/test_common/network_utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "spdlog/spdlog.h"

namespace Envoy {
using testing::NiceMock;

namespace Stats {
namespace Statsd {

class UdpStatsdSinkTest : public testing::TestWithParam<Network::Address::IpVersion> {};
INSTANTIATE_TEST_CASE_P(IpVersions, UdpStatsdSinkTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()));

TEST_P(UdpStatsdSinkTest, InitWithIpAddress) {
NiceMock<ThreadLocal::MockInstance> tls_;
// UDP statsd server address.
Network::Address::InstanceConstSharedPtr server_address =
Network::Utility::parseInternetAddressAndPort(
fmt::format("{}:8125", Network::Test::getLoopbackAddressUrlString(GetParam())));
UdpStatsdSink sink(tls_, server_address);
int fd = sink.getFdForTests();
EXPECT_NE(fd, -1);

// Check that fd has not changed.
sink.flushCounter("test_counter", 1);
sink.flushGauge("test_gauge", 1);
sink.onTimespanComplete("test_counter", std::chrono::milliseconds(5));
EXPECT_EQ(fd, sink.getFdForTests());

if (GetParam() == Network::Address::IpVersion::v4) {
EXPECT_EQ("127.0.0.1:8125", Network::Address::peerAddressFromFd(fd)->asString());
} else {
EXPECT_EQ("[::1]:8125", Network::Address::peerAddressFromFd(fd)->asString());
}
tls_.shutdownThread();
}

} // Statsd
} // Stats
} // Envoy
2 changes: 1 addition & 1 deletion test/config/integration/server.json
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@

"admin": { "access_log_path": "/dev/null", "profile_path": "{{ test_tmpdir }}/envoy.prof", "address": "tcp://127.0.0.1:0" },
"flags_path": "/invalid_flags",
"statsd_local_udp_port": 8125,
"statsd_udp_ip_address": "127.0.0.1:8125",
"statsd_tcp_cluster_name": "statsd",
"tracing": {
"http": {
Expand Down
1 change: 1 addition & 0 deletions test/mocks/server/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class MockMain : public Main {
MOCK_METHOD0(rateLimitClientFactory, RateLimit::ClientFactory&());
MOCK_METHOD0(statsdTcpClusterName, Optional<std::string>());
MOCK_METHOD0(statsdUdpPort, Optional<uint32_t>());
MOCK_METHOD0(statsdUdpIpAddress, Optional<std::string>());
MOCK_METHOD0(statsFlushInterval, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMissTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(wdMegaMissTimeout, std::chrono::milliseconds());
Expand Down