Skip to content

Commit

Permalink
network filters: add onNewConnection() callback (#233)
Browse files Browse the repository at this point in the history
This is more fallout from c172a4. For network filters we want the
ability to do things when the connection is created, but also to
allow stopping iterations of further filters. In the previous change,
we made it so that the proxy filter will always make a new connection
to upstream regardless of what other filters do. We change that now
such that the tcp proxy filter will only make the upstream connection
in the onNewConnection() callback. Other filters (currently client_ssl
and ratelimit) now do their work on their own onNewConnection() callback
and stop iteration if necessary to prevent the tcp proxy filter from
making the upstream connection until it is supposed to.
  • Loading branch information
mattklein123 authored Nov 18, 2016
1 parent 3a80591 commit 7e57daa
Show file tree
Hide file tree
Showing 24 changed files with 98 additions and 30 deletions.
20 changes: 19 additions & 1 deletion include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ReadFilterCallbacks {
/**
* If a read filter stopped filter iteration, continueReading() can be called to continue the
* filter chain. The next filter will be called with all currently available data in the read
* buffer.
* buffer (it will also have onNewConnection() called on it if it was not previously called).
*/
virtual void continueReading() PURE;

Expand Down Expand Up @@ -80,11 +80,23 @@ class ReadFilter {
*/
virtual FilterStatus onData(Buffer::Instance& data) PURE;

/**
* Called when a connection is first established. Filters should do one time long term processing
* that needs to be done when a connection is established. Filter chain iteration can be stopped
* if needed.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual FilterStatus onNewConnection() PURE;

/**
* Initializes the read filter callbacks used to interact with the filter manager. It will be
* called by the filter manager a single time when the filter is first registered. Thus, any
* construction that requires the backing connection should take place in the context of this
* function.
*
* IMPORTANT: No outbound networking or complex processing should be done in this function.
* That should be done in the context of onNewConnection() if needed.
*
* @param callbacks supplies the callbacks.
*/
virtual void initializeReadFilterCallbacks(ReadFilterCallbacks& callbacks) PURE;
Expand Down Expand Up @@ -123,6 +135,12 @@ class FilterManager {
* first is called first).
*/
virtual void addReadFilter(ReadFilterPtr filter) PURE;

/**
* Initialize all of the installed read filters. This effectively calls onNewConnection() on
* each of them.
*/
virtual void initializeReadFilters() PURE;
};

/**
Expand Down
8 changes: 3 additions & 5 deletions source/common/filter/auth/client_ssl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ void Config::requestComplete() {
}

Network::FilterStatus Instance::onData(Buffer::Instance&) {
if (auth_checked_) {
return Network::FilterStatus::Continue;
}

auth_checked_ = true;
return Network::FilterStatus::Continue;
}

Network::FilterStatus Instance::onNewConnection() {
// If this is not an SSL connection, do no further checking. High layers should redirect, etc.
// if SSL is required.
if (!read_callbacks_->connection().ssl()) {
Expand Down
2 changes: 1 addition & 1 deletion source/common/filter/auth/client_ssl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ class Instance : public Network::ReadFilter {

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
}

private:
ConfigPtr config_;
Network::ReadFilterCallbacks* read_callbacks_{};
bool auth_checked_{};
};

} // ClientSsl
Expand Down
1 change: 1 addition & 0 deletions source/common/filter/echo.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Echo : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter> {
public:
// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
}
Expand Down
5 changes: 5 additions & 0 deletions source/common/filter/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ InstanceStats Config::generateStats(const std::string& name, Stats::Store& store
}

Network::FilterStatus Instance::onData(Buffer::Instance&) {
return status_ == Status::Calling ? Network::FilterStatus::StopIteration
: Network::FilterStatus::Continue;
}

Network::FilterStatus Instance::onNewConnection() {
if (status_ == Status::NotStarted &&
!config_->runtime().snapshot().featureEnabled("ratelimit.tcp_filter_enabled", 100)) {
status_ = Status::Complete;
Expand Down
1 change: 1 addition & 0 deletions source/common/filter/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Instance : public Network::ReadFilter,

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override;
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
filter_callbacks_ = &callbacks;
filter_callbacks_->connection().addConnectionCallbacks(*this);
Expand Down
8 changes: 5 additions & 3 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor
POOL_GAUGE_PREFIX(store, final_prefix))};
}

void TcpProxy::initializeUpstreamConnection() {
Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
Upstream::ResourceManager& upstream_cluster_resource_manager =
cluster_manager_.get(config_->clusterName())
->resourceManager(Upstream::ResourcePriority::Default);

if (!upstream_cluster_resource_manager.connections().canCreate()) {
cluster_manager_.get(config_->clusterName())->stats().upstream_cx_overflow_.inc();
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
return;
return Network::FilterStatus::StopIteration;
}
Upstream::Host::CreateConnectionData conn_info =
cluster_manager_.tcpConnForCluster(config_->clusterName());
Expand All @@ -62,7 +62,7 @@ void TcpProxy::initializeUpstreamConnection() {
read_callbacks_->upstreamHost(conn_info.host_description_);
if (!upstream_connection_) {
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
return;
return Network::FilterStatus::StopIteration;
}
upstream_cluster_resource_manager.connections().inc();

Expand All @@ -84,6 +84,8 @@ void TcpProxy::initializeUpstreamConnection() {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_connect_ms_.allocateSpan();
connected_timespan_ =
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_length_ms_.allocateSpan();

return Network::FilterStatus::Continue;
}

void TcpProxy::onConnectTimeout() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override { return initializeUpstreamConnection(); }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
conn_log_info("new tcp proxy session", read_callbacks_->connection());
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
initializeUpstreamConnection();
}

private:
Expand Down Expand Up @@ -103,7 +103,7 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter
TcpProxy& parent_;
};

void initializeUpstreamConnection();
Network::FilterStatus initializeUpstreamConnection();
void onConnectTimeout();
void onDownstreamBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta);
Expand Down
1 change: 1 addition & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;

// Http::ConnectionCallbacks
Expand Down
1 change: 1 addition & 0 deletions source/common/mongo/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class ProxyFilter : public Network::Filter,

// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
read_callbacks_->connection().addConnectionCallbacks(*this);
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ void ConnectionImpl::addFilter(FilterPtr filter) { filter_manager_.addFilter(fil

void ConnectionImpl::addReadFilter(ReadFilterPtr filter) { filter_manager_.addReadFilter(filter); }

void ConnectionImpl::initializeReadFilters() { filter_manager_.initializeReadFilters(); }

void ConnectionImpl::close(ConnectionCloseType type) {
if (fd_ == -1) {
return;
Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ConnectionImpl : public virtual Connection,
void addWriteFilter(WriteFilterPtr filter) override;
void addFilter(FilterPtr filter) override;
void addReadFilter(ReadFilterPtr filter) override;
void initializeReadFilters() override;

// Network::Connection
void addConnectionCallbacks(ConnectionCallbacks& cb) override;
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/filter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Network {
class ReadFilterBaseImpl : public ReadFilter {
public:
void initializeReadFilterCallbacks(ReadFilterCallbacks&) override {}
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
};

/**
Expand All @@ -16,6 +17,7 @@ class ReadFilterBaseImpl : public ReadFilter {
class FilterBaseImpl : public Filter {
public:
void initializeReadFilterCallbacks(ReadFilterCallbacks&) override {}
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
};

} // Network
22 changes: 19 additions & 3 deletions source/common/network/filter_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ void FilterManagerImpl::destroyFilters() {
downstream_filters_.clear();
}

void FilterManagerImpl::initializeReadFilters() {
ASSERT(!upstream_filters_.empty());
onContinueReading(nullptr);
}

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
Expand All @@ -37,9 +42,20 @@ void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
}

for (; entry != upstream_filters_.end(); entry++) {
FilterStatus status = (*entry)->filter_->onData(buffer_source_.getReadBuffer());
if (status == FilterStatus::StopIteration) {
return;
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
FilterStatus status = (*entry)->filter_->onNewConnection();
if (status == FilterStatus::StopIteration) {
return;
}
}

Buffer::Instance& read_buffer = buffer_source_.getReadBuffer();
if (read_buffer.length() > 0) {
FilterStatus status = (*entry)->filter_->onData(read_buffer);
if (status == FilterStatus::StopIteration) {
return;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/filter_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class FilterManagerImpl {
void addFilter(FilterPtr filter);
void addReadFilter(ReadFilterPtr filter);
void destroyFilters();
void initializeReadFilters();
void onRead();
FilterStatus onWrite();

Expand All @@ -53,6 +54,7 @@ class FilterManagerImpl {

FilterManagerImpl& parent_;
ReadFilterPtr filter_;
bool initialized_{};
};

typedef std::unique_ptr<ActiveReadFilter> ActiveReadFilterPtr;
Expand Down
12 changes: 4 additions & 8 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@
namespace Server {
namespace Configuration {

void FilterChainUtility::buildFilterChain(Network::Connection& connection,
void FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
const std::list<NetworkFilterFactoryCb>& factories) {
for (const NetworkFilterFactoryCb& factory : factories) {
// It's possible for a connection to be closed immediately in the middle of chain creation.
// If this happened, do not instantiate any more filters.
if (connection.state() != Network::Connection::State::Open) {
break;
}

factory(connection);
factory(filter_manager);
}

filter_manager.initializeReadFilters();
}

MainImpl::MainImpl(Server::Instance& server) : server_(server) {}
Expand Down
2 changes: 1 addition & 1 deletion source/server/configuration_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FilterChainUtility {
* Given a connection and a list of factories, create a new filter chain. Chain creation will
* exit early if any filters immediately close the connection.
*/
static void buildFilterChain(Network::Connection& connection,
static void buildFilterChain(Network::FilterManager& filter_manager,
const std::list<NetworkFilterFactoryCb>& factories);
};

Expand Down
5 changes: 4 additions & 1 deletion test/common/filter/auth/client_ssl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ TEST_F(ClientSslAuthFilterTest, Basic) {

// Check no SSL case, mulitple iterations.
EXPECT_CALL(filter_callbacks_.connection_, ssl()).WillOnce(Return(nullptr));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onNewConnection());
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));

