From b8bd38224bc2a1fff61db45b789769503d96d574 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 7 Feb 2024 17:25:30 +0100 Subject: [PATCH 1/3] GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to support FIXED_LEN_BYTE_ARRAY, INT32 and INT64 --- .../arrow/util/byte_stream_split_internal.h | 126 +++-- cpp/src/arrow/util/byte_stream_split_test.cc | 72 ++- cpp/src/parquet/column_io_benchmark.cc | 103 +++- cpp/src/parquet/column_writer_test.cc | 16 +- cpp/src/parquet/encoding.cc | 445 +++++++++++------- cpp/src/parquet/encoding_benchmark.cc | 115 ++++- cpp/src/parquet/encoding_test.cc | 207 +++++--- python/pyarrow/tests/parquet/test_basic.py | 42 +- 8 files changed, 757 insertions(+), 369 deletions(-) diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h b/cpp/src/arrow/util/byte_stream_split_internal.h index 77284d695b833..8bca0d442c681 100644 --- a/cpp/src/arrow/util/byte_stream_split_internal.h +++ b/cpp/src/arrow/util/byte_stream_split_internal.h @@ -19,11 +19,14 @@ #include "arrow/util/endian.h" #include "arrow/util/simd.h" +#include "arrow/util/small_vector.h" #include "arrow/util/ubsan.h" #include +#include #include #include +#include #if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2) #include @@ -38,10 +41,11 @@ namespace arrow::util::internal { #if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2) template -void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values, int64_t stride, - uint8_t* out) { +void ByteStreamSplitDecodeSimd128(const uint8_t* data, int width, int64_t num_values, + int64_t stride, uint8_t* out) { using simd_batch = xsimd::make_sized_batch_t; + assert(width == kNumStreams); static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams."); constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2); constexpr int64_t kBlockSize = sizeof(simd_batch) * kNumStreams; @@ -92,10 +96,11 @@ void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values, int64 } template -void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { +void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* output_buffer_raw) { using simd_batch = xsimd::make_sized_batch_t; + assert(width == kNumStreams); static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams."); constexpr int kBlockSize = sizeof(simd_batch) * kNumStreams; @@ -215,15 +220,17 @@ void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, const int64_t num_v #if defined(ARROW_HAVE_AVX2) template -void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values, int64_t stride, - uint8_t* out) { +void ByteStreamSplitDecodeAvx2(const uint8_t* data, int width, int64_t num_values, + int64_t stride, uint8_t* out) { + assert(width == kNumStreams); static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams."); constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2); constexpr int64_t kBlockSize = sizeof(__m256i) * kNumStreams; const int64_t size = num_values * kNumStreams; if (size < kBlockSize) // Back to SSE for small size - return ByteStreamSplitDecodeSimd128(data, num_values, stride, out); + return ByteStreamSplitDecodeSimd128(data, width, num_values, stride, + out); const int64_t num_blocks = size / kBlockSize; // First handle suffix. 
@@ -299,18 +306,19 @@ void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values, int64_t } template -void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { +void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* output_buffer_raw) { + assert(width == kNumStreams); static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams."); constexpr int kBlockSize = sizeof(__m256i) * kNumStreams; if constexpr (kNumStreams == 8) // Back to SSE, currently no path for double. - return ByteStreamSplitEncodeSimd128(raw_values, num_values, + return ByteStreamSplitEncodeSimd128(raw_values, width, num_values, output_buffer_raw); const int64_t size = num_values * kNumStreams; if (size < kBlockSize) // Back to SSE for small size - return ByteStreamSplitEncodeSimd128(raw_values, num_values, + return ByteStreamSplitEncodeSimd128(raw_values, width, num_values, output_buffer_raw); const int64_t num_blocks = size / kBlockSize; const __m256i* raw_values_simd = reinterpret_cast(raw_values); @@ -373,25 +381,26 @@ void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, const int64_t num_valu #if defined(ARROW_HAVE_SIMD_SPLIT) template -void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values, +void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int width, int64_t num_values, int64_t stride, uint8_t* out) { #if defined(ARROW_HAVE_AVX2) - return ByteStreamSplitDecodeAvx2(data, num_values, stride, out); + return ByteStreamSplitDecodeAvx2(data, width, num_values, stride, out); #elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON) - return ByteStreamSplitDecodeSimd128(data, num_values, stride, out); + return ByteStreamSplitDecodeSimd128(data, width, num_values, stride, out); #else #error "ByteStreamSplitDecodeSimd not implemented" #endif } template -void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t num_values, +void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* output_buffer_raw) { #if defined(ARROW_HAVE_AVX2) - return ByteStreamSplitEncodeAvx2(raw_values, num_values, + return ByteStreamSplitEncodeAvx2(raw_values, width, num_values, output_buffer_raw); #elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON) - return ByteStreamSplitEncodeSimd128(raw_values, num_values, + return ByteStreamSplitEncodeSimd128(raw_values, width, num_values, output_buffer_raw); #else #error "ByteStreamSplitEncodeSimd not implemented" @@ -492,18 +501,30 @@ inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalu } template -void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { +void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* out) { + assert(width == kNumStreams); std::array dest_streams; for (int stream = 0; stream < kNumStreams; ++stream) { - dest_streams[stream] = &output_buffer_raw[stream * num_values]; + dest_streams[stream] = &out[stream * num_values]; } DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data()); } +inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* out) { + ::arrow::internal::SmallVector dest_streams; + dest_streams.resize(width); + for (int stream = 0; stream < width; ++stream) { + dest_streams[stream] = &out[stream * 
num_values]; + } + DoSplitStreams(raw_values, width, num_values, dest_streams.data()); +} + template -void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride, - uint8_t* out) { +void ByteStreamSplitDecodeScalar(const uint8_t* data, int width, int64_t num_values, + int64_t stride, uint8_t* out) { + assert(width == kNumStreams); std::array src_streams; for (int stream = 0; stream < kNumStreams; ++stream) { src_streams[stream] = &data[stream * stride]; @@ -511,26 +532,63 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_ DoMergeStreams(src_streams.data(), kNumStreams, num_values, out); } -template -void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values, - uint8_t* output_buffer_raw) { +inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width, + int64_t num_values, int64_t stride, + uint8_t* out) { + ::arrow::internal::SmallVector src_streams; + src_streams.resize(width); + for (int stream = 0; stream < width; ++stream) { + src_streams[stream] = &data[stream * stride]; + } + DoMergeStreams(src_streams.data(), width, num_values, out); +} + +inline void ByteStreamSplitEncode(const uint8_t* raw_values, int width, + const int64_t num_values, uint8_t* out) { #if defined(ARROW_HAVE_SIMD_SPLIT) - return ByteStreamSplitEncodeSimd(raw_values, num_values, - output_buffer_raw); +#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeSimd #else - return ByteStreamSplitEncodeScalar(raw_values, num_values, - output_buffer_raw); +#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeScalar #endif + switch (width) { + case 1: + memcpy(out, raw_values, num_values); + return; + case 2: + return ByteStreamSplitEncodeScalar<2>(raw_values, width, num_values, out); + case 4: + return ByteStreamSplitEncodePerhapsSimd<4>(raw_values, width, num_values, out); + case 8: + return ByteStreamSplitEncodePerhapsSimd<8>(raw_values, width, num_values, out); + case 16: + return ByteStreamSplitEncodeScalar<16>(raw_values, width, num_values, out); + } + return ByteStreamSplitEncodeScalarDynamic(raw_values, width, num_values, out); +#undef ByteStreamSplitEncodePerhapsSimd } -template -void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64_t stride, - uint8_t* out) { +inline void ByteStreamSplitDecode(const uint8_t* data, int width, int64_t num_values, + int64_t stride, uint8_t* out) { #if defined(ARROW_HAVE_SIMD_SPLIT) - return ByteStreamSplitDecodeSimd(data, num_values, stride, out); +#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeSimd #else - return ByteStreamSplitDecodeScalar(data, num_values, stride, out); +#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeScalar #endif + switch (width) { + case 1: + memcpy(out, data, num_values); + return; + case 2: + return ByteStreamSplitDecodeScalar<2>(data, width, num_values, stride, out); + case 4: + return ByteStreamSplitDecodePerhapsSimd<4>(data, width, num_values, stride, out); + case 8: + return ByteStreamSplitDecodePerhapsSimd<8>(data, width, num_values, stride, out); + case 16: + return ByteStreamSplitDecodeScalar<16>(data, width, num_values, stride, out); + } + return ByteStreamSplitDecodeScalarDynamic(data, width, num_values, stride, out); +#undef ByteStreamSplitDecodePerhapsSimd } } // namespace arrow::util::internal diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc b/cpp/src/arrow/util/byte_stream_split_test.cc index 83ed8c9ba5fcd..3a537725b0692 100644 --- 
a/cpp/src/arrow/util/byte_stream_split_test.cc +++ b/cpp/src/arrow/util/byte_stream_split_test.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include #include @@ -35,7 +36,8 @@ namespace arrow::util::internal { -using ByteStreamSplitTypes = ::testing::Types; +using ByteStreamSplitTypes = + ::testing::Types>; template struct NamedFunc { @@ -63,23 +65,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { public: static constexpr int kWidth = static_cast(sizeof(T)); - using EncodeFunc = NamedFunc)>>; - using DecodeFunc = NamedFunc)>>; + using EncodeFunc = NamedFunc>; + using DecodeFunc = NamedFunc>; void SetUp() override { - encode_funcs_.push_back({"reference", &ReferenceEncode}); - encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar}); - decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar}); -#if defined(ARROW_HAVE_SIMD_SPLIT) - encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd}); - decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd}); - encode_funcs_.push_back({"simd128", &ByteStreamSplitEncodeSimd128}); - decode_funcs_.push_back({"simd128", &ByteStreamSplitDecodeSimd128}); -#endif -#if defined(ARROW_HAVE_AVX2) - encode_funcs_.push_back({"avx2", &ByteStreamSplitEncodeAvx2}); - decode_funcs_.push_back({"avx2", &ByteStreamSplitDecodeAvx2}); -#endif + decode_funcs_ = MakeDecodeFuncs(); + encode_funcs_ = MakeEncodeFuncs(); } void TestRoundtrip(int64_t num_values) { @@ -92,12 +83,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { for (const auto& encode_func : encode_funcs_) { ARROW_SCOPED_TRACE("encode_func = ", encode_func); encoded.assign(encoded.size(), 0); - encode_func.func(reinterpret_cast(input.data()), num_values, + encode_func.func(reinterpret_cast(input.data()), kWidth, num_values, encoded.data()); for (const auto& decode_func : decode_funcs_) { ARROW_SCOPED_TRACE("decode_func = ", decode_func); decoded.assign(decoded.size(), T{}); - decode_func.func(encoded.data(), num_values, /*stride=*/num_values, + decode_func.func(encoded.data(), kWidth, num_values, /*stride=*/num_values, reinterpret_cast(decoded.data())); ASSERT_EQ(decoded, input); } @@ -123,7 +114,8 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { int64_t offset = 0; while (offset < num_values) { auto chunk_size = std::min(num_values - offset, chunk_size_dist(gen)); - decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values, + decode_func.func(encoded.data() + offset, kWidth, chunk_size, + /*stride=*/num_values, reinterpret_cast(decoded.data() + offset)); offset += chunk_size; } @@ -141,20 +133,48 @@ class TestByteStreamSplitSpecialized : public ::testing::Test { static std::vector MakeRandomInput(int64_t num_values) { std::vector input(num_values); random_bytes(kWidth * num_values, seed_++, reinterpret_cast(input.data())); - // Avoid NaNs to ease comparison - for (auto& value : input) { - if (std::isnan(value)) { - value = nan_replacement_++; - } - } return input; } + template + static std::vector MakeDecodeFuncs() { + std::vector funcs; + funcs.push_back({"scalar_dynamic", &ByteStreamSplitDecodeScalarDynamic}); + funcs.push_back({"scalar", &ByteStreamSplitDecodeScalar}); +#if defined(ARROW_HAVE_SIMD_SPLIT) + if constexpr (kSimdImplemented) { + funcs.push_back({"simd", &ByteStreamSplitDecodeSimd}); + funcs.push_back({"simd128", &ByteStreamSplitDecodeSimd128}); +#if defined(ARROW_HAVE_AVX2) + funcs.push_back({"avx2", &ByteStreamSplitDecodeAvx2}); +#endif + } +#endif // 
defined(ARROW_HAVE_SIMD_SPLIT) + return funcs; + } + + template + static std::vector MakeEncodeFuncs() { + std::vector funcs; + funcs.push_back({"reference", &ReferenceByteStreamSplitEncode}); + funcs.push_back({"scalar_dynamic", &ByteStreamSplitEncodeScalarDynamic}); + funcs.push_back({"scalar", &ByteStreamSplitEncodeScalar}); +#if defined(ARROW_HAVE_SIMD_SPLIT) + if constexpr (kSimdImplemented) { + funcs.push_back({"simd", &ByteStreamSplitEncodeSimd}); + funcs.push_back({"simd128", &ByteStreamSplitEncodeSimd128}); +#if defined(ARROW_HAVE_AVX2) + funcs.push_back({"avx2", &ByteStreamSplitEncodeAvx2}); +#endif + } +#endif // defined(ARROW_HAVE_SIMD_SPLIT) + return funcs; + } + std::vector encode_funcs_; std::vector decode_funcs_; static inline uint32_t seed_ = 42; - static inline T nan_replacement_ = 0; }; TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes); diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 593765dcd4e0b..b003b4eede1e4 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -54,45 +54,57 @@ std::shared_ptr Int64Schema(Repetition::type repetition) { } void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) { - int64_t bytes_processed = state.iterations() * state.range(0) * sizeof(int64_t); + int64_t num_values = state.iterations() * state.range(0); + int64_t bytes_processed = num_values * sizeof(int64_t); if (repetition != Repetition::REQUIRED) { - bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t); + bytes_processed += num_values * sizeof(int16_t); } if (repetition == Repetition::REPEATED) { - bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t); + bytes_processed += num_values * sizeof(int16_t); } state.SetBytesProcessed(bytes_processed); + state.SetItemsProcessed(num_values); } -template -static void BM_WriteInt64Column(::benchmark::State& state) { +static void BM_WriteInt64Column(::benchmark::State& state, Repetition::type repetition, + Compression::type codec, Encoding::type encoding) { format::ColumnChunk thrift_metadata; ::arrow::random::RandomArrayGenerator rgen(1337); auto values = rgen.Int64(state.range(0), 0, 1000000, 0); - const auto& i8_values = static_cast(*values); + const auto& int64_values = static_cast(*values); std::vector definition_levels(state.range(0), 1); std::vector repetition_levels(state.range(0), 0); std::shared_ptr schema = Int64Schema(repetition); std::shared_ptr properties = WriterProperties::Builder() .compression(codec) - ->encoding(Encoding::PLAIN) + ->encoding(encoding) ->disable_dictionary() ->build(); auto metadata = ColumnChunkMetaDataBuilder::Make( properties, schema.get(), reinterpret_cast(&thrift_metadata)); - while (state.KeepRunning()) { + int64_t data_size = values->length() * sizeof(int64_t); + int64_t stream_size = 0; + for (auto _ : state) { auto stream = CreateOutputStream(); std::shared_ptr writer = BuildWriter( state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec); - writer->WriteBatch(i8_values.length(), definition_levels.data(), - repetition_levels.data(), i8_values.raw_values()); + writer->WriteBatch(int64_values.length(), definition_levels.data(), + repetition_levels.data(), int64_values.raw_values()); writer->Close(); + stream_size = stream->Tell().ValueOrDie(); } SetBytesProcessed(state, repetition); + state.counters["compression_ratio"] = static_cast(data_size) / stream_size; +} + +template +static void 
BM_WriteInt64Column(::benchmark::State& state) { + BM_WriteInt64Column(state, repetition, codec, encoding); } BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Arg(1 << 20); @@ -106,6 +118,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPP ->Arg(1 << 20); BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::SNAPPY) ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::SNAPPY, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::SNAPPY, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); #endif #ifdef ARROW_WITH_LZ4 @@ -115,6 +133,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4) ->Arg(1 << 20); BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::LZ4) ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::LZ4, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::LZ4, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); #endif #ifdef ARROW_WITH_ZSTD @@ -124,6 +148,12 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD) ->Arg(1 << 20); BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD) ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED, Compression::ZSTD, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); +BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL, Compression::ZSTD, + Encoding::BYTE_STREAM_SPLIT) + ->Arg(1 << 20); #endif std::shared_ptr BuildReader(std::shared_ptr& buffer, @@ -135,17 +165,20 @@ std::shared_ptr BuildReader(std::shared_ptr& buffer, ColumnReader::Make(schema, std::move(page_reader))); } -template -static void BM_ReadInt64Column(::benchmark::State& state) { +static void BM_ReadInt64Column(::benchmark::State& state, Repetition::type repetition, + Compression::type codec, Encoding::type encoding) { format::ColumnChunk thrift_metadata; - std::vector values(state.range(0), 128); + + ::arrow::random::RandomArrayGenerator rgen(1337); + auto values = rgen.Int64(state.range(0), 0, 1000000, 0); + const auto& int64_values = static_cast(*values); + std::vector definition_levels(state.range(0), 1); std::vector repetition_levels(state.range(0), 0); std::shared_ptr schema = Int64Schema(repetition); std::shared_ptr properties = WriterProperties::Builder() .compression(codec) - ->encoding(Encoding::PLAIN) + ->encoding(encoding) ->disable_dictionary() ->build(); @@ -155,11 +188,14 @@ static void BM_ReadInt64Column(::benchmark::State& state) { auto stream = CreateOutputStream(); std::shared_ptr writer = BuildWriter( state.range(0), stream, metadata.get(), schema.get(), properties.get(), codec); - writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(), - values.data()); + writer->WriteBatch(int64_values.length(), definition_levels.data(), + repetition_levels.data(), int64_values.raw_values()); writer->Close(); PARQUET_ASSIGN_OR_THROW(auto src, stream->Finish()); + int64_t stream_size = src->size(); + int64_t data_size = int64_values.length() * sizeof(int64_t); + std::vector values_out(state.range(1)); std::vector definition_levels_out(state.range(1)); std::vector repetition_levels_out(state.range(1)); @@ -167,12 +203,20 @@ static void BM_ReadInt64Column(::benchmark::State& state) { std::shared_ptr reader = BuildReader(src, 
state.range(1), codec, schema.get()); int64_t values_read = 0; - for (size_t i = 0; i < values.size(); i += values_read) { + for (int64_t i = 0; i < int64_values.length(); i += values_read) { reader->ReadBatch(values_out.size(), definition_levels_out.data(), repetition_levels_out.data(), values_out.data(), &values_read); } } SetBytesProcessed(state, repetition); + state.counters["compression_ratio"] = static_cast(data_size) / stream_size; +} + +template +static void BM_ReadInt64Column(::benchmark::State& state) { + BM_ReadInt64Column(state, repetition, codec, encoding); } void ReadColumnSetArgs(::benchmark::internal::Benchmark* bench) { @@ -197,6 +241,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY ->Apply(ReadColumnSetArgs); BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::SNAPPY) ->Apply(ReadColumnSetArgs); + +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::SNAPPY, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::SNAPPY, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); #endif #ifdef ARROW_WITH_LZ4 @@ -206,6 +257,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4) ->Apply(ReadColumnSetArgs); BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::LZ4) ->Apply(ReadColumnSetArgs); + +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::LZ4, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::LZ4, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); #endif #ifdef ARROW_WITH_ZSTD @@ -215,6 +273,13 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD) ->Apply(ReadColumnSetArgs); BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED, Compression::ZSTD) ->Apply(ReadColumnSetArgs); + +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED, Compression::ZSTD, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); +BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL, Compression::ZSTD, + Encoding::BYTE_STREAM_SPLIT) + ->Apply(ReadColumnSetArgs); #endif static void BM_RleEncoding(::benchmark::State& state) { diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index a8519a0f56861..c99efd17961aa 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -507,23 +507,33 @@ TEST_F(TestValuesWriterInt32Type, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } +TEST_F(TestValuesWriterInt32Type, RequiredByteStreamSplit) { + this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); +} + TEST_F(TestValuesWriterInt64Type, RequiredDeltaBinaryPacked) { this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED); } +TEST_F(TestValuesWriterInt64Type, RequiredByteStreamSplit) { + this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); +} + TEST_F(TestByteArrayValuesWriter, RequiredDeltaLengthByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY); } -/* -TYPED_TEST(TestByteArrayValuesWriter, RequiredDeltaByteArray) { +TEST_F(TestByteArrayValuesWriter, RequiredDeltaByteArray) { this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); } TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredDeltaByteArray) { 
this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY); } -*/ + +TEST_F(TestFixedLengthByteArrayValuesWriter, RequiredByteStreamSplit) { + this->TestRequiredWithEncoding(Encoding::BYTE_STREAM_SPLIT); +} TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) { this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index a3d1746536647..3eed88f08b22a 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -73,6 +73,10 @@ namespace { // unsigned, but the Java implementation uses signed ints. constexpr size_t kMaxByteArraySize = std::numeric_limits::max(); +// ---------------------------------------------------------------------- +// Encoders +// ---------------------------------------------------------------------- + class EncoderImpl : virtual public Encoder { public: EncoderImpl(const ColumnDescriptor* descr, Encoding::type encoding, MemoryPool* pool) @@ -92,7 +96,7 @@ class EncoderImpl : virtual public Encoder { MemoryPool* pool_; /// Type length from descr - int type_length_; + const int type_length_; }; // ---------------------------------------------------------------------- @@ -262,8 +266,7 @@ inline void PlainEncoder::Put(const ::arrow::Array& values) { } void AssertFixedSizeBinary(const ::arrow::Array& values, int type_length) { - if (values.type_id() != ::arrow::Type::FIXED_SIZE_BINARY && - values.type_id() != ::arrow::Type::DECIMAL) { + if (!::arrow::is_fixed_size_binary(values.type_id())) { throw ParquetException("Only FixedSizeBinaryArray and subclasses supported"); } if (checked_cast(*values.type()).byte_width() != @@ -464,8 +467,6 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder { return kDataPageBitWidthBytes + encoder.len(); } - void set_type_length(int type_length) { this->type_length_ = type_length; } - /// Returns a conservative estimate of the number of bytes needed to encode the buffered /// indices. Used to size the buffer passed to WriteIndices(). 
int64_t EstimatedDataEncodedSize() override { @@ -801,98 +802,154 @@ void DictEncoderImpl::PutDictionary(const ::arrow::Array& values) // ---------------------------------------------------------------------- // ByteStreamSplitEncoder implementations +// Common base class for all types + template -class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder { +class ByteStreamSplitEncoderBase : public EncoderImpl, + virtual public TypedEncoder { public: using T = typename DType::c_type; using TypedEncoder::Put; - explicit ByteStreamSplitEncoder( - const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ByteStreamSplitEncoderBase(const ColumnDescriptor* descr, int byte_width, + ::arrow::MemoryPool* pool) + : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), + sink_{pool}, + byte_width_(byte_width), + num_values_in_buffer_{0} {} - int64_t EstimatedDataEncodedSize() override; - std::shared_ptr FlushValues() override; + int64_t EstimatedDataEncodedSize() override { return sink_.length(); } - void Put(const T* buffer, int num_values) override; - void Put(const ::arrow::Array& values) override; - void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, - int64_t valid_bits_offset) override; + std::shared_ptr FlushValues() override { + if (byte_width_ == 1) { + // Special-cased fast path + PARQUET_ASSIGN_OR_THROW(auto buf, sink_.Finish()); + return buf; + } + auto output_buffer = AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); + uint8_t* output_buffer_raw = output_buffer->mutable_data(); + const uint8_t* raw_values = sink_.data(); + ::arrow::util::internal::ByteStreamSplitEncode( + raw_values, /*width=*/byte_width_, num_values_in_buffer_, output_buffer_raw); + sink_.Reset(); + num_values_in_buffer_ = 0; + return output_buffer; + } - protected: - template - void PutImpl(const ::arrow::Array& values) { - if (values.type_id() != ArrowType::type_id) { - throw ParquetException(std::string() + "direct put to " + ArrowType::type_name() + - " from " + values.type()->ToString() + " not supported"); + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), + this->memory_pool())); + T* data = buffer->template mutable_data_as(); + int num_valid_values = ::arrow::util::internal::SpacedCompress( + src, num_values, valid_bits, valid_bits_offset, data); + Put(data, num_valid_values); + } else { + Put(src, num_values); } - const auto& data = *values.data(); - PutSpaced(data.GetValues(1), - static_cast(data.length), data.GetValues(0, 0), data.offset); } + protected: ::arrow::BufferBuilder sink_; + // Required because type_length_ is only filled in for FLBA + const int byte_width_; int64_t num_values_in_buffer_; }; -template -ByteStreamSplitEncoder::ByteStreamSplitEncoder(const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool) - : EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), - sink_{pool}, - num_values_in_buffer_{0} {} +// BYTE_STREAM_SPLIT encoder implementation for FLOAT, DOUBLE, INT32, INT64 template -int64_t ByteStreamSplitEncoder::EstimatedDataEncodedSize() { - return sink_.length(); -} +class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { + public: + using T = typename DType::c_type; + using ArrowType = typename EncodingTraits::ArrowType; -template -std::shared_ptr ByteStreamSplitEncoder::FlushValues() { - 
std::shared_ptr output_buffer = - AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize()); - uint8_t* output_buffer_raw = output_buffer->mutable_data(); - const uint8_t* raw_values = sink_.data(); - ::arrow::util::internal::ByteStreamSplitEncode( - raw_values, num_values_in_buffer_, output_buffer_raw); - sink_.Reset(); - num_values_in_buffer_ = 0; - return std::move(output_buffer); -} + ByteStreamSplitEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ByteStreamSplitEncoderBase(descr, + /*byte_width=*/static_cast(sizeof(T)), + pool) {} -template -void ByteStreamSplitEncoder::Put(const T* buffer, int num_values) { - if (num_values > 0) { - PARQUET_THROW_NOT_OK(sink_.Append(buffer, num_values * sizeof(T))); - num_values_in_buffer_ += num_values; + // Inherit Put(const std::vector&...) + using TypedEncoder::Put; + + void Put(const T* buffer, int num_values) override { + if (num_values > 0) { + PARQUET_THROW_NOT_OK( + this->sink_.Append(reinterpret_cast(buffer), + num_values * static_cast(sizeof(T)))); + this->num_values_in_buffer_ += num_values; + } } -} -template <> -void ByteStreamSplitEncoder::Put(const ::arrow::Array& values) { - PutImpl<::arrow::FloatType>(values); -} + void Put(const ::arrow::Array& values) override { + if (values.type_id() != ArrowType::type_id) { + throw ParquetException(std::string() + "direct put from " + + values.type()->ToString() + " not supported"); + } + const auto& data = *values.data(); + this->PutSpaced(data.GetValues(1), + static_cast(data.length), data.GetValues(0, 0), + data.offset); + } +}; + +// BYTE_STREAM_SPLIT encoder implementation for FLBA template <> -void ByteStreamSplitEncoder::Put(const ::arrow::Array& values) { - PutImpl<::arrow::DoubleType>(values); -} +class ByteStreamSplitEncoder : public ByteStreamSplitEncoderBase { + public: + using DType = FLBAType; + using T = FixedLenByteArray; + using ArrowType = ::arrow::FixedSizeBinaryArray; -template -void ByteStreamSplitEncoder::PutSpaced(const T* src, int num_values, - const uint8_t* valid_bits, - int64_t valid_bits_offset) { - if (valid_bits != NULLPTR) { - PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateBuffer(num_values * sizeof(T), - this->memory_pool())); - T* data = buffer->template mutable_data_as(); - int num_valid_values = ::arrow::util::internal::SpacedCompress( - src, num_values, valid_bits, valid_bits_offset, data); - Put(data, num_valid_values); - } else { - Put(src, num_values); + ByteStreamSplitEncoder(const ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ByteStreamSplitEncoderBase(descr, + /*byte_width=*/descr->type_length(), pool) {} + + // Inherit Put(const std::vector&...) 
+ using TypedEncoder::Put; + + void Put(const T* buffer, int num_values) override { + if (byte_width_ > 0) { + const int64_t total_bytes = static_cast(num_values) * byte_width_; + PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); + for (int i = 0; i < num_values; ++i) { + // Write the result to the output stream + DCHECK(buffer[i].ptr != nullptr) << "Value ptr cannot be NULL"; + sink_.UnsafeAppend(buffer[i].ptr, byte_width_); + } + } + this->num_values_in_buffer_ += num_values; } -} + + void Put(const ::arrow::Array& values) override { + AssertFixedSizeBinary(values, byte_width_); + const auto& data = checked_cast(values); + if (data.null_count() == 0) { + // no nulls, just buffer the data + PARQUET_THROW_NOT_OK(sink_.Append(data.raw_values(), data.length() * byte_width_)); + this->num_values_in_buffer_ += data.length(); + } else { + const int64_t num_values = data.length() - data.null_count(); + const int64_t total_bytes = num_values * byte_width_; + PARQUET_THROW_NOT_OK(sink_.Reserve(total_bytes)); + // TODO use VisitSetBitRunsVoid + for (int64_t i = 0; i < data.length(); i++) { + if (data.IsValid(i)) { + sink_.UnsafeAppend(data.Value(i), byte_width_); + } + } + this->num_values_in_buffer_ += num_values; + } + } +}; + +// ---------------------------------------------------------------------- +// Decoders +// ---------------------------------------------------------------------- class DecoderImpl : virtual public Decoder { public: @@ -3559,136 +3616,162 @@ class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl, }; // ---------------------------------------------------------------------- -// BYTE_STREAM_SPLIT +// BYTE_STREAM_SPLIT decoders template -class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder { +class ByteStreamSplitDecoderBase : public DecoderImpl, + virtual public TypedDecoder { public: using T = typename DType::c_type; - explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr); - int Decode(T* buffer, int max_values) override; + ByteStreamSplitDecoderBase(const ColumnDescriptor* descr, int byte_width) + : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT), byte_width_(byte_width) {} - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* builder) override; + void SetData(int num_values, const uint8_t* data, int len) override { + if (static_cast(num_values) * byte_width_ != len) { + throw ParquetException("Data size (" + std::to_string(len) + + ") does not match number of values in BYTE_STREAM_SPLIT (" + + std::to_string(num_values) + ")"); + } + DecoderImpl::SetData(num_values, data, len); + stride_ = num_values_; + } int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, - typename EncodingTraits::DictAccumulator* builder) override; - - void SetData(int num_values, const uint8_t* data, int len) override; + typename EncodingTraits::DictAccumulator* builder) override { + ParquetException::NYI("DecodeArrow to DictAccumulator for BYTE_STREAM_SPLIT"); + } - T* EnsureDecodeBuffer(int64_t min_values) { - const int64_t size = sizeof(T) * min_values; + protected: + int DecodeRaw(uint8_t* out_buffer, int max_values) { + const int values_to_decode = std::min(num_values_, max_values); + ::arrow::util::internal::ByteStreamSplitDecode(data_, byte_width_, values_to_decode, + stride_, out_buffer); + data_ += values_to_decode; + num_values_ -= values_to_decode; + len_ -= byte_width_ * values_to_decode; + return 
values_to_decode; + } + + uint8_t* EnsureDecodeBuffer(int64_t min_values) { + const int64_t size = byte_width_ * min_values; if (!decode_buffer_ || decode_buffer_->size() < size) { - PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size)); + const auto alloc_size = ::arrow::bit_util::NextPower2(size); + PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(alloc_size)); } - return decode_buffer_->mutable_data_as(); + return decode_buffer_->mutable_data(); } - private: - int num_values_in_buffer_{0}; + const int byte_width_; + int stride_{0}; std::shared_ptr decode_buffer_; - - static constexpr int kNumStreams = sizeof(T); }; -template -ByteStreamSplitDecoder::ByteStreamSplitDecoder(const ColumnDescriptor* descr) - : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {} +// BYTE_STREAM_SPLIT decoder for FLOAT, DOUBLE, INT32, INT64 template -void ByteStreamSplitDecoder::SetData(int num_values, const uint8_t* data, - int len) { - if (num_values * static_cast(sizeof(T)) < len) { - throw ParquetException( - "Data size too large for number of values (padding in byte stream split data " - "page?)"); - } - if (len % sizeof(T) != 0) { - throw ParquetException("ByteStreamSplit data size " + std::to_string(len) + - " not aligned with type " + TypeToString(DType::type_num)); +class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase { + public: + using T = typename DType::c_type; + + explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr) + : ByteStreamSplitDecoderBase(descr, static_cast(sizeof(T))) {} + + int Decode(T* buffer, int max_values) override { + return this->DecodeRaw(reinterpret_cast(buffer), max_values); } - num_values = len / sizeof(T); - DecoderImpl::SetData(num_values, data, len); - num_values_in_buffer_ = num_values_; -} -template -int ByteStreamSplitDecoder::Decode(T* buffer, int max_values) { - const int values_to_decode = std::min(num_values_, max_values); - const int num_decoded_previously = num_values_in_buffer_ - num_values_; - const uint8_t* data = data_ + num_decoded_previously; + using ByteStreamSplitDecoderBase::DecodeArrow; - ::arrow::util::internal::ByteStreamSplitDecode( - data, values_to_decode, num_values_in_buffer_, reinterpret_cast(buffer)); - num_values_ -= values_to_decode; - len_ -= sizeof(T) * values_to_decode; - return values_to_decode; -} + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* builder) override { + const int values_to_decode = num_values - null_count; + if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) { + ParquetException::EofException(); + } -template -int ByteStreamSplitDecoder::DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, - typename EncodingTraits::Accumulator* builder) { - constexpr int value_size = kNumStreams; - int values_decoded = num_values - null_count; - if (ARROW_PREDICT_FALSE(len_ < value_size * values_decoded)) { - ParquetException::EofException(); + PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); + + // Decode into intermediate buffer. + T* decode_out = reinterpret_cast(this->EnsureDecodeBuffer(values_to_decode)); + const int num_decoded = Decode(decode_out, values_to_decode); + DCHECK_EQ(num_decoded, values_to_decode); + + // If null_count is 0, we could append in bulk or decode directly into + // builder. We could also decode in chunks, or use SpacedExpand. 
We don't + // bother currently, because DecodeArrow methods are only called for ByteArray. + int64_t offset = 0; + VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + builder->UnsafeAppend(decode_out[offset]); + ++offset; + }, + [&]() { builder->UnsafeAppendNull(); }); + + return values_to_decode; } +}; - PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); +// BYTE_STREAM_SPLIT decoder for FIXED_LEN_BYTE_ARRAY - const int num_decoded_previously = num_values_in_buffer_ - num_values_; - const uint8_t* data = data_ + num_decoded_previously; - int offset = 0; +template <> +class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase, + virtual public FLBADecoder { + public: + using DType = FLBAType; + using T = FixedLenByteArray; -#if defined(ARROW_HAVE_SIMD_SPLIT) - // Use fast decoding into intermediate buffer. This will also decode - // some null values, but it's fast enough that we don't care. - T* decode_out = EnsureDecodeBuffer(values_decoded); - ::arrow::util::internal::ByteStreamSplitDecode( - data, values_decoded, num_values_in_buffer_, - reinterpret_cast(decode_out)); + explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr) + : ByteStreamSplitDecoderBase(descr, descr->type_length()) {} - // XXX If null_count is 0, we could even append in bulk or decode directly into - // builder - VisitNullBitmapInline( - valid_bits, valid_bits_offset, num_values, null_count, - [&]() { - builder->UnsafeAppend(decode_out[offset]); - ++offset; - }, - [&]() { builder->UnsafeAppendNull(); }); + int Decode(T* buffer, int max_values) override { + // Decode into intermediate buffer. + max_values = std::min(max_values, this->num_values_); + uint8_t* decode_out = this->EnsureDecodeBuffer(max_values); + const int num_decoded = this->DecodeRaw(decode_out, max_values); + DCHECK_EQ(num_decoded, max_values); -#else - // XXX should operate over runs of 0s / 1s - VisitNullBitmapInline( - valid_bits, valid_bits_offset, num_values, null_count, - [&]() { - uint8_t gathered_byte_data[kNumStreams]; - for (int b = 0; b < kNumStreams; ++b) { - const int64_t byte_index = b * num_values_in_buffer_ + offset; - gathered_byte_data[b] = data[byte_index]; - } - builder->UnsafeAppend(SafeLoadAs(&gathered_byte_data[0])); - ++offset; - }, - [&]() { builder->UnsafeAppendNull(); }); -#endif + for (int i = 0; i < num_decoded; ++i) { + buffer[i] = FixedLenByteArray(decode_out + static_cast(byte_width_) * i); + } + return num_decoded; + } - num_values_ -= values_decoded; - len_ -= sizeof(T) * values_decoded; - return values_decoded; -} + using ByteStreamSplitDecoderBase::DecodeArrow; -template -int ByteStreamSplitDecoder::DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, - typename EncodingTraits::DictAccumulator* builder) { - ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder"); -} + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* builder) override { + const int values_to_decode = num_values - null_count; + if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) { + ParquetException::EofException(); + } + + PARQUET_THROW_NOT_OK(builder->Reserve(num_values)); + + // Decode into intermediate buffer. 
+ uint8_t* decode_out = this->EnsureDecodeBuffer(values_to_decode); + const int num_decoded = this->DecodeRaw(decode_out, values_to_decode); + DCHECK_EQ(num_decoded, values_to_decode); + + // If null_count is 0, we could append in bulk or decode directly into + // builder. We could also decode in chunks, or use SpacedExpand. We don't + // bother currently, because DecodeArrow methods are only called for ByteArray. + int64_t offset = 0; + VisitNullBitmapInline( + valid_bits, valid_bits_offset, num_values, null_count, + [&]() { + builder->UnsafeAppend(decode_out + offset * static_cast(byte_width_)); + ++offset; + }, + [&]() { builder->UnsafeAppendNull(); }); + + return values_to_decode; + } +}; } // namespace @@ -3742,12 +3825,20 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); case Type::FLOAT: return std::make_unique>(descr, pool); case Type::DOUBLE: return std::make_unique>(descr, pool); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr, pool); default: - throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); + throw ParquetException( + "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 " + "and FIXED_LEN_BYTE_ARRAY"); } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { @@ -3816,12 +3907,20 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin } } else if (encoding == Encoding::BYTE_STREAM_SPLIT) { switch (type_num) { + case Type::INT32: + return std::make_unique>(descr); + case Type::INT64: + return std::make_unique>(descr); case Type::FLOAT: return std::make_unique>(descr); case Type::DOUBLE: return std::make_unique>(descr); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr); default: - throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE"); + throw ParquetException( + "BYTE_STREAM_SPLIT only supports FLOAT, DOUBLE, INT32, INT64 " + "and FIXED_LEN_BYTE_ARRAY"); } } else if (encoding == Encoding::DELTA_BINARY_PACKED) { switch (type_num) { diff --git a/cpp/src/parquet/encoding_benchmark.cc b/cpp/src/parquet/encoding_benchmark.cc index 3069e8c9057a9..61959b659f633 100644 --- a/cpp/src/parquet/encoding_benchmark.cc +++ b/cpp/src/parquet/encoding_benchmark.cc @@ -17,6 +17,11 @@ #include "benchmark/benchmark.h" +#include +#include +#include +#include + #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" @@ -31,10 +36,6 @@ #include "parquet/platform.h" #include "parquet/schema.h" -#include -#include -#include - using arrow::default_memory_pool; using arrow::MemoryPool; @@ -361,32 +362,81 @@ static void BM_PlainDecodingSpacedDouble(benchmark::State& state) { } BENCHMARK(BM_PlainDecodingSpacedDouble)->Apply(BM_SpacedArgs); +template +struct ByteStreamSplitDummyValue { + static constexpr T value() { return static_cast(42); } +}; + +template +struct ByteStreamSplitDummyValue> { + using Array = std::array; + + static constexpr Array value() { + Array array; + array.fill(ByteStreamSplitDummyValue::value()); + return array; + } +}; + template static void BM_ByteStreamSplitDecode(benchmark::State& state, DecodeFunc&& decode_func) { - std::vector values(state.range(0), 64.0); + const std::vector values(state.range(0), ByteStreamSplitDummyValue::value()); const uint8_t* values_raw = 
reinterpret_cast(values.data()); - std::vector output(state.range(0), 0); + std::vector output(state.range(0)); for (auto _ : state) { - decode_func(values_raw, static_cast(values.size()), - static_cast(values.size()), + decode_func(values_raw, + /*width=*/static_cast(sizeof(T)), + /*num_values=*/static_cast(values.size()), + /*stride=*/static_cast(values.size()), reinterpret_cast(output.data())); benchmark::ClobberMemory(); } state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T)); + state.SetItemsProcessed(state.iterations() * values.size()); } template static void BM_ByteStreamSplitEncode(benchmark::State& state, EncodeFunc&& encode_func) { - std::vector values(state.range(0), 64.0); + const std::vector values(state.range(0), ByteStreamSplitDummyValue::value()); const uint8_t* values_raw = reinterpret_cast(values.data()); - std::vector output(state.range(0) * sizeof(T), 0); + std::vector output(state.range(0) * sizeof(T)); for (auto _ : state) { - encode_func(values_raw, values.size(), output.data()); + encode_func(values_raw, /*width=*/static_cast(sizeof(T)), values.size(), + output.data()); benchmark::ClobberMemory(); } state.SetBytesProcessed(state.iterations() * values.size() * sizeof(T)); + state.SetItemsProcessed(state.iterations() * values.size()); +} + +static void BM_ByteStreamSplitDecode_Float_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode(state, ::arrow::util::internal::ByteStreamSplitDecode); +} + +static void BM_ByteStreamSplitDecode_Double_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode(state, ::arrow::util::internal::ByteStreamSplitDecode); +} + +template +static void BM_ByteStreamSplitDecode_FLBA_Generic(benchmark::State& state) { + BM_ByteStreamSplitDecode>( + state, ::arrow::util::internal::ByteStreamSplitDecode); +} + +static void BM_ByteStreamSplitEncode_Float_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode(state, ::arrow::util::internal::ByteStreamSplitEncode); +} + +static void BM_ByteStreamSplitEncode_Double_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode(state, ::arrow::util::internal::ByteStreamSplitEncode); +} + +template +static void BM_ByteStreamSplitEncode_FLBA_Generic(benchmark::State& state) { + BM_ByteStreamSplitEncode>( + state, ::arrow::util::internal::ByteStreamSplitEncode); } static void BM_ByteStreamSplitDecode_Float_Scalar(benchmark::State& state) { @@ -409,10 +459,29 @@ static void BM_ByteStreamSplitEncode_Double_Scalar(benchmark::State& state) { state, ::arrow::util::internal::ByteStreamSplitEncodeScalar); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Range(MIN_RANGE, MAX_RANGE); +static void ByteStreamSplitApply(::benchmark::internal::Benchmark* bench) { + // Reduce the number of variations by only testing the two range ends. 
+ bench->Arg(MIN_RANGE)->Arg(MAX_RANGE); +} + +BENCHMARK(BM_ByteStreamSplitDecode_Float_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 2)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 7)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitDecode_FLBA_Generic, 16) + ->Apply(ByteStreamSplitApply); + +BENCHMARK(BM_ByteStreamSplitEncode_Float_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Generic)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 2)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 7)->Apply(ByteStreamSplitApply); +BENCHMARK_TEMPLATE(BM_ByteStreamSplitEncode_FLBA_Generic, 16) + ->Apply(ByteStreamSplitApply); + +BENCHMARK(BM_ByteStreamSplitDecode_Float_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Scalar)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Scalar)->Apply(ByteStreamSplitApply); #if defined(ARROW_HAVE_SSE4_2) static void BM_ByteStreamSplitDecode_Float_Sse2(benchmark::State& state) { @@ -435,10 +504,10 @@ static void BM_ByteStreamSplitEncode_Double_Sse2(benchmark::State& state) { state, ::arrow::util::internal::ByteStreamSplitEncodeSimd128); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Range(MIN_RANGE, MAX_RANGE); +BENCHMARK(BM_ByteStreamSplitDecode_Float_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Sse2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Sse2)->Apply(ByteStreamSplitApply); #endif #if defined(ARROW_HAVE_AVX2) @@ -462,10 +531,10 @@ static void BM_ByteStreamSplitEncode_Double_Avx2(benchmark::State& state) { state, ::arrow::util::internal::ByteStreamSplitEncodeAvx2); } -BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Range(MIN_RANGE, MAX_RANGE); -BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Range(MIN_RANGE, MAX_RANGE); +BENCHMARK(BM_ByteStreamSplitDecode_Float_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitDecode_Double_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Float_Avx2)->Apply(ByteStreamSplitApply); +BENCHMARK(BM_ByteStreamSplitEncode_Double_Avx2)->Apply(ByteStreamSplitApply); #endif #if defined(ARROW_HAVE_NEON) diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index ee581622c818f..ea0029f4c7d7f 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -47,6 +47,7 @@ using arrow::default_memory_pool; using arrow::MemoryPool; using arrow::internal::checked_cast; +using arrow::util::span; namespace bit_util = arrow::bit_util; @@ -711,8 +712,11 @@ class EncodingAdHocTyped : public ::testing::Test { } void ByteStreamSplit(int seed) { - if (!std::is_same::value && - 
!std::is_same::value) { + if constexpr (!std::is_same_v && + !std::is_same_v && + !std::is_same_v && + !std::is_same_v && + !std::is_same_v) { return; } auto values = GetValues(seed); @@ -1234,7 +1238,8 @@ class TestByteStreamSplitEncoding : public TestEncodingBase { encoder->PutSpaced(draws_, num_values_, valid_bits, valid_bits_offset); encode_buffer_ = encoder->FlushValues(); - decoder->SetData(num_values_, encode_buffer_->data(), + ASSERT_EQ(encode_buffer_->size(), physical_byte_width() * (num_values_ - null_count)); + decoder->SetData(num_values_ - null_count, encode_buffer_->data(), static_cast(encode_buffer_->size())); auto values_decoded = decoder->DecodeSpaced(decode_buf_, num_values_, null_count, valid_bits, valid_bits_offset); @@ -1249,33 +1254,58 @@ class TestByteStreamSplitEncoding : public TestEncodingBase { protected: USING_BASE_MEMBERS(); - void CheckDecode(const uint8_t* encoded_data, const int64_t encoded_data_size, - const c_type* expected_decoded_data, const int num_elements) { + template + void CheckDecode(span encoded_data, span expected_decoded_data, + const ColumnDescriptor* descr = nullptr) { + static_assert(sizeof(U) == sizeof(c_type)); + static_assert(std::is_same_v == std::is_same_v); + std::unique_ptr> decoder = - MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT); - decoder->SetData(num_elements, encoded_data, static_cast(encoded_data_size)); - std::vector decoded_data(num_elements); - int num_decoded_elements = decoder->Decode(decoded_data.data(), num_elements); + MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT, descr); + int num_elements = static_cast(expected_decoded_data.size()); + decoder->SetData(num_elements, encoded_data.data(), + static_cast(encoded_data.size())); + std::vector decoded_data(num_elements); + int num_decoded_elements = + decoder->Decode(reinterpret_cast(decoded_data.data()), num_elements); ASSERT_EQ(num_elements, num_decoded_elements); - for (size_t i = 0U; i < decoded_data.size(); ++i) { - ASSERT_EQ(expected_decoded_data[i], decoded_data[i]); + // Compare to expected values + if constexpr (std::is_same_v) { + auto type_length = descr->type_length(); + for (int i = 0; i < num_elements; ++i) { + ASSERT_EQ(span(expected_decoded_data[i].ptr, type_length), + span(decoded_data[i].ptr, type_length)); + } + } else { + for (int i = 0; i < num_elements; ++i) { + ASSERT_EQ(expected_decoded_data[i], decoded_data[i]); + } } ASSERT_EQ(0, decoder->values_left()); } - void CheckEncode(const c_type* data, const int num_elements, - const uint8_t* expected_encoded_data, - const int64_t encoded_data_size) { - std::unique_ptr> encoder = - MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT); - encoder->Put(data, num_elements); + template + void CheckEncode(span data, span expected_encoded_data, + const ColumnDescriptor* descr = nullptr) { + static_assert(sizeof(U) == sizeof(c_type)); + static_assert(std::is_same_v == std::is_same_v); + + std::unique_ptr> encoder = MakeTypedEncoder( + Encoding::BYTE_STREAM_SPLIT, /*use_dictionary=*/false, descr); + int num_elements = static_cast(data.size()); + encoder->Put(reinterpret_cast(data.data()), num_elements); auto encoded_data = encoder->FlushValues(); - ASSERT_EQ(encoded_data_size, encoded_data->size()); + ASSERT_EQ(expected_encoded_data.size(), encoded_data->size()); const uint8_t* encoded_data_raw = encoded_data->data(); for (int64_t i = 0; i < encoded_data->size(); ++i) { ASSERT_EQ(expected_encoded_data[i], encoded_data_raw[i]); } } + + int physical_byte_width() const { + return std::is_same_v ? 
descr_->type_length() + : static_cast(sizeof(c_type)); + } }; template @@ -1287,54 +1317,97 @@ static std::vector ToLittleEndian(const std::vector& input) { return data; } -static_assert(sizeof(float) == sizeof(uint32_t), - "BYTE_STREAM_SPLIT encoding tests assume float / uint32_t type sizes"); -static_assert(sizeof(double) == sizeof(uint64_t), - "BYTE_STREAM_SPLIT encoding tests assume double / uint64_t type sizes"); - -template <> -void TestByteStreamSplitEncoding::CheckDecode() { - const uint8_t data[] = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, - 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; - const auto expected_output = - ToLittleEndian({0xAA774411U, 0xBB885522U, 0xCC996633U}); - CheckDecode(data, static_cast(sizeof(data)), - reinterpret_cast(expected_output.data()), - static_cast(sizeof(data) / sizeof(float))); -} - -template <> -void TestByteStreamSplitEncoding::CheckDecode() { - const uint8_t data[] = {0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44, - 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88}; - const auto expected_output = - ToLittleEndian({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL}); - CheckDecode(data, static_cast(sizeof(data)), - reinterpret_cast(expected_output.data()), - static_cast(sizeof(data) / sizeof(double))); -} - -template <> -void TestByteStreamSplitEncoding::CheckEncode() { - const auto data = ToLittleEndian( - {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL}); - const uint8_t expected_output[24] = { - 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5, - 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1, - }; - CheckEncode(reinterpret_cast(data.data()), static_cast(data.size()), - expected_output, sizeof(expected_output)); +std::shared_ptr FLBAColumnDescriptor(int type_length) { + auto node = + schema::PrimitiveNode::Make("", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, + ConvertedType::NONE, type_length); + return std::make_shared(std::move(node), /*max_definition_level=*/0, + /*max_repetition_level=*/0); } -template <> -void TestByteStreamSplitEncoding::CheckEncode() { - const auto data = ToLittleEndian({0xaabbccdd, 0x11223344}); - const uint8_t expected_output[8] = {0xdd, 0x44, 0xcc, 0x33, 0xbb, 0x22, 0xaa, 0x11}; - CheckEncode(reinterpret_cast(data.data()), static_cast(data.size()), - expected_output, sizeof(expected_output)); +template +void TestByteStreamSplitEncoding::CheckDecode() { + if constexpr (std::is_same_v) { + // FIXED_LEN_BYTE_ARRAY + // - type_length = 3 + { + const std::vector data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; + const std::vector raw_expected_output{0x11, 0x55, 0x99, 0x22, 0x66, 0xAA, + 0x33, 0x77, 0xBB, 0x44, 0x88, 0xCC}; + const std::vector expected_output{ + FLBA{&raw_expected_output[0]}, FLBA{&raw_expected_output[3]}, + FLBA{&raw_expected_output[6]}, FLBA{&raw_expected_output[9]}}; + CheckDecode(span{data}, span{expected_output}, FLBAColumnDescriptor(3).get()); + } + // - type_length = 1 + { + const std::vector data{0x11, 0x22, 0x33}; + const std::vector raw_expected_output{0x11, 0x22, 0x33}; + const std::vector expected_output{FLBA{&raw_expected_output[0]}, + FLBA{&raw_expected_output[1]}, + FLBA{&raw_expected_output[2]}}; + CheckDecode(span{data}, span{expected_output}, FLBAColumnDescriptor(1).get()); + } + } else if constexpr (sizeof(c_type) == 4) { + // INT32, FLOAT + const std::vector data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; + const auto expected_output = + ToLittleEndian({0xAA774411U, 
0xBB885522U, 0xCC996633U}); + CheckDecode(span{data}, span{expected_output}); + } else { + // INT64, DOUBLE + const std::vector data{0xDE, 0xC0, 0x37, 0x13, 0x11, 0x22, 0x33, 0x44, + 0xAA, 0xBB, 0xCC, 0xDD, 0x55, 0x66, 0x77, 0x88}; + const auto expected_output = + ToLittleEndian({0x7755CCAA331137DEULL, 0x8866DDBB442213C0ULL}); + CheckDecode(span{data}, span{expected_output}); + } } -typedef ::testing::Types ByteStreamSplitTypes; +template +void TestByteStreamSplitEncoding::CheckEncode() { + if constexpr (std::is_same_v) { + // FIXED_LEN_BYTE_ARRAY + // - type_length = 3 + { + const std::vector raw_data{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC}; + const std::vector data{FLBA{&raw_data[0]}, FLBA{&raw_data[3]}, + FLBA{&raw_data[6]}, FLBA{&raw_data[9]}}; + const std::vector expected_output{0x11, 0x44, 0x77, 0xAA, 0x22, 0x55, + 0x88, 0xBB, 0x33, 0x66, 0x99, 0xCC}; + CheckEncode(span{data}, span{expected_output}, FLBAColumnDescriptor(3).get()); + } + // - type_length = 1 + { + const std::vector raw_data{0x11, 0x22, 0x33}; + const std::vector data{FLBA{&raw_data[0]}, FLBA{&raw_data[1]}, + FLBA{&raw_data[2]}}; + const std::vector expected_output{0x11, 0x22, 0x33}; + CheckEncode(span{data}, span{expected_output}, FLBAColumnDescriptor(1).get()); + } + } else if constexpr (sizeof(c_type) == 4) { + // INT32, FLOAT + const auto data = ToLittleEndian({0xaabbccddUL, 0x11223344UL}); + const std::vector expected_output{0xdd, 0x44, 0xcc, 0x33, + 0xbb, 0x22, 0xaa, 0x11}; + CheckEncode(span{data}, span{expected_output}); + } else { + // INT64, DOUBLE + const auto data = ToLittleEndian( + {0x4142434445464748ULL, 0x0102030405060708ULL, 0xb1b2b3b4b5b6b7b8ULL}); + const std::vector expected_output{ + 0x48, 0x08, 0xb8, 0x47, 0x07, 0xb7, 0x46, 0x06, 0xb6, 0x45, 0x05, 0xb5, + 0x44, 0x04, 0xb4, 0x43, 0x03, 0xb3, 0x42, 0x02, 0xb2, 0x41, 0x01, 0xb1, + }; + CheckEncode(span{data}, span{expected_output}); + } +} + +using ByteStreamSplitTypes = + ::testing::Types; TYPED_TEST_SUITE(TestByteStreamSplitEncoding, ByteStreamSplitTypes); TYPED_TEST(TestByteStreamSplitEncoding, BasicRoundTrip) { @@ -1397,30 +1470,20 @@ TYPED_TEST(TestByteStreamSplitEncoding, CheckOnlyEncode) { TEST(ByteStreamSplitEncodeDecode, InvalidDataTypes) { // First check encoders. - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), - ParquetException); - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), - ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); - ASSERT_THROW(MakeTypedEncoder(Encoding::BYTE_STREAM_SPLIT), ParquetException); // Then check decoders. 
-  ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
-  ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT),
-               ParquetException);
   ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException);
   ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException);
   ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT),
                ParquetException);
-  ASSERT_THROW(MakeTypedDecoder(Encoding::BYTE_STREAM_SPLIT), ParquetException);
 }
 
 // ----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index bc21d709ec2c8..56b967a0595b8 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -323,6 +323,7 @@ def test_byte_stream_split():
     # This is only a smoke test.
     arr_float = pa.array(list(map(float, range(100))))
     arr_int = pa.array(list(map(int, range(100))))
+    arr_bool = pa.array([True, False] * 50)
 
     data_float = [arr_float, arr_float]
     table = pa.Table.from_arrays(data_float, names=['a', 'b'])
@@ -342,16 +343,16 @@ def test_byte_stream_split():
                      use_byte_stream_split=['a', 'b'])
 
     # Check with mixed column types.
-    mixed_table = pa.Table.from_arrays([arr_float, arr_int],
-                                       names=['a', 'b'])
+    mixed_table = pa.Table.from_arrays([arr_float, arr_float, arr_int, arr_int],
+                                       names=['a', 'b', 'c', 'd'])
     _check_roundtrip(mixed_table, expected=mixed_table,
-                     use_dictionary=['b'],
-                     use_byte_stream_split=['a'])
+                     use_dictionary=['b', 'd'],
+                     use_byte_stream_split=['a', 'c'])
 
     # Try to use the wrong data type with the byte_stream_split encoding.
     # This should throw an exception.
-    table = pa.Table.from_arrays([arr_int], names=['tmp'])
-    with pytest.raises(IOError):
+    table = pa.Table.from_arrays([arr_bool], names=['tmp'])
+    with pytest.raises(IOError, match='BYTE_STREAM_SPLIT only supports'):
         _check_roundtrip(table, expected=table, use_byte_stream_split=True,
                          use_dictionary=False)
 
@@ -367,12 +368,13 @@ def test_column_encoding():
         [arr_float, arr_int, arr_bin, arr_flba, arr_bool],
         names=['a', 'b', 'c', 'd', 'e'])
 
-    # Check "BYTE_STREAM_SPLIT" for column 'a' and "PLAIN" column_encoding for
-    # column 'b' and 'c'.
+    # Check "BYTE_STREAM_SPLIT" for columns 'a', 'b', 'd'
+    # and "PLAIN" column_encoding for column 'c'.
     _check_roundtrip(mixed_table, expected=mixed_table,
                      use_dictionary=False,
                      column_encoding={'a': "BYTE_STREAM_SPLIT",
-                                      'b': "PLAIN",
-                                      'c': "PLAIN"})
+                                      'b': "BYTE_STREAM_SPLIT",
+                                      'c': "PLAIN",
+                                      'd': "BYTE_STREAM_SPLIT"})
 
     # Check "PLAIN" for all columns.
     _check_roundtrip(mixed_table, expected=mixed_table,
@@ -406,20 +408,20 @@ def test_column_encoding():
                      use_dictionary=False,
                      column_encoding={'e': "RLE"})
 
-    # Try to pass "BYTE_STREAM_SPLIT" column encoding for integer column 'b'.
-    # This should throw an error as it is only supports FLOAT and DOUBLE.
+    # Try to pass "BYTE_STREAM_SPLIT" column encoding for boolean column 'e'.
+    # This should throw an error as it does not support BOOLEAN.
     with pytest.raises(IOError,
-                       match="BYTE_STREAM_SPLIT only supports FLOAT and"
-                       " DOUBLE"):
+                       match="BYTE_STREAM_SPLIT only supports"):
         _check_roundtrip(mixed_table, expected=mixed_table,
                          use_dictionary=False,
                          column_encoding={'a': "PLAIN",
-                                          'b': "BYTE_STREAM_SPLIT",
-                                          'c': "PLAIN"})
+                                          'c': "PLAIN",
+                                          'e': "BYTE_STREAM_SPLIT"})
 
     # Try to pass use "DELTA_BINARY_PACKED" encoding on float column.
     # This should throw an error as only integers are supported.
- with pytest.raises(OSError): + with pytest.raises(OSError, + match="DELTA_BINARY_PACKED encoder only supports"): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, column_encoding={'a': "DELTA_BINARY_PACKED", @@ -429,13 +431,15 @@ def test_column_encoding(): # Try to pass "RLE_DICTIONARY". # This should throw an error as dictionary encoding is already used by # default and not supported to be specified as "fallback" encoding - with pytest.raises(ValueError): + with pytest.raises(ValueError, + match="'RLE_DICTIONARY' is already used by default"): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, column_encoding="RLE_DICTIONARY") # Try to pass unsupported encoding. - with pytest.raises(ValueError): + with pytest.raises(ValueError, + match="Unsupported column encoding: 'MADE_UP_ENCODING'"): _check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, column_encoding={'a': "MADE_UP_ENCODING"}) From 94626c4cd40b107d99d8e9b2d7ab98d4027b6378 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 18 Mar 2024 14:39:40 +0100 Subject: [PATCH 2/3] Add integration test --- cpp/src/parquet/reader_test.cc | 71 ++++++++++++++++++++++++++++++++++ cpp/submodules/parquet-testing | 2 +- 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index f9c2e06873a22..a410b19231da5 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -123,6 +124,10 @@ std::string concatenated_gzip_members() { std::string byte_stream_split() { return data_file("byte_stream_split.zstd.parquet"); } +std::string byte_stream_split_extended() { + return data_file("byte_stream_split_extended.gzip.parquet"); +} + template std::vector ReadColumnValues(ParquetFileReader* file_reader, int row_group, int column, int64_t expected_values_read) { @@ -154,6 +159,44 @@ void AssertColumnValues(std::shared_ptr> col, int64_t b ASSERT_EQ(expected_values_read, values_read); } +template +void AssertColumnValuesEqual(std::shared_ptr> left_col, + std::shared_ptr> right_col, + int64_t batch_size, int64_t expected_levels_read, + int64_t expected_values_read) { + std::vector left_values(batch_size); + std::vector right_values(batch_size); + int64_t values_read, levels_read; + + levels_read = + left_col->ReadBatch(batch_size, nullptr, nullptr, left_values.data(), &values_read); + ASSERT_EQ(expected_levels_read, levels_read); + ASSERT_EQ(expected_values_read, values_read); + + levels_read = right_col->ReadBatch(batch_size, nullptr, nullptr, right_values.data(), + &values_read); + ASSERT_EQ(expected_levels_read, levels_read); + ASSERT_EQ(expected_values_read, values_read); + + ASSERT_EQ(left_values, right_values); +} + +template +void AssertColumnValuesEqual(ParquetFileReader* file_reader, const std::string& left_col, + const std::string& right_col, int64_t num_rows, + int64_t row_group = 0) { + ARROW_SCOPED_TRACE("left_col = '", left_col, "', right_col = '", right_col, "'"); + + auto left_col_index = file_reader->metadata()->schema()->ColumnIndex(left_col); + auto right_col_index = file_reader->metadata()->schema()->ColumnIndex(right_col); + auto row_group_reader = file_reader->RowGroup(row_group); + auto left_reader = checked_pointer_cast>( + row_group_reader->Column(left_col_index)); + auto right_reader = checked_pointer_cast>( + row_group_reader->Column(right_col_index)); + AssertColumnValuesEqual(left_reader, right_reader, 
num_rows, num_rows, num_rows); +} + void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata, bool allow_uncompressed_mismatch = false) { const int64_t total_byte_size = rg_metadata->total_byte_size(); @@ -1522,6 +1565,34 @@ TEST(TestByteStreamSplit, FloatIntegrationFile) { } #endif // ARROW_WITH_ZSTD +#ifdef ARROW_WITH_ZLIB +TEST(TestByteStreamSplit, ExtendedIntegrationFile) { + auto file_path = byte_stream_split_extended(); + auto file = ParquetFileReader::OpenFile(file_path); + + const int64_t kNumRows = 200; + + ASSERT_EQ(kNumRows, file->metadata()->num_rows()); + ASSERT_EQ(14, file->metadata()->num_columns()); + ASSERT_EQ(1, file->metadata()->num_row_groups()); + + AssertColumnValuesEqual(file.get(), "float_plain", "float_byte_stream_split", + kNumRows); + AssertColumnValuesEqual(file.get(), "double_plain", + "double_byte_stream_split", kNumRows); + AssertColumnValuesEqual(file.get(), "int32_plain", "int32_byte_stream_split", + kNumRows); + AssertColumnValuesEqual(file.get(), "int64_plain", "int64_byte_stream_split", + kNumRows); + AssertColumnValuesEqual(file.get(), "float16_plain", + "float16_byte_stream_split", kNumRows); + AssertColumnValuesEqual(file.get(), "flba5_plain", "flba5_byte_stream_split", + kNumRows); + AssertColumnValuesEqual(file.get(), "decimal_plain", + "decimal_byte_stream_split", kNumRows); +} +#endif // ARROW_WITH_ZLIB + struct PageIndexReaderParam { std::vector row_group_indices; std::vector column_indices; diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 4cb3cff24c965..74278bc4a1122 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0 +Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3 From 93ebd84446850b286c8cdf422a223eed347a376e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Mon, 18 Mar 2024 14:57:07 +0100 Subject: [PATCH 3/3] Fix failures --- cpp/src/parquet/reader_test.cc | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index a410b19231da5..7b2169e2473d9 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -34,6 +35,7 @@ #include "arrow/testing/random.h" #include "arrow/util/checked_cast.h" #include "arrow/util/config.h" +#include "arrow/util/range.h" #include "parquet/column_reader.h" #include "parquet/column_scanner.h" @@ -46,6 +48,7 @@ #include "parquet/test_util.h" using arrow::internal::checked_pointer_cast; +using arrow::internal::Zip; namespace parquet { using schema::GroupNode; @@ -142,6 +145,23 @@ std::vector ReadColumnValues(ParquetFileReader* file_reader, int row_ return values; } +template +void AssertColumnValuesEqual(const ColumnDescriptor* descr, + const std::vector& left_values, + const std::vector& right_values) { + if constexpr (std::is_same_v) { + // operator== for FLBA in test_util.h is unusable (it hard-codes length to 12) + const auto length = descr->type_length(); + for (const auto& [left, right] : Zip(left_values, right_values)) { + std::string_view left_view(reinterpret_cast(left.ptr), length); + std::string_view right_view(reinterpret_cast(right.ptr), length); + ASSERT_EQ(left_view, right_view); + } + } else { + ASSERT_EQ(left_values, right_values); + } +} + // TODO: Assert on definition and repetition levels template void AssertColumnValues(std::shared_ptr> col, int64_t 
batch_size, @@ -154,9 +174,8 @@ void AssertColumnValues(std::shared_ptr> col, int64_t b auto levels_read = col->ReadBatch(batch_size, nullptr, nullptr, values.data(), &values_read); ASSERT_EQ(expected_levels_read, levels_read); - - ASSERT_EQ(expected_values, values); ASSERT_EQ(expected_values_read, values_read); + AssertColumnValuesEqual(col->descr(), expected_values, values); } template @@ -178,13 +197,13 @@ void AssertColumnValuesEqual(std::shared_ptr> left_col, ASSERT_EQ(expected_levels_read, levels_read); ASSERT_EQ(expected_values_read, values_read); - ASSERT_EQ(left_values, right_values); + AssertColumnValuesEqual(left_col->descr(), left_values, right_values); } template void AssertColumnValuesEqual(ParquetFileReader* file_reader, const std::string& left_col, const std::string& right_col, int64_t num_rows, - int64_t row_group = 0) { + int row_group = 0) { ARROW_SCOPED_TRACE("left_col = '", left_col, "', right_col = '", right_col, "'"); auto left_col_index = file_reader->metadata()->schema()->ColumnIndex(left_col);
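
For context, a minimal Python sketch of how the expanded encoding can be exercised through pyarrow once this patch series is in place, mirroring the updated test_basic.py checks above. `pq.write_table` with `use_dictionary=False` and `column_encoding` are existing pyarrow options; the file name and column names below are illustrative only, not part of the patch.

import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative table: with this patch, BYTE_STREAM_SPLIT applies to INT32/INT64
# (and FIXED_LEN_BYTE_ARRAY) columns in addition to FLOAT/DOUBLE.
table = pa.table({
    'f32': pa.array([float(i) for i in range(100)], type=pa.float32()),
    'f64': pa.array([float(i) for i in range(100)], type=pa.float64()),
    'i32': pa.array(list(range(100)), type=pa.int32()),
    'i64': pa.array(list(range(100)), type=pa.int64()),
})

# Dictionary encoding must be disabled for column_encoding to take effect.
pq.write_table(table, 'bss_example.parquet',
               use_dictionary=False,
               column_encoding={'f32': 'BYTE_STREAM_SPLIT',
                                'f64': 'BYTE_STREAM_SPLIT',
                                'i32': 'BYTE_STREAM_SPLIT',
                                'i64': 'BYTE_STREAM_SPLIT'})

# The data round-trips unchanged; only the on-disk encoding differs.
assert pq.read_table('bss_example.parquet').equals(table)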