Skip to content

Commit

Permalink
Use now-official metadata representation for compressed record batches
Browse files Browse the repository at this point in the history
  • Loading branch information
wesm committed Jun 28, 2020
1 parent 22994b7 commit 96c4c79
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 33 deletions.
8 changes: 0 additions & 8 deletions cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,11 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
set(PREFIX "arrow-ipc")
endif()

if(ARG_LABELS)
set(LABELS ${ARG_LABELS})
else()
set(LABELS "arrow_ipc")
endif()

add_arrow_test(${REL_TEST_NAME}
EXTRA_LINK_LIBS
${ARROW_DATASET_TEST_LINK_LIBS}
PREFIX
${PREFIX}
LABELS
${LABELS}
${ARG_UNPARSED_ARGUMENTS})
endfunction()

Expand Down
41 changes: 33 additions & 8 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/io/interfaces.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/ipc/message.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/util.h"
#include "arrow/sparse_tensor.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -839,6 +840,7 @@ Result<std::shared_ptr<Buffer>> WriteFBMessage(
using FieldNodeVector =
flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
using BodyCompressionOffset = flatbuffers::Offset<flatbuf::BodyCompression>;

static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes,
FieldNodeVector* out) {
Expand Down Expand Up @@ -870,17 +872,38 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
return Status::OK();
}

// Translate the compression setting in `options` into a flatbuffer
// BodyCompression table built inside `fbb`.
//
// \param[in,out] fbb builder that receives the BodyCompression table
// \param[in] options write options whose `compression` member selects the codec
// \param[out] out offset of the created table; left untouched when
//   options.compression is Compression::UNCOMPRESSED
// \return Status::Invalid when the codec is neither LZ4_FRAME nor ZSTD
static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
                                 BodyCompressionOffset* out) {
  // Uncompressed bodies carry no BodyCompression table at all.
  if (options.compression == Compression::UNCOMPRESSED) {
    return Status::OK();
  }

  flatbuf::CompressionType fb_codec;
  switch (options.compression) {
    case Compression::LZ4_FRAME:
      fb_codec = flatbuf::CompressionType::LZ4_FRAME;
      break;
    case Compression::ZSTD:
      fb_codec = flatbuf::CompressionType::ZSTD;
      break;
    default:
      return Status::Invalid("Unsupported IPC compression codec: ",
                             util::Codec::GetCodecAsString(options.compression));
  }

  *out = flatbuf::CreateBodyCompression(fbb, fb_codec,
                                        flatbuf::BodyCompressionMethod::BUFFER);
  return Status::OK();
}

static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
RecordBatchOffset* offset) {
const IpcWriteOptions& options, RecordBatchOffset* offset) {
FieldNodeVector fb_nodes;
BufferVector fb_buffers;

RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes));

BufferVector fb_buffers;
RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers));

*offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers);
BodyCompressionOffset fb_compression;
RETURN_NOT_OK(GetBodyCompression(fbb, options, &fb_compression));

*offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers, fb_compression);
return Status::OK();
}

Expand Down Expand Up @@ -1125,10 +1148,11 @@ Status WriteRecordBatchMessage(
int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(),
body_length, custom_metadata)
.Value(out);
Expand Down Expand Up @@ -1183,10 +1207,11 @@ Status WriteDictionaryMessage(
int64_t id, int64_t length, int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
RETURN_NOT_OK(
MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch));
auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch,
body_length, custom_metadata)
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ Status WriteRecordBatchMessage(
const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

Result<std::shared_ptr<Buffer>> WriteTensorMessage(const Tensor& tensor,
const int64_t buffer_start_offset);
Expand All @@ -198,7 +198,7 @@ Status WriteDictionaryMessage(
const int64_t id, const int64_t length, const int64_t body_length,
const std::shared_ptr<const KeyValueMetadata>& custom_metadata,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
const IpcWriteOptions& options, std::shared_ptr<Buffer>* out);

static inline Result<std::shared_ptr<Buffer>> WriteFlatbufferBuilder(
flatbuffers::FlatBufferBuilder& fbb) {
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/ipc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <vector>

#include "arrow/ipc/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
Expand Down Expand Up @@ -66,6 +67,10 @@ struct ARROW_EXPORT IpcWriteOptions {
/// like compression
bool use_threads = true;

/// \brief Format version to use for IPC messages and their
/// metadata. Currently defaults to V4 (readable by v0.8.0 and later).
MetadataVersion metadata_version = MetadataVersion::V4;

static IpcWriteOptions Defaults();
};

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ TEST(TestMessage, SerializeCustomMetadata) {
key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})};
for (auto metadata : cases) {
std::shared_ptr<Buffer> serialized;
ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, &serialized));
ASSERT_OK(internal::WriteRecordBatchMessage(
/*length=*/0, /*body_length=*/0, metadata,
/*nodes=*/{},
/*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
Message::Open(serialized, /*body=*/nullptr));

