From 75e906f982c4b64489bba21ca8e1b70e17e5d296 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 7 Feb 2024 17:25:30 +0100 Subject: [PATCH] WIP --- .../arrow/util/byte_stream_split_internal.h | 74 +++++- cpp/src/arrow/util/byte_stream_split_test.cc | 100 +++++-- cpp/src/parquet/encoding.cc | 246 +++++++++++------- cpp/src/parquet/encoding_benchmark.cc | 176 +++++++++---- cpp/src/parquet/encoding_test.cc | 97 +++---- 5 files changed, 462 insertions(+), 231 deletions(-) diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h b/cpp/src/arrow/util/byte_stream_split_internal.h index f70b3991473fa..64c0febfc4ede 100644 --- a/cpp/src/arrow/util/byte_stream_split_internal.h +++ b/cpp/src/arrow/util/byte_stream_split_internal.h @@ -19,9 +19,11 @@ #include "arrow/util/endian.h" #include "arrow/util/simd.h" +#include "arrow/util/small_vector.h" #include "arrow/util/ubsan.h" #include +#include #include #include @@ -35,6 +37,9 @@ namespace arrow::util::internal { // SIMD implementations // +// TODO have all decode and encode routines take an explicit width? This would simplify +// testing and benchmarking quite a bit... + #if defined(ARROW_HAVE_SSE4_2) template void ByteStreamSplitDecodeSse2(const uint8_t* data, int64_t num_values, int64_t stride, @@ -672,14 +677,24 @@ inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalu template void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { + uint8_t* out) { std::array dest_streams; for (int stream = 0; stream < kNumStreams; ++stream) { - dest_streams[stream] = &output_buffer_raw[stream * num_values]; + dest_streams[stream] = &out[stream * num_values]; } DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data()); } +inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* out) { + ::arrow::internal::SmallVector dest_streams; + dest_streams.resize(width); + for (int stream = 0; stream < width; ++stream) { + dest_streams[stream] = &out[stream * num_values]; + } + DoSplitStreams(raw_values, width, num_values, dest_streams.data()); +} + template void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride, uint8_t* out) { @@ -690,26 +705,57 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_ DoMergeStreams(src_streams.data(), kNumStreams, num_values, out); } -template -void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { +inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width, + int64_t num_values, int64_t stride, + uint8_t* out) { + ::arrow::internal::SmallVector src_streams; + src_streams.resize(width); + for (int stream = 0; stream < width; ++stream) { + src_streams[stream] = &data[stream * stride]; + } + DoMergeStreams(src_streams.data(), width, num_values, out); +} + +inline void ByteStreamSplitEncode(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* out) { #if defined(ARROW_HAVE_SIMD_SPLIT) - return ByteStreamSplitEncodeSimd(raw_values, num_values, - output_buffer_raw); +#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeSimd #else - return ByteStreamSplitEncodeScalar(raw_values, num_values, - output_buffer_raw); +#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeScalar #endif + switch (width) { + case 2: + return ByteStreamSplitEncodeScalar<2>(raw_values, num_values, out); + case 4: + return ByteStreamSplitEncodePerhapsSimd<4>(raw_values, num_values, out); + case 8: + return ByteStreamSplitEncodePerhapsSimd<8>(raw_values, num_values, out); + case 16: + return ByteStreamSplitEncodeScalar<16>(raw_values, num_values, out); + } + return ByteStreamSplitEncodeScalarDynamic(raw_values, width, num_values, out); +#undef ByteStreamSplitEncodePerhapsSimd } -template -void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64_t stride, - uint8_t* out) { +inline void ByteStreamSplitDecode(const uint8_t* data, int width, int64_t num_values, + int64_t stride, uint8_t* out) { #if defined(ARROW_HAVE_SIMD_SPLIT) - return ByteStreamSplitDecodeSimd(data, num_values, stride, out); +#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeSimd #else - return ByteStreamSplitDecodeScalar(data, num_values, stride, out); +#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeScalar #endif + switch (width) { + case 2: + return ByteStreamSplitDecodeScalar<2>(data, num_values, stride, out); + case 4: + return ByteStreamSplitDecodePerhapsSimd<4>(data, num_values, stride, out); + case 8: + return ByteStreamSplitDecodePerhapsSimd<8>(data, num_values, stride, out); + case 16: + return ByteStreamSplitDecodeScalar<16>(data, num_values, stride, out); + } + return ByteStreamSplitDecodeScalarDynamic(data, width, num_values, stride, out); +#undef ByteStreamSplitDecodePerhapsSimd } } // namespace arrow::util::internal diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc b/cpp/src/arrow/util/byte_stream_split_test.cc index 71c6063179ea6..7cfd332140325 100644 --- a/cpp/src/arrow/util/byte_stream_split_test.cc +++ b/cpp/src/arrow/util/byte_stream_split_test.cc @@ -63,29 +63,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { public: static constexpr int kWidth = static_cast(sizeof(T)); - using EncodeFunc = NamedFunc)>>; - using DecodeFunc = NamedFunc)>>; + using EncodeFunc = NamedFunc>; + using DecodeFunc = NamedFunc>; void SetUp() override { - encode_funcs_.push_back({"reference", &ReferenceEncode}); - encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar}); - decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar}); -#if defined(ARROW_HAVE_SIMD_SPLIT) - encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd}); - decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd}); -#endif -#if defined(ARROW_HAVE_SSE4_2) - encode_funcs_.push_back({"sse2", &ByteStreamSplitEncodeSse2}); - decode_funcs_.push_back({"sse2", &ByteStreamSplitDecodeSse2}); -#endif -#if defined(ARROW_HAVE_AVX2) - encode_funcs_.push_back({"avx2", &ByteStreamSplitEncodeAvx2}); - decode_funcs_.push_back({"avx2", &ByteStreamSplitDecodeAvx2}); -#endif -#if defined(ARROW_HAVE_AVX512) - encode_funcs_.push_back({"avx512", &ByteStreamSplitEncodeAvx512}); - decode_funcs_.push_back({"avx512", &ByteStreamSplitDecodeAvx512}); -#endif + decode_funcs_ = MakeDecodeFuncs(); + encode_funcs_ = MakeEncodeFuncs(); } void TestRoundtrip(int64_t num_values) { @@ -98,12 +81,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { for (const auto& encode_func : encode_funcs_) { ARROW_SCOPED_TRACE("encode_func = ", encode_func); encoded.assign(encoded.size(), 0); - encode_func.func(reinterpret_cast(input.data()), num_values, + encode_func.func(reinterpret_cast(input.data()), kWidth, num_values, encoded.data()); for (const auto& decode_func : decode_funcs_) { ARROW_SCOPED_TRACE("decode_func = ", decode_func); decoded.assign(decoded.size(), T{}); - decode_func.func(encoded.data(), num_values, /*stride=*/num_values, + decode_func.func(encoded.data(), kWidth, num_values, /*stride=*/num_values, reinterpret_cast(decoded.data())); ASSERT_EQ(decoded, input); } @@ -129,7 +112,8 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { int64_t offset = 0; while (offset < num_values) { auto chunk_size = std::min(num_values - offset, chunk_size_dist(gen)); - decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values, + decode_func.func(encoded.data() + offset, kWidth, chunk_size, + /*stride=*/num_values, reinterpret_cast(decoded.data() + offset)); offset += chunk_size; } @@ -156,6 +140,74 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { return input; } + template + static std::vector MakeDecodeFuncs() { + std::vector funcs; + funcs.push_back({"scalar", &ByteStreamSplitDecodeScalarDynamic}); + funcs.push_back( + {"scalar", DynamicWidthDecodeFromStatic(&ByteStreamSplitDecodeScalar)}); +#if defined(ARROW_HAVE_SIMD_SPLIT) + if constexpr (kSimdImplemented) { + funcs.push_back( + {"simd", DynamicWidthDecodeFromStatic(&ByteStreamSplitDecodeSimd)}); +#if defined(ARROW_HAVE_SSE4_2) + funcs.push_back( + {"sse2", DynamicWidthDecodeFromStatic(&ByteStreamSplitDecodeSse2)}); +#endif +#if defined(ARROW_HAVE_AVX2) + funcs.push_back( + {"avx2", DynamicWidthDecodeFromStatic(&ByteStreamSplitDecodeAvx2)}); +#endif +#if defined(ARROW_HAVE_AVX512) + funcs.push_back( + {"avx512", DynamicWidthDecodeFromStatic(&ByteStreamSplitDecodeAvx512)}); +#endif + } +#endif // defined(ARROW_HAVE_SIMD_SPLIT) + return funcs; + } + + template + static std::vector MakeEncodeFuncs() { + std::vector funcs; + funcs.push_back({"reference", &ReferenceByteStreamSplitEncode}); + funcs.push_back({"reference", &ByteStreamSplitEncodeScalarDynamic}); + funcs.push_back( + {"scalar", DynamicWidthEncodeFromStatic(&ByteStreamSplitEncodeScalar)}); +#if defined(ARROW_HAVE_SIMD_SPLIT) + if constexpr (kSimdImplemented) { + funcs.push_back( + {"simd", DynamicWidthEncodeFromStatic(&ByteStreamSplitEncodeSimd)}); +#if defined(ARROW_HAVE_SSE4_2) + funcs.push_back( + {"sse2", DynamicWidthEncodeFromStatic(&ByteStreamSplitEncodeSse2)}); +#endif +#if defined(ARROW_HAVE_AVX2) + funcs.push_back( + {"avx2", DynamicWidthEncodeFromStatic(&ByteStreamSplitEncodeAvx2)}); +#endif +#if defined(ARROW_HAVE_AVX512) + funcs.push_back( + {"avx512", DynamicWidthEncodeFromStatic(&ByteStreamSplitEncodeAvx512)}); +#endif + } +#endif // defined(ARROW_HAVE_SIMD_SPLIT) + return funcs; + } + + static std::function DynamicWidthDecodeFromStatic( + std::function)> wrapped) { + return [wrapped](const uint8_t* data, int width, int64_t num_values, int64_t stride, + uint8_t* out) { wrapped(data, num_values, stride, out); }; + } + + static std::function DynamicWidthEncodeFromStatic( + std::function)> wrapped) { + return [wrapped](const uint8_t* data, int width, int64_t num_values, uint8_t* out) { + wrapped(data, num_values, out); + }; + } + std::vector encode_funcs_; std::vector decode_funcs_; diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a3d1746536647..95f8964799fef 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -73,6 +73,10 @@ namespace { // unsigned, but the Java implementation uses signed ints. constexpr size_t kMaxByteArraySize = std::numeric_limits::max(); +// ---------------------------------------------------------------------- +// Encoders +// ---------------------------------------------------------------------- + class EncoderImpl : virtual public Encoder { public: EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool) @@ -92,7 +96,7 @@ class EncoderImpl : virtual public Encoder { MemoryPool* pool_; /// Type length from descr - int type_length_; + const int type_length_; }; // ---------------------------------------------------------------------- @@ -262,8 +266,7 @@ inline void PlainEncoder::Put(const ::arrow::Array& values) { } void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) { - if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY && - values.type_id() != ::arrow::Type::DECIMAL) { + if (!::arrow::is_fixed_size_binary(values.type_id())) { throw ParquetException("Only FixedSizeBinaryArray and subclasses supported"); } if (checked_cast(*values.type()).byte_width() != @@ -801,98 +804,151 @@ void DictEncoderImpl::PutDictionary(const ::arrow::Array& values) // ---------------------------------------------------------------------- // ByteStreamSplitEncoder implementations +// Common base class for all types + template -class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder { +class ByteStreamSplitEncoderBase : public EncoderImpl, + virtual public TypedEncoder { public: using T = typename DType::c_type; using TypedEncoder::Put; - explicit ByteStreamSplitEncoder( - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), + sink_{pool}, + byte_width_(byte_width), + num_values_in_buffer_{0} {} - int64_t EstimatedDataEncodedSize() override; - std::shared_ptr FlushValues() override; + int64_t EstimatedDataEncodedSize() override { return sink_.length(); } - void Put(const T* buffer, int num_values) override; - void Put(const ::arrow::Array& values) override; - void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, - int64_t valid_bits_offset) override; + std::shared_ptr FlushValues() override { + DCHECK_GE(byte_width_, 0); + // TODO what if byte_width_ in {0, 1}? + auto output_buffer = AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); + uint8_t* output_buffer_raw = output_buffer->mutable_data(); + const uint8_t* raw_values = sink_.data(); + ::arrow::util::internal::ByteStreamSplitEncode( + raw_values, /*width=*/byte_width_, num_values_in_buffer_, output_buffer_raw); + sink_.Reset(); + num_values_in_buffer_ = 0; + return std::move(output_buffer); + } - protected: - template - void PutImpl(const ::arrow::Array& values) { - if (values.type_id() != ArrowType::type_id) { - throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() + - " from " + values.type()->ToString() + " not supported"); + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = buffer->template mutable_data_as(); + int num_valid_values = ::arrow::util::internal::SpacedCompress( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); } - const auto& data = *values.data(); - PutSpaced(data.GetValues(1), - static_cast(data.length), data.GetValues(0, 0), data.offset); } + protected: ::arrow::BufferBuilder sink_; + // Required because type_length_ is only filled in for FLBA + const int byte_width_; int64_t num_values_in_buffer_; }; -template -ByteStreamSplitEncoder::ByteStreamSplitEncoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool) - : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), - sink_{pool}, - num_values_in_buffer_{0} {} +// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64 template -int64_t ByteStreamSplitEncoder::EstimatedDataEncodedSize() { - return sink_.length(); -} +class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { + public: + using T = typename DType::c_type; + using ArrowType = typename EncodingTraits::ArrowType; -template -std::shared_ptr ByteStreamSplitEncoder::FlushValues() { - std::shared_ptr output_buffer = - AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); - uint8_t* output_buffer_raw = output_buffer->mutable_data(); - const uint8_t* raw_values = sink_.data(); - ::arrow::util::internal::ByteStreamSplitEncode( - raw_values, num_values_in_buffer_, output_buffer_raw); - sink_.Reset(); - num_values_in_buffer_ = 0; - return std::move(output_buffer); -} + ByteStreamSplitEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ByteStreamSplitEncoderBase(descr, + /*byte_width=*/static_cast(sizeof(T)), + pool) {} -template -void ByteStreamSplitEncoder::Put(const T* buffer, int num_values) { - if (num_values > 0) { - PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); - num_values_in_buffer_ += num_values; + // Inherit Put(const std::vector&...) + using TypedEncoder::Put; + + void Put(const T* buffer, int num_values) override { + if (num_values > 0) { + PARQUET_THROW_NOT_OK( + this->sink_.Append(reinterpret_cast(buffer), + num_values * static_cast(sizeof(T)))); + this->num_values_in_buffer_ += num_values; + } } -} -template <> -void ByteStreamSplitEncoder::Put(const ::arrow::Array& values) { - PutImpl<::arrow::FloatType>(values); -} + void Put(const ::arrow::Array& values) override { + if (values.type_id() != ArrowType::type_id) { + throw ParquetException(std::string() + "direct put from " + + values.type()->ToString() + " not supported"); + } + const auto& data = *values.data(); + this->PutSpaced(data.GetValues(1), + static_cast(data.length), data.GetValues(0, 0), + data.offset); + } +}; + +// BYTE_STREAM_SPLIT encoder implementation for FLBA template <> -void ByteStreamSplitEncoder::Put(const ::arrow::Array& values) { - PutImpl<::arrow::DoubleType>(values); -} +class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { + public: + using DType = FLBAType; + using T = FixedLenByteArray; + using ArrowType = ::arrow::FixedSizeBinaryArray; -template -void ByteStreamSplitEncoder::PutSpaced(const T* src, int num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset) { - if (valid_bits != NULLPTR) { - PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), - this->memory_pool())); - T* data = buffer->template mutable_data_as(); - int num_valid_values = ::arrow::util::internal::SpacedCompress( - src, num_values, valid_bits, valid_bits_offset, data); - Put(data, num_valid_values); - } else { - Put(src, num_values); + ByteStreamSplitEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ByteStreamSplitEncoderBase(descr, + /*byte_width=*/descr->type_length(), pool) {} + + // Inherit Put(const std::vector&...) + using TypedEncoder::Put; + + void Put(const T* buffer, int num_values) override { + if (byte_width_ > 0) { + const int64_t total_bytes = static_cast(num_values) * byte_width_; + PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL"; + sink_.UnsafeAppend(buffer[i].ptr, byte_width_); + } + } + this->num_values_in_buffer_ += num_values; } -} + + void Put(const ::arrow::Array& values) override { + AssertFixedSizeBinary(values, byte_width_); + const auto& data = checked_cast(values); + if (data.null_count() == 0) { + // no nulls, just buffer the data + PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() * byte_width_)); + this->num_values_in_buffer_ += data.length(); + } else { + const int64_t num_values = data.length() - data.null_count(); + const int64_t total_bytes = num_values * byte_width_; + PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); + // TODO use VisitSetBitRunsVoid + for (int64_t i = 0; i < data.length(); i++) { + if (data.IsValid(i)) { + sink_.UnsafeAppend(data.Value(i), byte_width_); + } + } + this->num_values_in_buffer_ += num_values; + } + } +}; + +// ---------------------------------------------------------------------- +// Decoders +// ---------------------------------------------------------------------- class DecoderImpl : virtual public Decoder { public: @@ -3621,8 +3677,9 @@ int ByteStreamSplitDecoder::Decode(T* buffer, int max_values) { const int num_decoded_previously = num_values_in_buffer_ - num_values_; const uint8_t* data = data_ + num_decoded_previously; - ::arrow::util::internal::ByteStreamSplitDecode( - data, values_to_decode, num_values_in_buffer_, reinterpret_cast(buffer)); + ::arrow::util::internal::ByteStreamSplitDecode(data, kNumStreams, values_to_decode, + num_values_in_buffer_, + reinterpret_cast(buffer)); num_values_ -= values_to_decode; len_ -= sizeof(T) * values_to_decode; return values_to_decode; @@ -3642,18 +3699,17 @@ int ByteStreamSplitDecoder::DecodeArrow( const int num_decoded_previously = num_values_in_buffer_ - num_values_; const uint8_t* data = data_ + num_decoded_previously; - int offset = 0; -#if defined(ARROW_HAVE_SIMD_SPLIT) - // Use fast decoding into intermediate buffer. This will also decode - // some null values, but it's fast enough that we don't care. + // Decode into intermediate buffer. T* decode_out = EnsureDecodeBuffer(values_decoded); - ::arrow::util::internal::ByteStreamSplitDecode( - data, values_decoded, num_values_in_buffer_, - reinterpret_cast(decode_out)); - - // XXX If null_count is 0, we could even append in bulk or decode directly into - // builder + ::arrow::util::internal::ByteStreamSplitDecode(data, kNumStreams, values_decoded, + num_values_in_buffer_, + reinterpret_cast(decode_out)); + + // If null_count is 0, we could even append in bulk or decode directly into + // builder. We could also decode in chunks, or use SpacedExpand. We don't + // bother currently, because DecodeArrow methods are only called for ByteArray. + int64_t offset = 0; VisitNullBitmapInline( valid_bits, valid_bits_offset, num_values, null_count, [&]() { @@ -3662,22 +3718,6 @@ int ByteStreamSplitDecoder::DecodeArrow( }, [&]() { builder->UnsafeAppendNull(); }); -#else - // XXX should operate over runs of 0s / 1s - VisitNullBitmapInline( - valid_bits, valid_bits_offset, num_values, null_count, - [&]() { - uint8_t gathered_byte_data[kNumStreams]; - for (int b = 0; b < kNumStreams; ++b) { - const int64_t byte_index = b * num_values_in_buffer_ + offset; - gathered_byte_data[b] = data[byte_index]; - } - builder->UnsafeAppend(SafeLoadAs(&gathered_byte_data[0])); - ++offset; - }, - [&]() { builder->UnsafeAppendNull(); }); -#endif - num_values_ -= values_decoded; len_ -= sizeof(T) * values_decoded; return values_decoded; @@ -3742,12 +3782,20 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); case Type::FLOAT: return std::make_unique>(descr, pool); case Type::DOUBLE: return std::make_unique>(descr, pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr, pool); default: - throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); + throw ParquetException( + "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 " + "and FIXED_LEN_BYTE_ARRAY"); } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 76c411244b22d..f8fb210771ece 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -17,6 +17,11 @@ #include "benchmark/benchmark.h" +#include +#include +#include +#include + #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" @@ -31,10 +36,6 @@ #include "parquet/platform.h" #include "parquet/schema.h" -#include -#include -#include - using arrow::default_memory_pool; using arrow::MemoryPool; @@ -361,138 +362,221 @@ static void BM_PlainDecodingSpacedDouble(benchmark::State& state) { } BENCHMARK(BM_PlainDecodingSpacedDouble)->Apply(BM_SpacedArgs); -template +template +struct ByteStreamSplitDummyValue { + static constexpr T value() { return static_cast(42); } +}; + +template +struct ByteStreamSplitDummyValue> { + using Array = std::array; + + static constexpr Array value() { + Array array; + array.fill(ByteStreamSplitDummyValue::value()); + return array; + } +}; + +template static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& decode_func) { - std::vector values(state.range(0), 64.0); + const std::vector values(state.range(0), ByteStreamSplitDummyValue::value()); const uint8_t* values_raw = reinterpret_cast(values.data()); - std::vector output(state.range(0), 0); + std::vector output(state.range(0)); for (auto _ : state) { - decode_func(values_raw, static_cast(values.size()), - static_cast(values.size()), - reinterpret_cast(output.data())); + if constexpr (kIsDynamicWidthDecode) { + decode_func(values_raw, + /*width=*/static_cast(sizeof(T)), + /*num_values=*/static_cast(values.size()), + /*stride=*/static_cast(values.size()), + reinterpret_cast(output.data())); + } else { + decode_func(values_raw, + /*num_values=*/static_cast(values.size()), + /*stride=*/static_cast(values.size()), + reinterpret_cast(output.data())); + } benchmark::ClobberMemory(); } state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T)); + state.SetItemsProcessed(state.iterations() * values.size()); } -template +template static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& encode_func) { - std::vector values(state.range(0), 64.0); + const std::vector values(state.range(0), ByteStreamSplitDummyValue::value()); const uint8_t* values_raw = reinterpret_cast(values.data()); - std::vector output(state.range(0) * sizeof(T), 0); + std::vector output(state.range(0) * sizeof(T)); for (auto _ : state) { - encode_func(values_raw, values.size(), output.data()); + if constexpr (kIsDynamicWidthDecode) { + encode_func(values_raw, /*width=*/static_cast(sizeof(T)), values.size(), + output.data()); + } else { + encode_func(values_raw, values.size(), output.data()); + } benchmark::ClobberMemory(); } state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T)); + state.SetItemsProcessed(state.iterations() * values.size()); +} + +static void BM_ByteStreamSplitDecode_Float_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode(state, + ::arrow::util::internal::ByteStreamSplitDecode); +} + +static void BM_ByteStreamSplitDecode_Double_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode(state, + ::arrow::util::internal::ByteStreamSplitDecode); +} + +template +static void BM_ByteStreamSplitDecode_FLBA_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode, true>( + state, ::arrow::util::internal::ByteStreamSplitDecode); +} + +static void BM_ByteStreamSplitEncode_Float_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode(state, + ::arrow::util::internal::ByteStreamSplitEncode); +} + +static void BM_ByteStreamSplitEncode_Double_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode(state, + ::arrow::util::internal::ByteStreamSplitEncode); +} + +template +static void BM_ByteStreamSplitEncode_FLBA_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode, true>( + state, ::arrow::util::internal::ByteStreamSplitEncode); } static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeScalar); } static void BM_ByteStreamSplitDecode_Double_Scalar(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeScalar); } static void BM_ByteStreamSplitEncode_Float_Scalar(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeScalar); } static void BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeScalar); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE); +static void ByteStreamSplitApply(::benchmark::internal::Benchmark* bench) { + // Reduce the number of variations by only testing the two range ends. + bench->Arg(MIN_RANGE)->Arg(MAX_RANGE); +} + +BENCHMARK(BM_ByteStreamSplitDecode_Float_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 2)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 7)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 16) + ->Apply(ByteStreamSplitApply); + +BENCHMARK(BM_ByteStreamSplitEncode_Float_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 2)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 7)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 16) + ->Apply(ByteStreamSplitApply); + +BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Apply(ByteStreamSplitApply); #if defined(ARROW_HAVE_SSE4_2) static void BM_ByteStreamSplitDecode_Float_Sse2(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeSse2); } static void BM_ByteStreamSplitDecode_Double_Sse2(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeSse2); } static void BM_ByteStreamSplitEncode_Float_Sse2(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeSse2); } static void BM_ByteStreamSplitEncode_Double_Sse2(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeSse2); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE); +BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Apply(ByteStreamSplitApply); #endif #if defined(ARROW_HAVE_AVX2) static void BM_ByteStreamSplitDecode_Float_Avx2(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeAvx2); } static void BM_ByteStreamSplitDecode_Double_Avx2(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeAvx2); } static void BM_ByteStreamSplitEncode_Float_Avx2(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeAvx2); } static void BM_ByteStreamSplitEncode_Double_Avx2(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeAvx2); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE); +BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Apply(ByteStreamSplitApply); #endif #if defined(ARROW_HAVE_AVX512) static void BM_ByteStreamSplitDecode_Float_Avx512(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeAvx512); } static void BM_ByteStreamSplitDecode_Double_Avx512(benchmark::State& state) { - BM_ByteStreamSplitDecode( + BM_ByteStreamSplitDecode( state, ::arrow::util::internal::ByteStreamSplitDecodeAvx512); } static void BM_ByteStreamSplitEncode_Float_Avx512(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeAvx512); } static void BM_ByteStreamSplitEncode_Double_Avx512(benchmark::State& state) { - BM_ByteStreamSplitEncode( + BM_ByteStreamSplitEncode( state, ::arrow::util::internal::ByteStreamSplitEncodeAvx512); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx512)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx512)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx512)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx512)->Range(MIN_RANGE, MAX_RANGE); +BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx512)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx512)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx512)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx512)->Apply(ByteStreamSplitApply); #endif template diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index ee581622c818f..e65bf367184d5 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -47,6 +47,7 @@ using arrow::default_memory_pool; using arrow::MemoryPool; using arrow::internal::checked_cast; +using arrow::util::span; namespace bit_util = arrow::bit_util; @@ -1263,6 +1264,15 @@ class TestByteStreamSplitEncoding : public TestEncodingBase { ASSERT_EQ(0, decoder->values_left()); } + template + void CheckDecode(span encoded_data, + span expected_decoded_data) { + static_assert(sizeof(U) == sizeof(c_type)); + CheckDecode(encoded_data.data(), static_cast(encoded_data.size()), + reinterpret_cast(expected_decoded_data.data()), + static_cast(expected_decoded_data.size())); + } + void CheckEncode(const c_type* data, const int num_elements, const uint8_t* expected_encoded_data, const int64_t encoded_data_size) { @@ -1276,6 +1286,14 @@ class TestByteStreamSplitEncoding : public TestEncodingBase { ASSERT_EQ(expected_encoded_data[i], encoded_data_raw[i]); } } + + template + void CheckEncode(span data, span expected_encoded_data) { + static_assert(sizeof(U) == sizeof(c_type)); + CheckEncode(reinterpret_cast(data.data()), + static_cast(data.size()), expected_encoded_data.data(), + static_cast(expected_encoded_data.size())); + } }; template @@ -1287,51 +1305,39 @@ static std::vector ToLittleEndian(const std::vector& input) { return data; } -static_assert(sizeof(float) == sizeof(uint32_t), - "BYTE_STREAM_SPLIT encoding tests assume float / uint32_t type sizes"); -static_assert(sizeof(double) == sizeof(uint64_t), - "BYTE_STREAM_SPLIT encoding tests assume double / uint64_t type sizes"); - -template <> -void TestByteStreamSplitEncoding::CheckDecode() { - const uint8_t data[] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, - 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; - const auto expected_output = - ToLittleEndian({0xAA774411U, 0xBB885522U, 0xCC996633U}); - CheckDecode(data, static_cast(sizeof(data)), - reinterpret_cast(expected_output.data()), - static_cast(sizeof(data) / sizeof(float))); -} - -template <> -void TestByteStreamSplitEncoding::CheckDecode() { - const uint8_t data[] = {0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44, - 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88}; - const auto expected_output = - ToLittleEndian({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL}); - CheckDecode(data, static_cast(sizeof(data)), - reinterpret_cast(expected_output.data()), - static_cast(sizeof(data) / sizeof(double))); -} - -template <> -void TestByteStreamSplitEncoding::CheckEncode() { - const auto data = ToLittleEndian( - {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL}); - const uint8_t expected_output[24] = { - 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5, - 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1, - }; - CheckEncode(reinterpret_cast(data.data()), static_cast(data.size()), - expected_output, sizeof(expected_output)); +template +void TestByteStreamSplitEncoding::CheckDecode() { + if constexpr (sizeof(c_type) == 4) { + const std::vector data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; + const auto expected_output = + ToLittleEndian({0xAA774411U, 0xBB885522U, 0xCC996633U}); + CheckDecode(span{data}, span{expected_output}); + } else { + const std::vector data{0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44, + 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88}; + const auto expected_output = + ToLittleEndian({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL}); + CheckDecode(span{data}, span{expected_output}); + } } -template <> -void TestByteStreamSplitEncoding::CheckEncode() { - const auto data = ToLittleEndian({0xaabbccdd, 0x11223344}); - const uint8_t expected_output[8] = {0xdd, 0x44, 0xcc, 0x33, 0xbb, 0x22, 0xaa, 0x11}; - CheckEncode(reinterpret_cast(data.data()), static_cast(data.size()), - expected_output, sizeof(expected_output)); +template +void TestByteStreamSplitEncoding::CheckEncode() { + if constexpr (sizeof(c_type) == 4) { + const auto data = ToLittleEndian({0xaabbccddUL, 0x11223344UL}); + const std::vector expected_output{0xdd, 0x44, 0xcc, 0x33, + 0xbb, 0x22, 0xaa, 0x11}; + CheckEncode(span{data}, span{expected_output}); + } else { + const auto data = ToLittleEndian( + {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL}); + const std::vector expected_output{ + 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5, + 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1, + }; + CheckEncode(span{data}, span{expected_output}); + } } typedef ::testing::Types ByteStreamSplitTypes; @@ -1397,17 +1403,12 @@ TYPED_TEST(TestByteStreamSplitEncoding, CheckOnlyEncode) { TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) { // First check encoders. - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), - ParquetException); - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), - ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); // Then check decoders. ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT),