GH-39978: [C++][Parquet] Expand BYTE_STREAM_SPLIT to support FIXED_LEN_BYTE_ARRAY, INT32 and INT64 (#40094)

### What changes are included in this PR?

Implement the format addition described in https://issues.apache.org/jira/browse/PARQUET-2414: BYTE_STREAM_SPLIT encoding and decoding for the FIXED_LEN_BYTE_ARRAY, INT32 and INT64 physical types.
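For context, BYTE_STREAM_SPLIT scatters byte `b` of every fixed-width value into its own stream `b`; PARQUET-2414 generalizes this from FLOAT/DOUBLE (widths 4 and 8) to any fixed byte width. A minimal scalar sketch of the generalized layout (not code from this commit; the function name is made up):

```cpp
#include <cstdint>
#include <vector>

// Scatter byte b of each of the num_values fixed-width values into stream b.
// Stream b occupies encoded[b * num_values, (b + 1) * num_values).
std::vector<uint8_t> ByteStreamSplitSketch(const uint8_t* values, int width,
                                           int64_t num_values) {
  std::vector<uint8_t> encoded(static_cast<size_t>(width) * num_values);
  for (int64_t i = 0; i < num_values; ++i) {
    for (int b = 0; b < width; ++b) {
      encoded[b * num_values + i] = values[i * width + b];
    }
  }
  return encoded;
}
```

Decoding is the inverse gather; the changes below add SIMD fast paths for widths 4 and 8 and a scalar fallback for any other width.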

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes (the BYTE_STREAM_SPLIT encoding can now be requested for additional Parquet physical types: FIXED_LEN_BYTE_ARRAY, INT32 and INT64).
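For example (a hedged sketch, not taken from this PR; the column name `"x"` is hypothetical), a writer can now request BYTE_STREAM_SPLIT for an INT32 column, whereas previously this encoding was only accepted for FLOAT and DOUBLE columns:

```cpp
#include <memory>

#include "parquet/properties.h"

// Request BYTE_STREAM_SPLIT for column "x"; dictionary encoding is disabled for
// that column so the requested encoding is used for its data pages.
std::shared_ptr<parquet::WriterProperties> MakeWriterProperties() {
  parquet::WriterProperties::Builder builder;
  builder.disable_dictionary("x");
  builder.encoding("x", parquet::Encoding::BYTE_STREAM_SPLIT);
  return builder.build();
}
```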

* GitHub Issue: #39978

Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
pitrou authored Mar 19, 2024
1 parent 6c06b37 commit a364e4a
Showing 10 changed files with 849 additions and 371 deletions.
126 changes: 92 additions & 34 deletions cpp/src/arrow/util/byte_stream_split_internal.h
@@ -19,11 +19,14 @@

#include "arrow/util/endian.h"
#include "arrow/util/simd.h"
#include "arrow/util/small_vector.h"
#include "arrow/util/ubsan.h"

#include <algorithm>
#include <array>
#include <cassert>
#include <cstdint>
#include <cstring>

#if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
#include <xsimd/xsimd.hpp>
@@ -38,10 +41,11 @@ namespace arrow::util::internal {

#if defined(ARROW_HAVE_NEON) || defined(ARROW_HAVE_SSE4_2)
template <int kNumStreams>
void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values, int64_t stride,
uint8_t* out) {
void ByteStreamSplitDecodeSimd128(const uint8_t* data, int width, int64_t num_values,
int64_t stride, uint8_t* out) {
using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;

assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams.");
constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
constexpr int64_t kBlockSize = sizeof(simd_batch) * kNumStreams;
@@ -92,10 +96,11 @@ void ByteStreamSplitDecodeSimd128(const uint8_t* data, int64_t num_values, int64
}

template <int kNumStreams>
void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
void ByteStreamSplitEncodeSimd128(const uint8_t* raw_values, int width,
const int64_t num_values, uint8_t* output_buffer_raw) {
using simd_batch = xsimd::make_sized_batch_t<int8_t, 16>;

assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams.");
constexpr int kBlockSize = sizeof(simd_batch) * kNumStreams;

@@ -215,15 +220,17 @@

#if defined(ARROW_HAVE_AVX2)
template <int kNumStreams>
void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values, int64_t stride,
uint8_t* out) {
void ByteStreamSplitDecodeAvx2(const uint8_t* data, int width, int64_t num_values,
int64_t stride, uint8_t* out) {
assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams.");
constexpr int kNumStreamsLog2 = (kNumStreams == 8 ? 3 : 2);
constexpr int64_t kBlockSize = sizeof(__m256i) * kNumStreams;

const int64_t size = num_values * kNumStreams;
if (size < kBlockSize) // Back to SSE for small size
return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride, out);
return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values, stride,
out);
const int64_t num_blocks = size / kBlockSize;

// First handle suffix.
@@ -299,18 +306,19 @@ void ByteStreamSplitDecodeAvx2(const uint8_t* data, int64_t num_values, int64_t
}

template <int kNumStreams>
void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
void ByteStreamSplitEncodeAvx2(const uint8_t* raw_values, int width,
const int64_t num_values, uint8_t* output_buffer_raw) {
assert(width == kNumStreams);
static_assert(kNumStreams == 4 || kNumStreams == 8, "Invalid number of streams.");
constexpr int kBlockSize = sizeof(__m256i) * kNumStreams;

if constexpr (kNumStreams == 8) // Back to SSE, currently no path for double.
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, num_values,
output_buffer_raw);

const int64_t size = num_values * kNumStreams;
if (size < kBlockSize) // Back to SSE for small size
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, num_values,
output_buffer_raw);
const int64_t num_blocks = size / kBlockSize;
const __m256i* raw_values_simd = reinterpret_cast<const __m256i*>(raw_values);
@@ -373,25 +381,26 @@

#if defined(ARROW_HAVE_SIMD_SPLIT)
template <int kNumStreams>
void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int64_t num_values,
void inline ByteStreamSplitDecodeSimd(const uint8_t* data, int width, int64_t num_values,
int64_t stride, uint8_t* out) {
#if defined(ARROW_HAVE_AVX2)
return ByteStreamSplitDecodeAvx2<kNumStreams>(data, num_values, stride, out);
return ByteStreamSplitDecodeAvx2<kNumStreams>(data, width, num_values, stride, out);
#elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
return ByteStreamSplitDecodeSimd128<kNumStreams>(data, num_values, stride, out);
return ByteStreamSplitDecodeSimd128<kNumStreams>(data, width, num_values, stride, out);
#else
#error "ByteStreamSplitDecodeSimd not implemented"
#endif
}

template <int kNumStreams>
void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, const int64_t num_values,
void inline ByteStreamSplitEncodeSimd(const uint8_t* raw_values, int width,
const int64_t num_values,
uint8_t* output_buffer_raw) {
#if defined(ARROW_HAVE_AVX2)
return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, num_values,
return ByteStreamSplitEncodeAvx2<kNumStreams>(raw_values, width, num_values,
output_buffer_raw);
#elif defined(ARROW_HAVE_SSE4_2) || defined(ARROW_HAVE_NEON)
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, num_values,
return ByteStreamSplitEncodeSimd128<kNumStreams>(raw_values, width, num_values,
output_buffer_raw);
#else
#error "ByteStreamSplitEncodeSimd not implemented"
@@ -492,45 +501,94 @@ inline void DoMergeStreams(const uint8_t** src_streams, int width, int64_t nvalu
}

template <int kNumStreams>
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
void ByteStreamSplitEncodeScalar(const uint8_t* raw_values, int width,
const int64_t num_values, uint8_t* out) {
assert(width == kNumStreams);
std::array<uint8_t*, kNumStreams> dest_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
dest_streams[stream] = &output_buffer_raw[stream * num_values];
dest_streams[stream] = &out[stream * num_values];
}
DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}

inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int width,
const int64_t num_values, uint8_t* out) {
::arrow::internal::SmallVector<uint8_t*, 16> dest_streams;
dest_streams.resize(width);
for (int stream = 0; stream < width; ++stream) {
dest_streams[stream] = &out[stream * num_values];
}
DoSplitStreams(raw_values, width, num_values, dest_streams.data());
}

template <int kNumStreams>
void ByteStreamSplitDecodeScalar(const uint8_t* data, int64_t num_values, int64_t stride,
uint8_t* out) {
void ByteStreamSplitDecodeScalar(const uint8_t* data, int width, int64_t num_values,
int64_t stride, uint8_t* out) {
assert(width == kNumStreams);
std::array<const uint8_t*, kNumStreams> src_streams;
for (int stream = 0; stream < kNumStreams; ++stream) {
src_streams[stream] = &data[stream * stride];
}
DoMergeStreams(src_streams.data(), kNumStreams, num_values, out);
}

template <int kNumStreams>
void inline ByteStreamSplitEncode(const uint8_t* raw_values, const int64_t num_values,
uint8_t* output_buffer_raw) {
inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width,
int64_t num_values, int64_t stride,
uint8_t* out) {
::arrow::internal::SmallVector<const uint8_t*, 16> src_streams;
src_streams.resize(width);
for (int stream = 0; stream < width; ++stream) {
src_streams[stream] = &data[stream * stride];
}
DoMergeStreams(src_streams.data(), width, num_values, out);
}

inline void ByteStreamSplitEncode(const uint8_t* raw_values, int width,
const int64_t num_values, uint8_t* out) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
return ByteStreamSplitEncodeSimd<kNumStreams>(raw_values, num_values,
output_buffer_raw);
#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeSimd
#else
return ByteStreamSplitEncodeScalar<kNumStreams>(raw_values, num_values,
output_buffer_raw);
#define ByteStreamSplitEncodePerhapsSimd ByteStreamSplitEncodeScalar
#endif
switch (width) {
case 1:
memcpy(out, raw_values, num_values);
return;
case 2:
return ByteStreamSplitEncodeScalar<2>(raw_values, width, num_values, out);
case 4:
return ByteStreamSplitEncodePerhapsSimd<4>(raw_values, width, num_values, out);
case 8:
return ByteStreamSplitEncodePerhapsSimd<8>(raw_values, width, num_values, out);
case 16:
return ByteStreamSplitEncodeScalar<16>(raw_values, width, num_values, out);
}
return ByteStreamSplitEncodeScalarDynamic(raw_values, width, num_values, out);
#undef ByteStreamSplitEncodePerhapsSimd
}

template <int kNumStreams>
void inline ByteStreamSplitDecode(const uint8_t* data, int64_t num_values, int64_t stride,
uint8_t* out) {
inline void ByteStreamSplitDecode(const uint8_t* data, int width, int64_t num_values,
int64_t stride, uint8_t* out) {
#if defined(ARROW_HAVE_SIMD_SPLIT)
return ByteStreamSplitDecodeSimd<kNumStreams>(data, num_values, stride, out);
#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeSimd
#else
return ByteStreamSplitDecodeScalar<kNumStreams>(data, num_values, stride, out);
#define ByteStreamSplitDecodePerhapsSimd ByteStreamSplitDecodeScalar
#endif
switch (width) {
case 1:
memcpy(out, data, num_values);
return;
case 2:
return ByteStreamSplitDecodeScalar<2>(data, width, num_values, stride, out);
case 4:
return ByteStreamSplitDecodePerhapsSimd<4>(data, width, num_values, stride, out);
case 8:
return ByteStreamSplitDecodePerhapsSimd<8>(data, width, num_values, stride, out);
case 16:
return ByteStreamSplitDecodeScalar<16>(data, width, num_values, stride, out);
}
return ByteStreamSplitDecodeScalarDynamic(data, width, num_values, stride, out);
#undef ByteStreamSplitDecodePerhapsSimd
}

} // namespace arrow::util::internal
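
A minimal round-trip sketch of the width-dispatching entry points introduced in this header (assuming it is built inside Arrow's source tree, since the header is internal; the surrounding function is illustrative):

```cpp
#include <cstdint>
#include <vector>

#include "arrow/util/byte_stream_split_internal.h"

// Encode then decode num_values 4-byte values (e.g. Parquet INT32).
// Width 1 reduces to memcpy, widths 4 and 8 may take the SIMD paths, and any
// other width (e.g. FIXED_LEN_BYTE_ARRAY(3)) uses the dynamic scalar fallback.
void RoundtripSketch(const uint8_t* raw, int64_t num_values) {
  constexpr int width = 4;
  std::vector<uint8_t> encoded(static_cast<size_t>(width) * num_values);
  std::vector<uint8_t> decoded(static_cast<size_t>(width) * num_values);

  arrow::util::internal::ByteStreamSplitEncode(raw, width, num_values, encoded.data());
  arrow::util::internal::ByteStreamSplitDecode(encoded.data(), width, num_values,
                                               /*stride=*/num_values, decoded.data());
  // decoded now holds the same bytes as raw.
}
```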
72 changes: 46 additions & 26 deletions cpp/src/arrow/util/byte_stream_split_test.cc
@@ -16,6 +16,7 @@
// under the License.

#include <algorithm>
#include <array>
#include <cmath>
#include <cstddef>
#include <functional>
@@ -35,7 +36,8 @@

namespace arrow::util::internal {

using ByteStreamSplitTypes = ::testing::Types<float, double>;
using ByteStreamSplitTypes =
::testing::Types<int8_t, int16_t, int32_t, int64_t, std::array<uint8_t, 3>>;

template <typename Func>
struct NamedFunc {
@@ -63,23 +65,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test {
public:
static constexpr int kWidth = static_cast<int>(sizeof(T));

using EncodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitEncode<kWidth>)>>;
using DecodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitDecode<kWidth>)>>;
using EncodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitEncode)>>;
using DecodeFunc = NamedFunc<std::function<decltype(ByteStreamSplitDecode)>>;

void SetUp() override {
encode_funcs_.push_back({"reference", &ReferenceEncode});
encode_funcs_.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
decode_funcs_.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
#if defined(ARROW_HAVE_SIMD_SPLIT)
encode_funcs_.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
decode_funcs_.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
encode_funcs_.push_back({"simd128", &ByteStreamSplitEncodeSimd128<kWidth>});
decode_funcs_.push_back({"simd128", &ByteStreamSplitDecodeSimd128<kWidth>});
#endif
#if defined(ARROW_HAVE_AVX2)
encode_funcs_.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
decode_funcs_.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
#endif
decode_funcs_ = MakeDecodeFuncs();
encode_funcs_ = MakeEncodeFuncs();
}

void TestRoundtrip(int64_t num_values) {
@@ -92,12 +83,12 @@ class TestByteStreamSplitSpecialized : public ::testing::Test {
for (const auto& encode_func : encode_funcs_) {
ARROW_SCOPED_TRACE("encode_func = ", encode_func);
encoded.assign(encoded.size(), 0);
encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), num_values,
encode_func.func(reinterpret_cast<const uint8_t*>(input.data()), kWidth, num_values,
encoded.data());
for (const auto& decode_func : decode_funcs_) {
ARROW_SCOPED_TRACE("decode_func = ", decode_func);
decoded.assign(decoded.size(), T{});
decode_func.func(encoded.data(), num_values, /*stride=*/num_values,
decode_func.func(encoded.data(), kWidth, num_values, /*stride=*/num_values,
reinterpret_cast<uint8_t*>(decoded.data()));
ASSERT_EQ(decoded, input);
}
@@ -123,7 +114,8 @@ class TestByteStreamSplitSpecialized : public ::testing::Test {
int64_t offset = 0;
while (offset < num_values) {
auto chunk_size = std::min<int64_t>(num_values - offset, chunk_size_dist(gen));
decode_func.func(encoded.data() + offset, chunk_size, /*stride=*/num_values,
decode_func.func(encoded.data() + offset, kWidth, chunk_size,
/*stride=*/num_values,
reinterpret_cast<uint8_t*>(decoded.data() + offset));
offset += chunk_size;
}
@@ -141,20 +133,48 @@ class TestByteStreamSplitSpecialized : public ::testing::Test {
static std::vector<T> MakeRandomInput(int64_t num_values) {
std::vector<T> input(num_values);
random_bytes(kWidth * num_values, seed_++, reinterpret_cast<uint8_t*>(input.data()));
// Avoid NaNs to ease comparison
for (auto& value : input) {
if (std::isnan(value)) {
value = nan_replacement_++;
}
}
return input;
}

template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
static std::vector<DecodeFunc> MakeDecodeFuncs() {
std::vector<DecodeFunc> funcs;
funcs.push_back({"scalar_dynamic", &ByteStreamSplitDecodeScalarDynamic});
funcs.push_back({"scalar", &ByteStreamSplitDecodeScalar<kWidth>});
#if defined(ARROW_HAVE_SIMD_SPLIT)
if constexpr (kSimdImplemented) {
funcs.push_back({"simd", &ByteStreamSplitDecodeSimd<kWidth>});
funcs.push_back({"simd128", &ByteStreamSplitDecodeSimd128<kWidth>});
#if defined(ARROW_HAVE_AVX2)
funcs.push_back({"avx2", &ByteStreamSplitDecodeAvx2<kWidth>});
#endif
}
#endif // defined(ARROW_HAVE_SIMD_SPLIT)
return funcs;
}

template <bool kSimdImplemented = (kWidth == 4 || kWidth == 8)>
static std::vector<EncodeFunc> MakeEncodeFuncs() {
std::vector<EncodeFunc> funcs;
funcs.push_back({"reference", &ReferenceByteStreamSplitEncode});
funcs.push_back({"scalar_dynamic", &ByteStreamSplitEncodeScalarDynamic});
funcs.push_back({"scalar", &ByteStreamSplitEncodeScalar<kWidth>});
#if defined(ARROW_HAVE_SIMD_SPLIT)
if constexpr (kSimdImplemented) {
funcs.push_back({"simd", &ByteStreamSplitEncodeSimd<kWidth>});
funcs.push_back({"simd128", &ByteStreamSplitEncodeSimd128<kWidth>});
#if defined(ARROW_HAVE_AVX2)
funcs.push_back({"avx2", &ByteStreamSplitEncodeAvx2<kWidth>});
#endif
}
#endif // defined(ARROW_HAVE_SIMD_SPLIT)
return funcs;
}

std::vector<EncodeFunc> encode_funcs_;
std::vector<DecodeFunc> decode_funcs_;

static inline uint32_t seed_ = 42;
static inline T nan_replacement_ = 0;
};

TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);