Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-8671: [C++] Use new BodyCompression Flatbuffers member for IPC compression metadata #7571

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,11 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
set(PREFIX "arrow-ipc")
endif()

if(ARG_LABELS)
set(LABELS ${ARG_LABELS})
else()
set(LABELS "arrow_ipc")
endif()

add_arrow_test(${REL_TEST_NAME}
EXTRA_LINK_LIBS
${ARROW_DATASET_TEST_LINK_LIBS}
PREFIX
${PREFIX}
LABELS
${LABELS}
${ARG_UNPARSED_ARGUMENTS})
endfunction()

Expand Down
41 changes: 33 additions & 8 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/util.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -839,6 +840,7 @@ Result<std::shared_ptr<Buffer>> WriteFBMessage(
// Offset handle types for the flatbuffers objects assembled while building
// record batch metadata: the vector of FieldNode structs, the vector of Buffer
// structs, and the BodyCompression table.
using FieldNodeVector =
flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
using BodyCompressionOffset = flatbuffers::Offset<flatbuf::BodyCompression>;

static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes,
FieldNodeVector* out) {
Expand Down Expand Up @@ -870,17 +872,38 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
return Status::OK();
}

// Translate the compression codec selected in `options` into a flatbuffers
// BodyCompression table written through `fbb`.
//
// `*out` is assigned only when a codec is selected; for
// Compression::UNCOMPRESSED it is left untouched and OK is returned.
// Returns Invalid for any codec other than LZ4_FRAME or ZSTD.
static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
                                 BodyCompressionOffset* out) {
  const auto requested = options.compression;
  if (requested == Compression::UNCOMPRESSED) {
    // Nothing to record in the metadata
    return Status::OK();
  }

  flatbuf::CompressionType fb_codec;
  switch (requested) {
    case Compression::LZ4_FRAME:
      fb_codec = flatbuf::CompressionType::LZ4_FRAME;
      break;
    case Compression::ZSTD:
      fb_codec = flatbuf::CompressionType::ZSTD;
      break;
    default:
      return Status::Invalid("Unsupported IPC compression codec: ",
                             util::Codec::GetCodecAsString(requested));
  }

  // Compression is declared with the BUFFER method
  *out = flatbuf::CreateBodyCompression(fbb, fb_codec,
                                        flatbuf::BodyCompressionMethod::BUFFER);
  return Status::OK();
}

// Assemble a RecordBatch flatbuffer table in `fbb` and store its offset in
// `*offset`.
//
// Serializes `nodes` via WriteFieldNodes and `buffers` via WriteBuffers,
// obtains the body compression entry for `options` via GetBodyCompression, and
// passes the results together with `length` to flatbuf::CreateRecordBatch.
// Any failing step returns its error Status immediately.
// Note: `body_length` is accepted but not referenced in this function body.
static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
RecordBatchOffset* offset) {
const IpcWriteOptions& options, RecordBatchOffset* offset) {
FieldNodeVector fb_nodes;
BufferVector fb_buffers;

RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes));

BufferVector fb_buffers;
RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));

*offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers);
BodyCompressionOffset fb_compression;
RETURN_NOT_OK(GetBodyCompression(fbb, options, &fb_compression));

*offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers, fb_compression);
return Status::OK();
}

Expand Down Expand Up @@ -1125,10 +1148,11 @@ Status WriteRecordBatchMessage(
int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
body_length, custom_metadata)
.Value(out);
Expand Down Expand Up @@ -1183,10 +1207,11 @@ Status WriteDictionaryMessage(
int64_t id, int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
body_length, custom_metadata)
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ Status WriteRecordBatchMessage(
const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
const int64_t buffer_start_offset);
Expand All @@ -198,7 +198,7 @@ Status WriteDictionaryMessage(
const int64_t id, const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
flatbuffers::FlatBufferBuilder& fbb) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <vector>

