Skip to content

Commit

Permalink
Add recvfrom option
Browse files Browse the repository at this point in the history
Reviewed By: kvtsoy

Differential Revision: D65234259

fbshipit-source-id: 012b8de53e2192f7acec9999fd144a24c98da23a
  • Loading branch information
Crystal Jin authored and facebook-github-bot committed Oct 31, 2024
1 parent 77dba40 commit 25c2ea7
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 1 deletion.
78 changes: 77 additions & 1 deletion quic/client/QuicClientTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,68 @@ void QuicClientTransport::recvMsg(
networkData.getPackets().size(), networkData.getTotalData());
}

void QuicClientTransport::recvFrom(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
int numPackets,
NetworkData& networkData,
Optional<folly::SocketAddress>& server,
size_t& totalData) {
for (int packetNum = 0; packetNum < numPackets; ++packetNum) {
// We create 1 buffer per packet so that it is not shared, this enables
// us to decrypt in place. If the fizz decrypt api could decrypt in-place
// even if shared, then we could allocate one giant IOBuf here.
Buf readBuffer = folly::IOBuf::createCombined(readBufferSize);
struct iovec vec;
vec.iov_base = readBuffer->writableData();
vec.iov_len = readBufferSize;

sockaddr* rawAddr{nullptr};
struct sockaddr_storage addrStorage {};
if (!server) {
rawAddr = reinterpret_cast<sockaddr*>(&addrStorage);
rawAddr->sa_family = sock.getLocalAddressFamily();
}

ssize_t ret =
sock.recvfrom(readBuffer->writableData(), readBufferSize, &addrStorage);
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// If we got a retriable error, let us continue.
if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::RETRIABLE_ERROR;
}
break;
}
// If we got a non-retriable error, we might have received
// a packet that we could process, however let's just quit early.
sock.pauseRead();
if (conn_->loopDetectorCallback) {
conn_->readDebugState.noReadReason = NoReadReason::NONRETRIABLE_ERROR;
}
return onReadError(folly::AsyncSocketException(
folly::AsyncSocketException::INTERNAL_ERROR,
"::recvmsg() failed",
errno));
} else if (ret == 0) {
break;
}

size_t bytesRead = size_t(ret);
totalData += bytesRead;
if (!server) {
server = folly::SocketAddress();
server->setFromSockaddr(rawAddr, kAddrLen);
}
VLOG(10) << "Got data from socket peer=" << *server << " len=" << bytesRead;
readBuffer->append(bytesRead);

networkData.addPacket(ReceivedUdpPacket(std::move(readBuffer)));
}
trackDatagramsReceived(
networkData.getPackets().size(), networkData.getTotalData());
}

void QuicClientTransport::recvMmsg(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
Expand Down Expand Up @@ -1598,6 +1660,18 @@ void QuicClientTransport::readWithRecvmsg(
processPackets(std::move(networkData), server);
}

void QuicClientTransport::readWithRecvfrom(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
uint16_t numPackets) {
NetworkData networkData;
networkData.reserve(numPackets);
size_t totalData = 0;
Optional<folly::SocketAddress> server;
recvFrom(sock, readBufferSize, numPackets, networkData, server, totalData);
processPackets(std::move(networkData), server);
}

void QuicClientTransport::onNotifyDataAvailable(
QuicAsyncUDPSocket& sock) noexcept {
auto self = this->shared_from_this();
Expand All @@ -1606,7 +1680,9 @@ void QuicClientTransport::onNotifyDataAvailable(
conn_->transportSettings.maxRecvPacketSize * numGROBuffers_;
const uint16_t numPackets = conn_->transportSettings.maxRecvBatchSize;

if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) {
if (conn_->transportSettings.shouldUseRecvfromForBatchRecv) {
readWithRecvfrom(sock, readBufferSize, numPackets);
} else if (conn_->transportSettings.shouldUseWrapperRecvmmsgForBatchRecv) {
readWithRecvmmsgWrapper(sock, readBufferSize, numPackets);
} else if (conn_->transportSettings.shouldUseRecvmmsgForBatchRecv) {
readWithRecvmmsg(sock, readBufferSize, numPackets);
Expand Down
13 changes: 13 additions & 0 deletions quic/client/QuicClientTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ class QuicClientTransport
bool shouldOnlyNotify() override;
void onNotifyDataAvailable(QuicAsyncUDPSocket& sock) noexcept override;

void recvFrom(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
int numPackets,
NetworkData& networkData,
Optional<folly::SocketAddress>& server,
size_t& totalData);

void recvMsg(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
Expand Down Expand Up @@ -327,6 +335,11 @@ class QuicClientTransport
NetworkData&& networkData,
const Optional<folly::SocketAddress>& server);

void readWithRecvfrom(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
uint16_t numPackets);

void readWithRecvmmsgWrapper(
QuicAsyncUDPSocket& sock,
uint64_t readBufferSize,
Expand Down
14 changes: 14 additions & 0 deletions quic/common/udpsocket/LibevQuicAsyncUDPSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,20 @@ int LibevQuicAsyncUDPSocket::getGRO() {
return -1;
}

ssize_t LibevQuicAsyncUDPSocket::recvfrom(
uint8_t* buf,
size_t bufSize,
sockaddr_storage* sockaddrStorage) {
socklen_t addrlen = sizeof(*sockaddrStorage);
return ::recvfrom(
fd_,
buf,
bufSize,
MSG_DONTWAIT,
(struct sockaddr*)sockaddrStorage,
&addrlen);
}

ssize_t LibevQuicAsyncUDPSocket::recvmsg(struct msghdr* msg, int flags) {
return ::recvmsg(fd_, msg, flags);
}
Expand Down
5 changes: 5 additions & 0 deletions quic/common/udpsocket/LibevQuicAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ class LibevQuicAsyncUDPSocket : public QuicAsyncUDPSocketImpl {
LOG(FATAL) << __func__ << " not supported in LibevQuicAsyncUDPSocket";
}

ssize_t recvfrom(
uint8_t* buf,
size_t bufSize,
sockaddr_storage* sockaddrStorage) override;

ssize_t recvmsg(struct msghdr* msg, int flags) override;

int recvmmsg(
Expand Down
7 changes: 7 additions & 0 deletions quic/common/udpsocket/QuicAsyncUDPSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ class QuicAsyncUDPSocket {
size_t count,
const WriteOptions* options) = 0;

virtual ssize_t recvfrom(
uint8_t* /* buf */,
size_t /* bufSize */,
sockaddr_storage* /* sockaddrStorage */) {
return -1;
}

virtual ssize_t recvmsg(struct msghdr* /* msg */, int /* flags */) = 0;

virtual int recvmmsg(
Expand Down
2 changes: 2 additions & 0 deletions quic/state/TransportSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ struct TransportSettings {
bool shouldUseWrapperRecvmmsgForBatchRecv{false};
// Whether or not use recvmmsg.
bool shouldUseRecvmmsgForBatchRecv{false};
// Whether or not use recvfrom.
bool shouldUseRecvfromForBatchRecv{false};
// Config struct for congestion controllers
CongestionControlConfig ccaConfig;
// A packet is considered loss when a packet that's sent later by at least
Expand Down

0 comments on commit 25c2ea7

Please sign in to comment.