Skip to content

Commit

Permalink
net: don't call send in recv callback (nghttp2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chilledheart committed May 17, 2024
1 parent 51a1ad3 commit 6807332
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 95 deletions.
72 changes: 20 additions & 52 deletions src/cli/cli_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,7 @@ bool DataFrameSource::Send(absl::string_view frame_header, size_t payload_length
}

const int64_t result = connection_->OnReadyToSend(concatenated);
// Write encountered error.
if (result < 0) {
connection_->OnConnectionError(http2::adapter::Http2VisitorInterface::ConnectionError::kSendError);
return false;
}

// Write blocked.
if (result == 0) {
connection_->blocked_stream_ = stream_id_;
return false;
}

if (static_cast<size_t>(result) < concatenated.size()) {
// Probably need to handle this better within this test class.
QUICHE_LOG(DFATAL) << "DATA frame not fully flushed. Connection will be corrupt!";
connection_->OnConnectionError(http2::adapter::Http2VisitorInterface::ConnectionError::kSendError);
return false;
}
DCHECK_EQ(static_cast<size_t>(result), concatenated.size());

if (!payload_length) {
return true;
Expand Down Expand Up @@ -206,28 +189,14 @@ void CliConnection::close() {
VLOG(1) << "close() error: " << ec;
}
if (channel_) {
#ifdef HAVE_QUICHE
if (adapter_) {
if (data_frame_) {
data_frame_->set_last_frame(true);
adapter_->ResumeStream(stream_id_);
SendIfNotProcessing();
data_frame_ = nullptr;
stream_id_ = 0;
}
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
SendIfNotProcessing();
WriteUpstreamInPipe();
}
#endif
channel_->close();
}
on_disconnect();
}