Expand All @@ -107,7 +108,7 @@ TEST_F(ClientSslAuthFilterTest, Basic) {
.WillOnce(ReturnRefOfCopy(std::string("192.168.1.1")));
EXPECT_CALL(ssl_, sha256PeerCertificateDigest()).WillOnce(Return("digest"));
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::NoFlush));
EXPECT_EQ(Network::FilterStatus::StopIteration, instance_->onData(dummy));
EXPECT_EQ(Network::FilterStatus::StopIteration, instance_->onNewConnection());

// Respond.
EXPECT_CALL(*interval_timer_, enableTimer(_));
Expand All @@ -125,6 +126,7 @@ TEST_F(ClientSslAuthFilterTest, Basic) {
.WillOnce(ReturnRefOfCopy(std::string("192.168.1.1")));
EXPECT_CALL(ssl_, sha256PeerCertificateDigest())
.WillOnce(Return("1b7d42ef0025ad89c1c911d6c10d7e86a4cb7c5863b2980abcbad1895f8b5314"));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onNewConnection());
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));

Expand All @@ -133,6 +135,7 @@ TEST_F(ClientSslAuthFilterTest, Basic) {
EXPECT_CALL(filter_callbacks_.connection_, ssl()).WillOnce(Return(&ssl_));
EXPECT_CALL(filter_callbacks_.connection_, remoteAddress())
.WillOnce(ReturnRefOfCopy(std::string("1.2.3.4")));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onNewConnection());
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));
EXPECT_EQ(Network::FilterStatus::Continue, instance_->onData(dummy));

