From 0b8f92a23f1889be849065f3281edff941916f24 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 28 Jun 2020 14:07:00 -0500 Subject: [PATCH] Use now-official metadata representation for compressed record batches --- cpp/src/arrow/ipc/CMakeLists.txt | 8 ----- cpp/src/arrow/ipc/metadata_internal.cc | 41 +++++++++++++++++----- cpp/src/arrow/ipc/metadata_internal.h | 4 +-- cpp/src/arrow/ipc/options.h | 5 +++ cpp/src/arrow/ipc/read_write_test.cc | 7 ++-- cpp/src/arrow/ipc/reader.cc | 48 ++++++++++++++++++++++---- cpp/src/arrow/ipc/writer.cc | 8 ++--- 7 files changed, 88 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 10294d9f93096..8d9f03945d696 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -36,19 +36,11 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME) set(PREFIX "arrow-ipc") endif() - if(ARG_LABELS) - set(LABELS ${ARG_LABELS}) - else() - set(LABELS "arrow_ipc") - endif() - add_arrow_test(${REL_TEST_NAME} EXTRA_LINK_LIBS ${ARROW_DATASET_TEST_LINK_LIBS} PREFIX ${PREFIX} - LABELS - ${LABELS} ${ARG_UNPARSED_ARGUMENTS}) endfunction() diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index 25f52c085babb..30214ac18efb2 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -29,6 +29,7 @@ #include "arrow/io/interfaces.h" #include "arrow/ipc/dictionary.h" #include "arrow/ipc/message.h" +#include "arrow/ipc/options.h" #include "arrow/ipc/util.h" #include "arrow/sparse_tensor.h" #include "arrow/status.h" @@ -839,6 +840,7 @@ Result> WriteFBMessage( using FieldNodeVector = flatbuffers::Offset>; using BufferVector = flatbuffers::Offset>; +using BodyCompressionOffset = flatbuffers::Offset; static Status WriteFieldNodes(FBB& fbb, const std::vector& nodes, FieldNodeVector* out) { @@ -870,17 +872,38 @@ static Status WriteBuffers(FBB& fbb, const std::vector& buffers, return Status::OK(); } +static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options, + BodyCompressionOffset* out) { + if (options.compression != Compression::UNCOMPRESSED) { + flatbuf::CompressionType codec; + if (options.compression == Compression::LZ4_FRAME) { + codec = flatbuf::CompressionType::LZ4_FRAME; + } else if (options.compression == Compression::ZSTD) { + codec = flatbuf::CompressionType::ZSTD; + } else { + return Status::Invalid("Unsupported IPC compression codec: ", + util::Codec::GetCodecAsString(options.compression)); + } + *out = flatbuf::CreateBodyCompression(fbb, codec, + flatbuf::BodyCompressionMethod::BUFFER); + } + return Status::OK(); +} + static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, const std::vector& nodes, const std::vector& buffers, - RecordBatchOffset* offset) { + const IpcWriteOptions& options, RecordBatchOffset* offset) { FieldNodeVector fb_nodes; - BufferVector fb_buffers; - RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes)); + + BufferVector fb_buffers; RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers)); - *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers); + BodyCompressionOffset fb_compression; + RETURN_NOT_OK(GetBodyCompression(fbb, options, &fb_compression)); + + *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers, fb_compression); return Status::OK(); } @@ -1125,10 +1148,11 @@ Status WriteRecordBatchMessage( int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - std::shared_ptr* out) { + const IpcWriteOptions& options, std::shared_ptr* out) { FBB fbb; RecordBatchOffset record_batch; - RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); + RETURN_NOT_OK( + MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); return WriteFBMessage(fbb, flatbuf::MessageHeader::RecordBatch, record_batch.Union(), body_length, custom_metadata) .Value(out); @@ -1183,10 +1207,11 @@ Status WriteDictionaryMessage( int64_t id, int64_t length, int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - std::shared_ptr* out) { + const IpcWriteOptions& options, std::shared_ptr* out) { FBB fbb; RecordBatchOffset record_batch; - RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); + RETURN_NOT_OK( + MakeRecordBatch(fbb, length, body_length, nodes, buffers, options, &record_batch)); auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union(); return WriteFBMessage(fbb, flatbuf::MessageHeader::DictionaryBatch, dictionary_batch, body_length, custom_metadata) diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index e94c02784ad8a..b0da188363fa0 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -180,7 +180,7 @@ Status WriteRecordBatchMessage( const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - std::shared_ptr* out); + const IpcWriteOptions& options, std::shared_ptr* out); Result> WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset); @@ -198,7 +198,7 @@ Status WriteDictionaryMessage( const int64_t id, const int64_t length, const int64_t body_length, const std::shared_ptr& custom_metadata, const std::vector& nodes, const std::vector& buffers, - std::shared_ptr* out); + const IpcWriteOptions& options, std::shared_ptr* out); static inline Result> WriteFlatbufferBuilder( flatbuffers::FlatBufferBuilder& fbb) { diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h index 3cb42ba09cebc..385fc6814494b 100644 --- a/cpp/src/arrow/ipc/options.h +++ b/cpp/src/arrow/ipc/options.h @@ -20,6 +20,7 @@ #include #include +#include "arrow/ipc/type_fwd.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/util/compression.h" @@ -66,6 +67,10 @@ struct ARROW_EXPORT IpcWriteOptions { /// like compression bool use_threads = true; + /// \brief Format version to use for IPC messages and their + /// metadata. Presently using V4 version (readable by v0.8.0 and later). + MetadataVersion metadata_version = MetadataVersion::V4; + static IpcWriteOptions Defaults(); }; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 92367083464de..2c1bf1c73a24d 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -126,9 +126,10 @@ TEST(TestMessage, SerializeCustomMetadata) { key_value_metadata({"foo", "bar"}, {"fizz", "buzz"})}; for (auto metadata : cases) { std::shared_ptr serialized; - ASSERT_OK(internal::WriteRecordBatchMessage(/*length=*/0, /*body_length=*/0, metadata, - /*nodes=*/{}, - /*buffers=*/{}, &serialized)); + ASSERT_OK(internal::WriteRecordBatchMessage( + /*length=*/0, /*body_length=*/0, metadata, + /*nodes=*/{}, + /*buffers=*/{}, IpcWriteOptions::Defaults(), &serialized)); ASSERT_OK_AND_ASSIGN(std::unique_ptr message, Message::Open(serialized, /*body=*/nullptr)); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index f266f60074915..877ab69a80503 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -478,7 +478,29 @@ Result> LoadRecordBatch( // ---------------------------------------------------------------------- // Array loading -Status GetCompression(const flatbuf::Message* message, Compression::type* out) { +Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) { + *out = Compression::UNCOMPRESSED; + const flatbuf::BodyCompression* compression = batch->compression(); + if (compression != nullptr) { + if (compression->method() != flatbuf::BodyCompressionMethod::BUFFER) { + // Forward compatibility + return Status::Invalid("This library only supports BUFFER compression method"); + } + + if (compression->codec() == flatbuf::CompressionType::LZ4_FRAME) { + *out = Compression::LZ4_FRAME; + } else if (compression->codec() == flatbuf::CompressionType::ZSTD) { + *out = Compression::ZSTD; + } else { + return Status::Invalid("Unsupported codec in RecordBatch::compression metadata"); + } + return Status::OK(); + } + return Status::OK(); +} + +Status GetCompressionExperimental(const flatbuf::Message* message, + Compression::type* out) { *out = Compression::UNCOMPRESSED; if (message->custom_metadata() != nullptr) { // TODO: Ensure this deserialization only ever happens once @@ -489,7 +511,7 @@ Status GetCompression(const flatbuf::Message* message, Compression::type* out) { ARROW_ASSIGN_OR_RAISE(*out, util::Codec::GetCompressionType(metadata->value(index))); } - RETURN_NOT_OK(internal::CheckCompressionSupported(*out)); + return internal::CheckCompressionSupported(*out); } return Status::OK(); } @@ -535,8 +557,15 @@ Result> ReadRecordBatchInternal( return Status::IOError( "Header-type of flatbuffer-encoded Message is not RecordBatch."); } + Compression::type compression; - RETURN_NOT_OK(GetCompression(message, &compression)); + RETURN_NOT_OK(GetCompression(batch, &compression)); + if (compression == Compression::UNCOMPRESSED && + message->version() == flatbuf::MetadataVersion::V4) { + // Possibly obtain codec information from experimental serialization format + // in 0.17.x + RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); + } return LoadRecordBatch(batch, schema, inclusion_mask, dictionary_memo, options, compression, file); } @@ -623,8 +652,17 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, "Header-type of flatbuffer-encoded Message is not DictionaryBatch."); } + // The dictionary is embedded in a record batch with a single column + auto batch_meta = dictionary_batch->data(); + Compression::type compression; - RETURN_NOT_OK(GetCompression(message, &compression)); + RETURN_NOT_OK(GetCompression(batch_meta, &compression)); + if (compression == Compression::UNCOMPRESSED && + message->version() == flatbuf::MetadataVersion::V4) { + // Possibly obtain codec information from experimental serialization format + // in 0.17.x + RETURN_NOT_OK(GetCompressionExperimental(message, &compression)); + } int64_t id = dictionary_batch->id(); @@ -635,8 +673,6 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo, auto value_field = ::arrow::field("dummy", value_type); - // The dictionary is embedded in a record batch with a single column - auto batch_meta = dictionary_batch->data(); CHECK_FLATBUFFERS_NOT_NULL(batch_meta, "DictionaryBatch.data"); std::shared_ptr batch; diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index d0d7deb65b4d8..3587490b20370 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -155,7 +155,7 @@ class RecordBatchSerializer { // Override this for writing dictionary metadata virtual Status SerializeMetadata(int64_t num_rows) { return WriteRecordBatchMessage(num_rows, out_->body_length, custom_metadata_, - field_nodes_, buffer_meta_, &out_->metadata); + field_nodes_, buffer_meta_, options_, &out_->metadata); } void AppendCustomMetadata(const std::string& key, const std::string& value) { @@ -186,10 +186,6 @@ class RecordBatchSerializer { RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression)); - // TODO check allowed values for compression? - AppendCustomMetadata("ARROW:experimental_compression", - util::Codec::GetCodecAsString(options_.compression)); - ARROW_ASSIGN_OR_RAISE( codec, util::Codec::Create(options_.compression, options_.compression_level)); @@ -543,7 +539,7 @@ class DictionarySerializer : public RecordBatchSerializer { Status SerializeMetadata(int64_t num_rows) override { return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length, - custom_metadata_, field_nodes_, buffer_meta_, + custom_metadata_, field_nodes_, buffer_meta_, options_, &out_->metadata); }