#ifdef HAVE_QUICHE
void CliConnection::SendIfNotProcessing() {
DCHECK(!http2_in_recv_callback_);
if (!processing_responses_) {
processing_responses_ = true;
adapter_->Send();
Expand Down Expand Up @@ -285,8 +254,6 @@ bool CliConnection::OnEndStream(StreamId stream_id) {
stream_id_ = 0;
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
SendIfNotProcessing();
WriteUpstreamInPipe();
}
return true;
}
Expand All @@ -304,8 +271,12 @@ bool CliConnection::OnCloseStream(StreamId stream_id, http2::adapter::Http2Error
return true;
}

void CliConnection::OnConnectionError(ConnectionError /*error*/) {
disconnected(asio::error::connection_aborted);
void CliConnection::OnConnectionError(ConnectionError error) {
LOG(INFO) << "Connection (client) " << connection_id() << " http2 connection error: " << (int)error;
data_frame_ = nullptr;
stream_id_ = 0;
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
}

bool CliConnection::OnFrameHeader(StreamId stream_id, size_t /*length*/, uint8_t /*type*/, uint8_t /*flags*/) {
Expand Down Expand Up @@ -1041,15 +1012,22 @@ std::shared_ptr<IOBuf> CliConnection::GetNextDownstreamBuf(asio::error_code& ec,
#ifdef HAVE_QUICHE
if (adapter_) {
absl::string_view remaining_buffer(reinterpret_cast<const char*>(buf->data()), buf->length());
while (!remaining_buffer.empty()) {
int result = adapter_->ProcessBytes(remaining_buffer);
while (!remaining_buffer.empty() && adapter_->want_read()) {
http2_in_recv_callback_ = true;
int64_t result = adapter_->ProcessBytes(remaining_buffer);
http2_in_recv_callback_ = false;
if (result < 0) {
ec = asio::error::connection_refused;
disconnected(ec);
return nullptr;
/* handled in OnConnectionError inside ProcessBytes call */
goto out;
}
remaining_buffer = remaining_buffer.substr(result);
}
// don't want read anymore (after goaway sent)
if (UNLIKELY(!remaining_buffer.empty())) {
ec = asio::error::connection_refused;
disconnected(ec);
return nullptr;
}
// not enough buffer for recv window
if (downstream_.byte_length() < H2_STREAM_WINDOW_SIZE) {
goto try_again;
Expand Down Expand Up @@ -2243,16 +2221,6 @@ void CliConnection::disconnected(asio::error_code ec) {
scoped_refptr<CliConnection> self(this);
VLOG(1) << "Connection (client) " << connection_id() << " upstream: lost connection with: " << remote_domain()
<< " due to " << ec;
#ifdef HAVE_QUICHE
if (data_frame_) {
data_frame_->set_last_frame(true);
adapter_->ResumeStream(stream_id_);
SendIfNotProcessing();
data_frame_ = nullptr;
stream_id_ = 0;
WriteUpstreamInPipe();
}
#endif
upstream_readable_ = false;
upstream_writable_ = false;
channel_->close();
Expand Down
1 change: 1 addition & 0 deletions src/cli/cli_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class CliConnection : public RefCountedThreadSafe<CliConnection>,

#ifdef HAVE_QUICHE
private:
bool http2_in_recv_callback_ = false;
void SendIfNotProcessing();
bool processing_responses_ = false;
StreamId stream_id_ = 0;
Expand Down
63 changes: 20 additions & 43 deletions src/server/server_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,8 @@ bool DataFrameSource::Send(absl::string_view frame_header, size_t payload_length
concatenated = std::string{frame_header};
}
const int64_t result = connection_->OnReadyToSend(concatenated);
// Write encountered error.
if (result < 0) {
connection_->OnConnectionError(http2::adapter::Http2VisitorInterface::ConnectionError::kSendError);
return false;
}

// Write blocked.
if (result == 0) {
connection_->blocked_stream_ = stream_id_;
return false;
}

if (static_cast<size_t>(result) < concatenated.size()) {
// Probably need to handle this better within this test class.
QUICHE_LOG(DFATAL) << "DATA frame not fully flushed. Connection will be corrupt!";
connection_->OnConnectionError(http2::adapter::Http2VisitorInterface::ConnectionError::kSendError);
return false;
}
DCHECK_EQ(static_cast<size_t>(result), concatenated.size());

if (!payload_length) {
return true;
Expand Down Expand Up @@ -176,22 +160,6 @@ void ServerConnection::close() {
<< " disconnected with client at stage: " << ServerConnection::state_to_str(CurrentState());
asio::error_code ec;
closing_ = true;

#ifdef HAVE_QUICHE
if (adapter_) {
if (data_frame_) {
data_frame_->set_last_frame(true);
adapter_->ResumeStream(stream_id_);
SendIfNotProcessing();
data_frame_ = nullptr;
stream_id_ = 0;
}
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
SendIfNotProcessing();
WriteStreamInPipe();
}
#endif
closed_ = true;
if (enable_tls_ && !shutdown_) {
shutdown_ = true;
Expand Down Expand Up @@ -257,6 +225,7 @@ void ServerConnection::Start() {

#ifdef HAVE_QUICHE
void ServerConnection::SendIfNotProcessing() {
DCHECK(!http2_in_recv_callback_);
if (!processing_responses_) {
processing_responses_ = true;
adapter_->Send();
Expand Down Expand Up @@ -375,8 +344,6 @@ bool ServerConnection::OnEndStream(StreamId stream_id) {
stream_id_ = 0;
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
SendIfNotProcessing();
WriteStreamInPipe();
}
return true;
}
Expand All @@ -394,8 +361,12 @@ bool ServerConnection::OnCloseStream(StreamId stream_id, http2::adapter::Http2Er
return true;
}

void ServerConnection::OnConnectionError(ConnectionError /*error*/) {
OnDisconnect(asio::error::connection_aborted);
void ServerConnection::OnConnectionError(ConnectionError error) {
LOG(INFO) << "Connection (server) " << connection_id() << " http2 connection error: " << (int)error;
data_frame_ = nullptr;
stream_id_ = 0;
adapter_->SubmitGoAway(0, http2::adapter::Http2ErrorCode::HTTP2_NO_ERROR, ""sv);
DCHECK(adapter_->want_write());
}

bool ServerConnection::OnFrameHeader(StreamId stream_id, size_t /*length*/, uint8_t /*type*/, uint8_t /*flags*/) {
Expand Down Expand Up @@ -1414,15 +1385,22 @@ std::shared_ptr<IOBuf> ServerConnection::GetNextUpstreamBuf(asio::error_code& ec
#ifdef HAVE_QUICHE
if (adapter_) {
absl::string_view remaining_buffer(reinterpret_cast<const char*>(buf->data()), buf->length());
while (!remaining_buffer.empty()) {
int result = adapter_->ProcessBytes(remaining_buffer);
while (!remaining_buffer.empty() && adapter_->want_read()) {
http2_in_recv_callback_ = true;
int64_t result = adapter_->ProcessBytes(remaining_buffer);
http2_in_recv_callback_ = false;
if (result < 0) {
ec = asio::error::connection_refused;
OnDisconnect(asio::error::connection_refused);
return nullptr;
/* handled in OnConnectionError inside ProcessBytes call */
goto out;
}
remaining_buffer = remaining_buffer.substr(result);
}
// don't want read anymore (after goaway sent)
if (UNLIKELY(!remaining_buffer.empty())) {
ec = asio::error::connection_refused;
OnDisconnect(ec);
return nullptr;
}
// not enough buffer for recv window
if (upstream_.byte_length() < H2_STREAM_WINDOW_SIZE) {
goto try_again;
Expand Down Expand Up @@ -1581,7 +1559,6 @@ void ServerConnection::OnConnect() {
headers.emplace_back("padding"s, padding);
}
int submit_result = adapter_->SubmitResponse(stream_id_, GenerateHeaders(headers, 200), std::move(data_frame));
SendIfNotProcessing();
if (submit_result != 0) {
OnDisconnect(asio::error::connection_aborted);
}
Expand Down
1 change: 1 addition & 0 deletions src/server/server_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class ServerConnection : public RefCountedThreadSafe<ServerConnection>,

#ifdef HAVE_QUICHE
private:
bool http2_in_recv_callback_ = false;
void SendIfNotProcessing();
bool processing_responses_ = false;
StreamId stream_id_ = 0;
Expand Down

0 comments on commit 6807332

Please sign in to comment.