Skip to content

Commit

Permalink
4/[thrift][checksum]port payload util free functions to a class
Browse files Browse the repository at this point in the history
Summary: Adds a default serializer and points the direct calls to PayloadUtils free function to the LegacyPayloadSerializerStrategy which delegates to the free functions.

Reviewed By: praihan

Differential Revision: D62485627

fbshipit-source-id: d52f9feb46c1cf5e75c7aa35838d6970515f3e25
  • Loading branch information
Robert Roeser authored and facebook-github-bot committed Sep 24, 2024
1 parent eff10a5 commit c342b98
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,212 @@

#include <thrift/lib/cpp2/transport/rocket/payload/DefaultPayloadSerializerStrategy.h>

#include <folly/io/async/AsyncTransport.h>
#include <thrift/lib/cpp2/protocol/BinaryProtocol.h>
#include <thrift/lib/cpp2/protocol/CompactProtocol.h>
#include <thrift/lib/cpp2/transport/rocket/Compression.h>
#include <thrift/lib/cpp2/transport/rocket/FdSocket.h>

#include <thrift/lib/cpp2/transport/rocket/Types.h>
#include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h>

namespace apache::thrift::rocket {

template <class T>
folly::Try<T> DefaultPayloadSerializerStrategy::unpackAsCompressed(
rocket::Payload&&, bool) {
throw std::runtime_error("not implemented");
namespace {

template <typename Metadata>
void applyCompressionIfNeeded(
std::unique_ptr<folly::IOBuf>& payload, Metadata* metadata) {
if (auto compress = metadata->compression_ref()) {
apache::thrift::rocket::detail::compressPayload(payload, *compress);
}
}

bool validateFileDescriptor(size_t numFds, FdMetadata& fdMetadata) {
// The kernel maximum is actually much lower (at least on Linux, and
// MacOS doesn't seem to document it at all), but that will only fail in
// in `AsyncFdSocket`.
constexpr auto numFdsTypeMax = std::numeric_limits<
op::get_native_type<FdMetadata, ident::numFds>>::max();

if (LIKELY(numFdsTypeMax <= numFds)) {
return true;
} else {
LOG(DFATAL) << numFds << " would overflow FdMetadata::numFds";
fdMetadata.numFds() = numFdsTypeMax;
// This will cause "AsyncFdSocket::writeChainWithFds" to error out.
fdMetadata.fdSeqNum() = folly::SocketFds::kNoSeqNum;
return false;
}
}

template <typename Metadata>
void handleFds(
folly::SocketFds& fds,
Metadata* metadata,
folly::AsyncTransport* transport) {
auto numFds = fds.size();
if (numFds) {
FdMetadata fdMetadata;
if (LIKELY(validateFileDescriptor(numFds, fdMetadata))) {
// When received, the request will know to retrieve this many FDs.
fdMetadata.numFds() = numFds;
// FD sequence numbers count the total number of FDs sent on this
// socket, and are used to detect & fail on the dire class of bugs where
// the wrong FDs are about to be associated with a message.
//
// We currently require message bytes and FDs to be both sent and
// received in a coherent order, so sequence numbers here in `pack*` are
// expected to exactly match the sequencing of socket sends, and also the
// sequencing of `popNextReceivedFds` on the receiving side.
//
// NB: If `transport` is not backed by a `AsyncFdSocket*`, this will
// store `fdSeqNum == -1`, which cannot happen otherwise, thanks to
// AsyncFdSocket's 2^63 -> 0 wrap-around logic. Furthermore, the
// subsequent `writeChainWithFds` will discard `fds`. As a result, the
// recipient will see read errors on the FDs due to both `numFds` not
// matching, and `fdSeqNum` not matching.
fdMetadata.fdSeqNum() =
injectFdSocketSeqNumIntoFdsToSend(transport, &fds);
}

DCHECK(!metadata->fdMetadata().has_value());
metadata->fdMetadata() = fdMetadata;
}
}

template <class T>
folly::Try<T> DefaultPayloadSerializerStrategy::unpack(
rocket::Payload&&, bool) {
throw std::runtime_error("not implemented");
} // namespace

template <typename Metadata>
rocket::Payload DefaultPayloadSerializerStrategy::finalizePayload(
std::unique_ptr<folly::IOBuf>&& payload,
Metadata* metadata,
folly::SocketFds fds) {
auto ret = makePayload<Metadata, CompactProtocolWriter>(
*metadata, std::move(payload));
if (fds.size()) {
ret.fds = std::move(fds.dcheckToSendOrEmpty());
}
return ret;
}

template <class PayloadType>
Payload DefaultPayloadSerializerStrategy::pack(
PayloadType&&, folly::AsyncTransport*) {
throw std::runtime_error("not implemented");
template <typename Metadata>
rocket::Payload DefaultPayloadSerializerStrategy::packWithFds(
Metadata* metadata,
std::unique_ptr<folly::IOBuf>&& payload,
folly::SocketFds fds,
folly::AsyncTransport* transport) {
applyCompressionIfNeeded(payload, metadata);
handleFds(fds, metadata, transport);
return finalizePayload(std::move(payload), metadata, std::move(fds));
}

template rocket::Payload
DefaultPayloadSerializerStrategy::packWithFds<RequestRpcMetadata>(
RequestRpcMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);

template rocket::Payload
DefaultPayloadSerializerStrategy::packWithFds<ResponseRpcMetadata>(
ResponseRpcMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);

template rocket::Payload
DefaultPayloadSerializerStrategy::packWithFds<StreamPayloadMetadata>(
StreamPayloadMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);

bool DefaultPayloadSerializerStrategy::
canSerializeMetadataIntoDataBufferHeadroom(
const std::unique_ptr<folly::IOBuf>& data, const size_t serSize) const {
return data && !data->isChained() &&
data->headroom() >= serSize + kHeadroomBytes && !data->isSharedOne();
}

template <class Metadata, class ProtocolWriter>
Payload DefaultPayloadSerializerStrategy::makePayloadWithHeadroom(
ProtocolWriter& writer,
const Metadata& metadata,
std::unique_ptr<folly::IOBuf> data) {
folly::IOBufQueue queue;
// Store previous state of the buffer pointers and rewind it.
auto startBuffer = data->buffer();
auto start = data->data();
auto origLen = data->length();
data->trimEnd(origLen);
data->retreat(start - startBuffer);

queue.append(std::move(data), false);
writer.setOutput(&queue);
auto metadataLen = metadata.write(&writer);

// Move the new data to come right before the old data and restore the
// old tail pointer.
data = queue.move();
data->advance(start - data->tail());
data->append(origLen);

return Payload::makeCombined(std::move(data), metadataLen);
}

template <class Metadata, class ProtocolWriter>
Payload DefaultPayloadSerializerStrategy::makePayloadWithoutHeadroom(
size_t serSize,
ProtocolWriter& writer,
const Metadata& metadata,
std::unique_ptr<folly::IOBuf> data) {
folly::IOBufQueue queue;
constexpr size_t kMinAllocBytes = 1024;
auto buf =
folly::IOBuf::create(std::max(kHeadroomBytes + serSize, kMinAllocBytes));
buf->advance(kHeadroomBytes);
queue.append(std::move(buf));
writer.setOutput(&queue);
auto metadataLen = metadata.write(&writer);
queue.append(std::move(data));
return Payload::makeCombined(queue.move(), metadataLen);
}

template <class Metadata, class ProtocolWriter>
Payload DefaultPayloadSerializerStrategy::makePayload(
const Metadata& metadata, std::unique_ptr<folly::IOBuf> data) {
ProtocolWriter writer;
// Default is to leave some headroom for rsocket headers
size_t serSize = metadata.serializedSizeZC(&writer);

// If possible, serialize metadata into the headeroom of data.
if (canSerializeMetadataIntoDataBufferHeadroom(data, serSize)) {
return makePayloadWithHeadroom(writer, metadata, std::move(data));
} else {
return makePayloadWithoutHeadroom(
serSize, writer, metadata, std::move(data));
}
}

template Payload DefaultPayloadSerializerStrategy::
makePayload<RequestRpcMetadata, BinaryProtocolWriter>(
const RequestRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
template Payload DefaultPayloadSerializerStrategy::
makePayload<ResponseRpcMetadata, BinaryProtocolWriter>(
const ResponseRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
template Payload DefaultPayloadSerializerStrategy::
makePayload<StreamPayloadMetadata, BinaryProtocolWriter>(
const StreamPayloadMetadata&, std::unique_ptr<folly::IOBuf> data);

template Payload DefaultPayloadSerializerStrategy::
makePayload<RequestRpcMetadata, CompactProtocolWriter>(
const RequestRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
template Payload DefaultPayloadSerializerStrategy::
makePayload<ResponseRpcMetadata, CompactProtocolWriter>(
const ResponseRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
template Payload DefaultPayloadSerializerStrategy::
makePayload<StreamPayloadMetadata, CompactProtocolWriter>(
const StreamPayloadMetadata&, std::unique_ptr<folly::IOBuf> data);

} // namespace apache::thrift::rocket
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@

namespace apache::thrift::rocket {

// TODO rroeser - right now this is a no-op, but will be used for adding support
// for checksum and compression, and other features that don't delegate to the
// PayloadUtils.h header free functions.
/**
* Port of PayloadUtils.h header free functions into a strategy class.
*/
class DefaultPayloadSerializerStrategy final
: public PayloadSerializerStrategy<DefaultPayloadSerializerStrategy> {
public:
DefaultPayloadSerializerStrategy() : PayloadSerializerStrategy(*this) {}

template <class T>
folly::Try<T> unpackAsCompressed(rocket::Payload&& payload, bool useBinary);
folly::Try<T> unpackAsCompressed(rocket::Payload&& payload) {
return folly::makeTryWith([&]() {
T t = unpackImpl<T>(std::move(payload));
if (auto compression = t.metadata.compression()) {
t.payload = uncompressBuffer(std::move(t.payload), *compression);
}
return std::move(t);
});
}

template <class T>
folly::Try<T> unpack(rocket::Payload&& payload, bool useBinary);
folly::Try<T> unpack(rocket::Payload&& payload) {
return folly::makeTryWith(
[&]() { return unpackImpl<T>(std::move(payload)); });
}

template <typename Metadata>
rocket::Payload packWithFds(
Expand All @@ -43,12 +54,88 @@ class DefaultPayloadSerializerStrategy final
folly::AsyncTransport* transport);

template <typename T>
size_t unpackCompact(T&, const folly::IOBuf*) {
throw std::runtime_error("not implemented");
std::unique_ptr<folly::IOBuf> packCompact(T&& data) {
CompactProtocolWriter writer;
folly::IOBufQueue queue;
writer.setOutput(&queue);
data.write(&writer);
return queue.move();
}

template <typename T>
size_t unpackCompact(T& output, const folly::IOBuf* buffer) {
if (FOLLY_UNLIKELY(!buffer)) {
folly::throw_exception<std::runtime_error>("Underflow");
}
CompactProtocolReader reader;
reader.setInput(buffer);
output.read(&reader);
return reader.getCursorPosition();
}

template <class PayloadType>
Payload pack(PayloadType&& payload, folly::AsyncTransport* transport);
rocket::Payload pack(
PayloadType&& payload, folly::AsyncTransport* transport) {
auto metadata = std::forward<PayloadType>(payload).metadata;
return packWithFds(
&metadata,
std::forward<PayloadType>(payload).payload,
std::forward<PayloadType>(payload).fds,
transport);
}

private:
static constexpr size_t kHeadroomBytes = 16;

template <typename Metadata>
rocket::Payload finalizePayload(
std::unique_ptr<folly::IOBuf>&& payload,
Metadata* metadata,
folly::SocketFds fds);

bool canSerializeMetadataIntoDataBufferHeadroom(
const std::unique_ptr<folly::IOBuf>& data, const size_t serSize) const;

template <class Metadata, class ProtocolWriter>
Payload makePayloadWithHeadroom(
ProtocolWriter& writer,
const Metadata& metadata,
std::unique_ptr<folly::IOBuf> data);

template <class Metadata, class ProtocolWriter>
Payload makePayloadWithoutHeadroom(
size_t serSize,
ProtocolWriter& writer,
const Metadata& metadata,
std::unique_ptr<folly::IOBuf> data);

template <class Metadata, class ProtocolWriter>
Payload makePayload(
const Metadata& metadata, std::unique_ptr<folly::IOBuf> data);

void verifyMetadataSize(size_t metadataSize, size_t expectedSize) {
if (metadataSize != expectedSize) {
folly::throw_exception<std::out_of_range>("metadata size mismatch");
}
}

template <typename T>
T unpackImpl(rocket::Payload&& payload) {
T t{{}, {}};
unpackPayloadMetadata(t, payload);
t.payload = std::move(payload).data();
return t;
}

template <typename T>
void unpackPayloadMetadata(T& t, rocket::Payload& payload) {
if (payload.hasNonemptyMetadata()) {
if (unpackCompact(t.metadata, payload.buffer()) !=
payload.metadataSize()) {
folly::throw_exception<std::out_of_range>("metadata size mismatch");
}
}
}
};

} // namespace apache::thrift::rocket
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class LegacyPayloadSerializerStrategy final
}

template <class T>
FOLLY_ERASE folly::Try<T> unpack(rocket::Payload&& payload, bool useBinary) {
return ::apache::thrift::rocket::unpack<T>(std::move(payload), useBinary);
FOLLY_ERASE folly::Try<T> unpack(rocket::Payload&& payload) {
return ::apache::thrift::rocket::unpack<T>(std::move(payload));
}

template <typename T>
Expand All @@ -57,6 +57,16 @@ class LegacyPayloadSerializerStrategy final
return ::apache::thrift::rocket::pack(
std::forward<PayloadType>(payload), transport);
}

template <typename Metadata>
FOLLY_ERASE rocket::Payload packWithFds(
Metadata* metadata,
std::unique_ptr<folly::IOBuf>&& payload,
folly::SocketFds fds,
folly::AsyncTransport* transport) {
return ::apache::thrift::rocket::packWithFds(
metadata, std::move(payload), std::move(fds), transport);
}
};

} // namespace apache::thrift::rocket
Loading

0 comments on commit c342b98

Please sign in to comment.