Skip to content

Commit

Permalink
Introduce listener TCP connection buffer configuration and implement … (
Browse files Browse the repository at this point in the history
#558)

Baby steps towards flow control. This patch plumbs the TCP connection buffer limits 
to the listener and enforces them on the read buffer only. Later patches will also plumb the 
limits to the upstream cluster connection and cover the write buffer (which requires the 
watermark API).
  • Loading branch information
htuch authored and mattklein123 committed Mar 13, 2017
1 parent b72309d commit fd58242
Show file tree
Hide file tree
Showing 30 changed files with 431 additions and 174 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ use_original_dst
destination port. If there is no listener associated with the original destination port, the
connection is handled by the listener that receives it. Default is false.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit, in bytes, on the size of the listener's new connection read and write buffers.
If unspecified, an implementation-defined default is applied (1MB).

.. toctree::
:hidden:

Expand Down
36 changes: 11 additions & 25 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,13 @@ class Dispatcher {
* @param socket supplies the socket to listen on.
* @param cb supplies the callbacks to invoke for listener events.
* @param stats_store supplies the Stats::Store to use.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_original_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
virtual Network::ListenerPtr
createListener(Network::ConnectionHandler& conn_handler, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) PURE;

/**
* Create a listener on a specific port.
Expand All @@ -101,21 +94,14 @@ class Dispatcher {
* @param socket supplies the socket to listen on.
* @param cb supplies the callbacks to invoke for listener events.
* @param stats_store supplies the Stats::Store to use.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
virtual Network::ListenerPtr
createSslListener(Network::ConnectionHandler& conn_handler, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) PURE;

/**
* Allocate a timer. @see Event::Timer for docs on how to use the timer.
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* are installed.
*/
virtual void write(Buffer::Instance& data) PURE;

/**
* Set a soft limit on the size of the read buffer prior to flushing to further stages in the
* processing pipeline.
* @param limit supplies the soft limit on the read buffer size. NOTE(review): presumably in
*        bytes, matching the listener's per_connection_buffer_limit_bytes option -- confirm.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
22 changes: 5 additions & 17 deletions include/envoy/network/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,20 @@ class ConnectionHandler {
* Adds listener to the handler.
* @param factory supplies the configuration factory for new connections.
* @param socket supplies the already bound socket to listen on.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
*/
virtual void addListener(Network::FilterChainFactory& factory, Network::ListenSocket& socket,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst) PURE;
const Network::ListenerOptions& listener_options) PURE;

/**
* Adds listener to the handler.
* @param factory supplies the configuration factory for new connections.
* @param socket supplies the already bound socket to listen on.
* @param bind_to_port specifies if the listener should actually bind to the port.
* a listener that doesn't bind can only receive connections redirected from
* other listeners that set use_origin_dst to true
* @param use_proxy_proto whether to use the PROXY Protocol V1
* (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
* @param use_orig_dst if a connection was redirected to this port using iptables,
* allow the listener to hand it off to the listener associated to the original port
* @param listener_options listener configuration options.
*/
virtual void addSslListener(Network::FilterChainFactory& factory, Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) PURE;
Network::ListenSocket& socket,
const Network::ListenerOptions& listener_options) PURE;

/**
* Find a listener based on the provided listener port value.
Expand Down
28 changes: 28 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,34 @@

namespace Network {

/**
 * Listener configuration options.
 *
 * This is a plain aggregate: it declares no constructors and no default member initializers, so
 * callers are responsible for setting every field (or for starting from one of the factory
 * functions below).
 */
struct ListenerOptions {
  // Specifies if the listener should actually bind to the port. A listener that doesn't bind can
  // only receive connections redirected from other listeners that set use_original_dst_ to true.
  bool bind_to_port_;
  // Whether to use the PROXY Protocol V1
  // (http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt)
  bool use_proxy_proto_;
  // If a connection was redirected to this port using iptables, allow the listener to hand it off
  // to the listener associated to the original port.
  bool use_original_dst_;
  // Soft limit, in bytes, on size of the listener's new connection read and write buffers.
  uint32_t per_connection_buffer_limit_bytes_;

  /**
   * Factory for ListenerOptions with bind_to_port_ set.
   * @return ListenerOptions object initialized with bind_to_port_ set to true, the PROXY protocol
   *         and original destination hand-off disabled, and a per connection buffer limit of 0.
   */
  static ListenerOptions listenerOptionsWithBindToPort() {
    // Designated initializers ({.field = value}) are only standard C++ as of C++20; assign the
    // fields explicitly so this header builds on any conforming compiler without relying on a
    // compiler extension.
    ListenerOptions options{};
    options.bind_to_port_ = true;
    options.use_proxy_proto_ = false;
    options.use_original_dst_ = false;
    options.per_connection_buffer_limit_bytes_ = 0;
    return options;
  }
};