#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
Expand Down Expand Up @@ -66,6 +67,10 @@ struct ARROW_EXPORT IpcWriteOptions {
/// like compression
bool use_threads = true;

/// \brief Format version to use for IPC messages and their
/// metadata. Presently using V4 version (readable by v0.8.0 and later).
MetadataVersion metadata_version = MetadataVersion::V4;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it a bit weird to expose this as an option. Will we be able to write data compatible with other metadata versions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is what I've been saying in the V4/V5 MetadataVersion discussion e-mail

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I had misunderstood that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


static IpcWriteOptions Defaults();
};

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ TEST(TestMessage, SerializeCustomMetadata) {
key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})};
for (auto metadata : cases) {
std::shared_ptr<Buffer> serialized;
ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, &serialized));
ASSERT_OK(internal::WriteRecordBatchMessage(
/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
Message::Open(serialized, /*body=*/nullptr));

Expand Down
48 changes: 42 additions & 6 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,29 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
// ----------------------------------------------------------------------
// Array loading

Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
// Determine the body compression codec declared by a RecordBatch flatbuffer.
//
// \param[in] batch RecordBatch metadata to inspect
// \param[out] out receives Compression::UNCOMPRESSED when the batch has no
//   BodyCompression member, otherwise the declared codec
// \return IOError if `batch` is null; Invalid if the compression method is not
//   BUFFER or the codec is neither LZ4_FRAME nor ZSTD
Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) {
  *out = Compression::UNCOMPRESSED;
  if (batch == nullptr) {
    // ReadDictionary passes DictionaryBatch.data here before checking it for
    // null, so malformed metadata must yield an error rather than a crash.
    return Status::IOError(
        "Unexpected null RecordBatch in flatbuffer-encoded metadata");
  }

  const flatbuf::BodyCompression* compression = batch->compression();
  if (compression == nullptr) {
    // No BodyCompression member: leave *out as UNCOMPRESSED
    return Status::OK();
  }

  if (compression->method() != flatbuf::BodyCompressionMethod::BUFFER) {
    // Forward compatibility
    return Status::Invalid("This library only supports BUFFER compression method");
  }

  if (compression->codec() == flatbuf::CompressionType::LZ4_FRAME) {
    *out = Compression::LZ4_FRAME;
  } else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
    *out = Compression::ZSTD;
  } else {
    return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
  }
  return Status::OK();
}

Status GetCompressionExperimental(const flatbuf::Message* message,
Compression::type* out) {
*out = Compression::UNCOMPRESSED;
if (message->custom_metadata() != nullptr) {
// TODO: Ensure this deserialization only ever happens once
Expand All @@ -489,7 +511,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
ARROW_ASSIGN_OR_RAISE(*out,
util::Codec::GetCompressionType(metadata->value(index)));
}
RETURN_NOT_OK(internal::CheckCompressionSupported(*out));
return internal::CheckCompressionSupported(*out);
}
return Status::OK();
}
Expand Down Expand Up @@ -535,8 +557,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}

Compression::type compression;
RETURN_NOT_OK(GetCompression(message, &compression));
RETURN_NOT_OK(GetCompression(batch, &compression));
if (compression == Compression::UNCOMPRESSED &&
message->version() == flatbuf::MetadataVersion::V4) {
// Possibly obtain codec information from experimental serialization format
// in 0.17.x
RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
}
return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options,
compression, file);
}
Expand Down Expand Up @@ -623,8 +652,17 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
"Header-type of flatbuffer-encoded Message is not DictionaryBatch.");
}

// The dictionary is embedded in a record batch with a single column
auto batch_meta = dictionary_batch->data();

Compression::type compression;
RETURN_NOT_OK(GetCompression(message, &compression));
RETURN_NOT_OK(GetCompression(batch_meta, &compression));
if (compression == Compression::UNCOMPRESSED &&
message->version() == flatbuf::MetadataVersion::V4) {
// Possibly obtain codec information from experimental serialization format
// in 0.17.x
RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
}

int64_t id = dictionary_batch->id();

Expand All @@ -635,8 +673,6 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,

auto value_field = ::arrow::field("dummy", value_type);

// The dictionary is embedded in a record batch with a single column
auto batch_meta = dictionary_batch->data();
CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");

std::shared_ptr<RecordBatch> batch;
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class RecordBatchSerializer {
// Override this for writing dictionary metadata
// Writes the RecordBatch message metadata for `num_rows` rows into
// out_->metadata, using the body length, custom metadata, field nodes and
// buffer metadata held by this serializer.
virtual Status SerializeMetadata(int64_t num_rows) {
return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
field_nodes_, buffer_meta_, &out_->metadata);
field_nodes_, buffer_meta_, options_, &out_->metadata);
}

void AppendCustomMetadata(const std::string& key, const std::string& value) {
Expand Down Expand Up @@ -186,10 +186,6 @@ class RecordBatchSerializer {

RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));

// TODO check allowed values for compression?
AppendCustomMetadata("ARROW:experimental_compression",
util::Codec::GetCodecAsString(options_.compression));

ARROW_ASSIGN_OR_RAISE(
codec, util::Codec::Create(options_.compression, options_.compression_level));

Expand Down Expand Up @@ -543,7 +539,7 @@ class DictionarySerializer : public RecordBatchSerializer {

// Dictionary variant of SerializeMetadata: writes a DictionaryBatch message
// tagged with dictionary_id_ into out_->metadata instead of a plain
// RecordBatch message.
Status SerializeMetadata(int64_t num_rows) override {
return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
custom_metadata_, field_nodes_, buffer_meta_,
custom_metadata_, field_nodes_, buffer_meta_, options_,
&out_->metadata);
}

Expand Down