Expand Down
48 changes: 42 additions & 6 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,29 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
// ----------------------------------------------------------------------
// Array loading

Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
// Read the body compression codec recorded in a RecordBatch's flatbuffer
// metadata.
//
// \param[in] batch the RecordBatch flatbuffer table to inspect
// \param[out] out always reset to Compression::UNCOMPRESSED first; set to
//   LZ4_FRAME or ZSTD when the batch carries a BodyCompression table naming
//   that codec
// \return Status::IOError if `batch` is null; Status::Invalid if the
//   compression method is not BUFFER or the codec is not LZ4_FRAME / ZSTD
Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) {
  *out = Compression::UNCOMPRESSED;

  // Flatbuffer sub-table accessors (e.g. DictionaryBatch.data()) yield null
  // when the field is absent, and callers may invoke this function before
  // validating the pointer. Reject it here instead of dereferencing null.
  if (batch == nullptr) {
    return Status::IOError(
        "Unexpected null RecordBatch in flatbuffer-encoded metadata");
  }

  const flatbuf::BodyCompression* compression = batch->compression();
  if (compression == nullptr) {
    // No BodyCompression table present: *out stays UNCOMPRESSED.
    return Status::OK();
  }

  if (compression->method() != flatbuf::BodyCompressionMethod::BUFFER) {
    // Forward compatibility: refuse compression methods this reader does not
    // know about.
    return Status::Invalid("This library only supports BUFFER compression method");
  }

  switch (compression->codec()) {
    case flatbuf::CompressionType::LZ4_FRAME:
      *out = Compression::LZ4_FRAME;
      break;
    case flatbuf::CompressionType::ZSTD:
      *out = Compression::ZSTD;
      break;
    default:
      return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
  }
  return Status::OK();
}

Status GetCompressionExperimental(const flatbuf::Message* message,
Compression::type* out) {
*out = Compression::UNCOMPRESSED;
if (message->custom_metadata() != nullptr) {
// TODO: Ensure this deserialization only ever happens once
Expand All @@ -489,7 +511,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) {
ARROW_ASSIGN_OR_RAISE(*out,
util::Codec::GetCompressionType(metadata->value(index)));
}
RETURN_NOT_OK(internal::CheckCompressionSupported(*out));
return internal::CheckCompressionSupported(*out);
}
return Status::OK();
}
Expand Down Expand Up @@ -535,8 +557,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatchInternal(
return Status::IOError(
"Header-type of flatbuffer-encoded Message is not RecordBatch.");
}

Compression::type compression;
RETURN_NOT_OK(GetCompression(message, &compression));
RETURN_NOT_OK(GetCompression(batch, &compression));
if (compression == Compression::UNCOMPRESSED &&
message->version() == flatbuf::MetadataVersion::V4) {
// Possibly obtain codec information from experimental serialization format
// in 0.17.x
RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
}
return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options,
compression, file);
}
Expand Down Expand Up @@ -623,8 +652,17 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
"Header-type of flatbuffer-encoded Message is not DictionaryBatch.");
}

// The dictionary is embedded in a record batch with a single column
auto batch_meta = dictionary_batch->data();

Compression::type compression;
RETURN_NOT_OK(GetCompression(message, &compression));
RETURN_NOT_OK(GetCompression(batch_meta, &compression));
if (compression == Compression::UNCOMPRESSED &&
message->version() == flatbuf::MetadataVersion::V4) {
// Possibly obtain codec information from experimental serialization format
// in 0.17.x
RETURN_NOT_OK(GetCompressionExperimental(message, &compression));
}

int64_t id = dictionary_batch->id();

Expand All @@ -635,8 +673,6 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,

auto value_field = ::arrow::field("dummy", value_type);

// The dictionary is embedded in a record batch with a single column
auto batch_meta = dictionary_batch->data();
CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data");

std::shared_ptr<RecordBatch> batch;
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class RecordBatchSerializer {
// Override this for writing dictionary metadata
virtual Status SerializeMetadata(int64_t num_rows) {
return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_,
field_nodes_, buffer_meta_, &out_->metadata);
field_nodes_, buffer_meta_, options_, &out_->metadata);
}

void AppendCustomMetadata(const std::string& key, const std::string& value) {
Expand Down Expand Up @@ -186,10 +186,6 @@ class RecordBatchSerializer {

RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));

// TODO check allowed values for compression?
AppendCustomMetadata("ARROW:experimental_compression",
util::Codec::GetCodecAsString(options_.compression));

ARROW_ASSIGN_OR_RAISE(
codec, util::Codec::Create(options_.compression, options_.compression_level));

Expand Down Expand Up @@ -543,7 +539,7 @@ class DictionarySerializer : public RecordBatchSerializer {

Status SerializeMetadata(int64_t num_rows) override {
return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length,
custom_metadata_, field_nodes_, buffer_meta_,
custom_metadata_, field_nodes_, buffer_meta_, options_,
&out_->metadata);
}

Expand Down

0 comments on commit 96c4c79

Please sign in to comment.