/**
* Callbacks invoked by a listener.
*/
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/server/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class Listener {
* allow the listener to hand it off to the listener associated to the original port
*/
virtual bool useOriginalDst() PURE;

/**
* @return uint32_t providing a soft limit, in bytes, on the size of the listener's new connection
* read and write buffers.
*/
virtual uint32_t perConnectionBufferLimitBytes() PURE;
};

typedef std::unique_ptr<Listener> ListenerPtr;
Expand Down
28 changes: 13 additions & 15 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,22 @@ Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this)};
}

Network::ListenerPtr DispatcherImpl::createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
return Network::ListenerPtr{new Network::ListenerImpl(
conn_handler, *this, socket, cb, stats_store, bind_to_port, use_proxy_proto, use_orig_dst)};
Network::ListenerPtr
DispatcherImpl::createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) {
return Network::ListenerPtr{
new Network::ListenerImpl(conn_handler, *this, socket, cb, stats_store, listener_options)};
}

Network::ListenerPtr DispatcherImpl::createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx,
Network::ListenSocket& socket,
Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) {
Network::ListenerPtr
DispatcherImpl::createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) {
return Network::ListenerPtr{new Network::SslListenerImpl(conn_handler, *this, ssl_ctx, socket, cb,
stats_store, bind_to_port,
use_proxy_proto, use_orig_dst)};
stats_store, listener_options)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) { return TimerPtr{new TimerImpl(*this, cb)}; }
Expand Down
7 changes: 3 additions & 4 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Filesystem::WatcherPtr createFilesystemWatcher() override;
Network::ListenerPtr createListener(Network::ConnectionHandler& conn_handler,
Network::ListenSocket& socket, Network::ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst) override;
Stats::Store& stats_store,
const Network::ListenerOptions& listener_options) override;
Network::ListenerPtr createSslListener(Network::ConnectionHandler& conn_handler,
Ssl::ServerContext& ssl_ctx, Network::ListenSocket& socket,
Network::ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst) override;
const Network::ListenerOptions& listener_options) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
7 changes: 6 additions & 1 deletion source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ const std::string Json::Schema::LISTENER_SCHEMA(R"EOF(
"ssl_context" : {"$ref" : "#/definitions/ssl_context"},
"bind_to_port" : {"type": "boolean"},
"use_proxy_proto" : {"type" : "boolean"},
"use_original_dst" : {"type" : "boolean"}
"use_original_dst" : {"type" : "boolean"},
"per_connection_buffer_limit_bytes" : {
"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
}
},
"required": ["port", "filters"],
"additionalProperties": false
Expand Down
10 changes: 6 additions & 4 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ void ConnectionImpl::onFileEvent(uint32_t events) {
}

ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
PostIoAction action;
PostIoAction action = PostIoAction::KeepOpen;
uint64_t bytes_read = 0;
do {
// 16K read is arbitrary. IIRC, libevent will currently clamp this to 4K. libevent will also
Expand All @@ -288,15 +288,17 @@ ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {
} else if (rc == -1) {
// Remote error (might be no data).
conn_log_trace("read error: {}", *this, errno);
if (errno == EAGAIN) {
action = PostIoAction::KeepOpen;
} else {
if (errno != EAGAIN) {
action = PostIoAction::Close;
}

break;
} else {
bytes_read += rc;
if (shouldDrainReadBuffer()) {
setReadBufferReady();
break;
}
}
} while (true);

Expand Down
12 changes: 12 additions & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class ConnectionImpl : public virtual Connection,
Ssl::Connection* ssl() override { return nullptr; }
State state() override;
void write(Buffer::Instance& data) override;
// Stores the soft read buffer limit consulted by shouldDrainReadBuffer(); 0 disables the limit.
void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; }

// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand All @@ -79,6 +80,16 @@ class ConnectionImpl : public virtual Connection,
virtual void closeSocket(uint32_t close_type);
void doConnect();
void raiseEvents(uint32_t events);
// Reports whether the read buffer should be drained: true only when a non-zero read buffer limit
// is configured and the buffered data has reached or exceeded that limit.
bool shouldDrainReadBuffer() {
  // A limit of zero means no limit has been configured, so never request a drain.
  if (read_buffer_limit_ == 0) {
    return false;
  }
  return read_buffer_.length() >= read_buffer_limit_;
}
// Flags the connection's file event as read-ready in the event loop. Intended for read loops
// that yield early because shouldDrainReadBuffer() returned true.
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() {
  file_event_->activate(Event::FileReadyType::Read);
}