Expand Down
7 changes: 7 additions & 0 deletions test/common/filter/ratelimit_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ TEST_F(RateLimitFilterTest, OK) {
.WillOnce(WithArgs<0>(
Invoke([&](RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; })));

EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));
Expand All @@ -90,6 +91,7 @@ TEST_F(RateLimitFilterTest, OverLimit) {
.WillOnce(WithArgs<0>(
Invoke([&](RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; })));

EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));

Expand All @@ -111,6 +113,7 @@ TEST_F(RateLimitFilterTest, OverLimitNotEnforcing) {
.WillOnce(WithArgs<0>(
Invoke([&](RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; })));

EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));

Expand All @@ -135,6 +138,7 @@ TEST_F(RateLimitFilterTest, Error) {
.WillOnce(WithArgs<0>(
Invoke([&](RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; })));

EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));

Expand All @@ -157,6 +161,7 @@ TEST_F(RateLimitFilterTest, Disconnect) {
.WillOnce(WithArgs<0>(
Invoke([&](RequestCallbacks& callbacks) -> void { request_callbacks_ = &callbacks; })));

EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onData(data));

Expand All @@ -174,6 +179,7 @@ TEST_F(RateLimitFilterTest, ImmediateOK) {
.WillOnce(WithArgs<0>(Invoke([&](RequestCallbacks& callbacks)
-> void { callbacks.complete(LimitStatus::OK); })));

EXPECT_EQ(Network::FilterStatus::Continue, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(data));
EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(data));
Expand All @@ -192,6 +198,7 @@ TEST_F(RateLimitFilterTest, RuntimeDisable) {
.WillOnce(Return(false));
EXPECT_CALL(*client_, limit(_, _, _, _)).Times(0);

EXPECT_EQ(Network::FilterStatus::Continue, filter_->onNewConnection());
Buffer::OwnedImpl data("hello");
EXPECT_EQ(Network::FilterStatus::Continue, filter_->onData(data));
}
Expand Down
Loading

0 comments on commit 7e57daa

Please sign in to comment.