static const Address::InstancePtr null_local_address_;

Expand All @@ -87,6 +98,7 @@ class ConnectionImpl : public virtual Connection,
Address::InstancePtr local_address_;
Buffer::OwnedImpl read_buffer_;
Buffer::OwnedImpl write_buffer_;
uint32_t read_buffer_limit_ = 0;

private:
// clang-format off
Expand Down
15 changes: 8 additions & 7 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);

Address::InstancePtr final_local_address = listener->socket_.localAddress();
if (listener->use_original_dst_ && final_local_address->type() == Address::Type::Ip) {
if (listener->options_.use_original_dst_ && final_local_address->type() == Address::Type::Ip) {
Address::InstancePtr original_local_address = listener->getOriginalDst(fd);
if (original_local_address) {
final_local_address = original_local_address;
Expand All @@ -42,7 +42,7 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
}
}

if (listener->use_proxy_proto_) {
if (listener->options_.use_proxy_proto_) {
listener->proxy_protocol_.newConnection(listener->dispatcher_, fd, *listener);
} else {
Address::InstancePtr final_remote_address;
Expand All @@ -62,13 +62,12 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*

ListenerImpl::ListenerImpl(Network::ConnectionHandler& conn_handler,
Event::DispatcherImpl& dispatcher, ListenSocket& socket,
ListenerCallbacks& cb, Stats::Store& stats_store, bool bind_to_port,
bool use_proxy_proto, bool use_orig_dst)
ListenerCallbacks& cb, Stats::Store& stats_store,
const Network::ListenerOptions& listener_options)
: connection_handler_(conn_handler), dispatcher_(dispatcher), socket_(socket), cb_(cb),
bind_to_port_(bind_to_port), use_proxy_proto_(use_proxy_proto), proxy_protocol_(stats_store),
use_original_dst_(use_orig_dst), listener_(nullptr) {
proxy_protocol_(stats_store), options_(listener_options), listener_(nullptr) {

if (bind_to_port_) {
if (options_.bind_to_port_) {
listener_.reset(
evconnlistener_new(&dispatcher_.base(), listenCallback, this, 0, -1, socket.fd()));

Expand All @@ -90,6 +89,7 @@ void ListenerImpl::errorCallback(evconnlistener*, void*) {
void ListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
Address::InstancePtr local_address) {
ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, local_address));
new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand All @@ -98,6 +98,7 @@ void SslListenerImpl::newConnection(int fd, Address::InstancePtr remote_address,
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address,
local_address, ssl_ctx_,
Ssl::ConnectionImpl::InitialState::Server));
new_connection->setReadBufferLimit(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

Expand Down
12 changes: 4 additions & 8 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class ListenerImpl : public Listener {
public:
ListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
ListenSocket& socket, ListenerCallbacks& cb, Stats::Store& stats_store,
bool bind_to_port, bool use_proxy_proto, bool use_orig_dst);
const ListenerOptions& listener_options);

/**
* Accept/process a new connection.
Expand All @@ -43,10 +43,8 @@ class ListenerImpl : public Listener {
Event::DispatcherImpl& dispatcher_;
ListenSocket& socket_;
ListenerCallbacks& cb_;
const bool bind_to_port_;
const bool use_proxy_proto_;
ProxyProtocol proxy_protocol_;
const bool use_original_dst_;
const ListenerOptions options_;

private:
static void errorCallback(evconnlistener* listener, void* context);
Expand All @@ -59,10 +57,8 @@ class SslListenerImpl : public ListenerImpl {
public:
SslListenerImpl(Network::ConnectionHandler& conn_handler, Event::DispatcherImpl& dispatcher,
Ssl::Context& ssl_ctx, ListenSocket& socket, ListenerCallbacks& cb,
Stats::Store& stats_store, bool bind_to_port, bool use_proxy_proto,
bool use_orig_dst)
: ListenerImpl(conn_handler, dispatcher, socket, cb, stats_store, bind_to_port,
use_proxy_proto, use_orig_dst),
Stats::Store& stats_store, const Network::ListenerOptions& listener_options)
: ListenerImpl(conn_handler, dispatcher, socket, cb, stats_store, listener_options),
ssl_ctx_(ssl_ctx) {}

// ListenerImpl
Expand Down
4 changes: 4 additions & 0 deletions source/common/ssl/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ Network::ConnectionImpl::IoResult ConnectionImpl::doReadFromSocket() {

if (slices_to_commit > 0) {
read_buffer_.commit(slices, slices_to_commit);
if (shouldDrainReadBuffer()) {
setReadBufferReady();
keep_reading = false;
}
}
}

Expand Down
Loading

0 comments on commit fd58242

Please sign in to comment.