Skip to content

Commit

Permalink
apacheGH-43927: [C++] Make ChunkResolver::ResolveMany output a list o…
Browse files Browse the repository at this point in the history
…f ChunkLocations (apache#43928)

### Rationale for this change

Better cache locality and it's simpler. Easier to use with a single allocation.

### What changes are included in this PR?

Change of the `ChunkResolver::ResolveMany()` signature.

### Are these changes tested?

Yes, by the tests in `chunked_array_test.cc`

### Are there any user-facing changes?

No because `ChunkResolver` is still in the `internal` namespace.
* GitHub Issue: apache#43927

Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Felipe Oliveira Carvalho <[email protected]>
  • Loading branch information
felipecrv authored and zanmato1984 committed Sep 6, 2024
1 parent 43fe102 commit 80f2a32
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 79 deletions.
74 changes: 35 additions & 39 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,42 +60,38 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
template <typename IndexType>
void ResolveManyInline(size_t num_offsets, const int64_t* signed_offsets,
int64_t n_indices, const IndexType* logical_index_vec,
IndexType* out_chunk_index_vec, IndexType chunk_hint,
IndexType* out_index_in_chunk_vec) {
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint) {
auto* offsets = reinterpret_cast<const uint64_t*>(signed_offsets);
const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
// chunk_hint in [0, num_offsets) per the precondition.
for (int64_t i = 0; i < n_indices; i++) {
const auto index = static_cast<uint64_t>(logical_index_vec[i]);
auto typed_logical_index = logical_index_vec[i];
const auto index = static_cast<uint64_t>(typed_logical_index);
// use or update chunk_hint
if (index >= offsets[chunk_hint] &&
(chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
out_chunk_index_vec[i] = chunk_hint; // hint is correct!
continue;
// hint is correct!
} else {
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
}
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
auto chunk_index =
ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
chunk_hint = static_cast<IndexType>(chunk_index);
out_chunk_index_vec[i] = chunk_hint;
}
if (out_index_in_chunk_vec != NULLPTR) {
for (int64_t i = 0; i < n_indices; i++) {
auto logical_index = logical_index_vec[i];
auto chunk_index = out_chunk_index_vec[i];
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_index_in_chunk_vec[i] =
logical_index - static_cast<IndexType>(offsets[chunk_index]);
out_chunk_location_vec[i].chunk_index = chunk_hint;
// chunk_index is in [0, chunks.size()] no matter what the
// value of logical_index is, so it's always safe to dereference
// offset_ as it contains chunks.size()+1 values.
out_chunk_location_vec[i].index_in_chunk =
typed_logical_index - static_cast<IndexType>(offsets[chunk_hint]);
#if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning out_index_in_chunk_vec[i] when the logical
// index is out-of-bounds.
if (chunk_index == num_chunks) {
out_index_in_chunk_vec[i] = std::numeric_limits<IndexType>::max();
}
#endif
// Make it more likely that Valgrind/ASAN can catch an invalid memory
// access by poisoning the index-in-chunk value when the logical
// index is out-of-bounds.
if (chunk_hint == num_chunks) {
out_chunk_location_vec[i].index_in_chunk = std::numeric_limits<IndexType>::max();
}
#endif
}
}

Expand Down Expand Up @@ -130,31 +126,31 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
uint8_t* out_chunk_index_vec, uint8_t chunk_hint,
uint8_t* out_index_in_chunk_vec) const {
TypedChunkLocation<uint8_t>* out_chunk_location_vec,
uint8_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
uint32_t* out_chunk_index_vec, uint32_t chunk_hint,
uint32_t* out_index_in_chunk_vec) const {
TypedChunkLocation<uint32_t>* out_chunk_location_vec,
uint32_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
uint16_t* out_chunk_index_vec, uint16_t chunk_hint,
uint16_t* out_index_in_chunk_vec) const {
TypedChunkLocation<uint16_t>* out_chunk_location_vec,
uint16_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
out_chunk_location_vec, chunk_hint);
}

void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
uint64_t* out_chunk_index_vec, uint64_t chunk_hint,
uint64_t* out_index_in_chunk_vec) const {
TypedChunkLocation<uint64_t>* out_chunk_location_vec,
uint64_t chunk_hint) const {
ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
out_chunk_location_vec, chunk_hint);
}

} // namespace arrow::internal
61 changes: 34 additions & 27 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,34 @@ namespace arrow::internal {

struct ChunkResolver;

struct ChunkLocation {
template <typename IndexType>
struct TypedChunkLocation {
/// \brief Index of the chunk in the array of chunks
///
/// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used
/// to represent out-of-bounds locations.
int64_t chunk_index = 0;
IndexType chunk_index = 0;

/// \brief Index of the value in the chunk
///
/// The value is UNDEFINED if chunk_index >= chunks.size()
int64_t index_in_chunk = 0;
IndexType index_in_chunk = 0;

ChunkLocation() = default;
TypedChunkLocation() = default;

ChunkLocation(int64_t chunk_index, int64_t index_in_chunk)
: chunk_index(chunk_index), index_in_chunk(index_in_chunk) {}
TypedChunkLocation(IndexType chunk_index, IndexType index_in_chunk)
: chunk_index(chunk_index), index_in_chunk(index_in_chunk) {
static_assert(sizeof(TypedChunkLocation<IndexType>) == 2 * sizeof(IndexType));
static_assert(alignof(TypedChunkLocation<IndexType>) == alignof(IndexType));
}

bool operator==(ChunkLocation other) const {
bool operator==(TypedChunkLocation other) const {
return chunk_index == other.chunk_index && index_in_chunk == other.index_in_chunk;
}
};

using ChunkLocation = TypedChunkLocation<int64_t>;

/// \brief An utility that incrementally resolves logical indices into
/// physical indices in a chunked array.
struct ARROW_EXPORT ChunkResolver {
Expand Down Expand Up @@ -144,26 +150,25 @@ struct ARROW_EXPORT ChunkResolver {
///
/// \pre 0 <= logical_index_vec[i] < logical_array_length()
/// (for well-defined and valid chunk index results)
/// \pre out_chunk_index_vec has space for `n_indices`
/// \pre out_chunk_location_vec has space for `n_indices` locations
/// \pre chunk_hint in [0, chunks.size()]
/// \post out_chunk_index_vec[i] in [0, chunks.size()] for i in [0, n)
/// \post out_chunk_location_vec[i].chunk_index in [0, chunks.size()] for i in [0, n)
/// \post if logical_index_vec[i] >= chunked_array.length(), then
/// out_chunk_index_vec[i] == chunks.size()
/// and out_index_in_chunk_vec[i] is UNDEFINED (can be out-of-bounds)
/// \post if logical_index_vec[i] < 0, then both out_chunk_index_vec[i] and
/// out_index_in_chunk_vec[i] are UNDEFINED
/// out_chunk_location_vec[i].chunk_index == chunks.size()
/// and out_chunk_location_vec[i].index_in_chunk is UNDEFINED (can be
/// out-of-bounds)
/// \post if logical_index_vec[i] < 0, then both values in out_chunk_index_vec[i]
/// are UNDEFINED
///
/// \param n_indices The number of logical indices to resolve
/// \param logical_index_vec The logical indices to resolve
/// \param out_chunk_index_vec The output array where the chunk indices will be written
/// \param out_chunk_location_vec The output array where the locations will be written
/// \param chunk_hint 0 or the last chunk_index produced by ResolveMany
/// \param out_index_in_chunk_vec If not NULLPTR, the output array where the
/// within-chunk indices will be written
/// \return false iff chunks.size() > std::numeric_limits<IndexType>::max()
template <typename IndexType>
[[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
IndexType* out_chunk_index_vec, IndexType chunk_hint = 0,
IndexType* out_index_in_chunk_vec = NULLPTR) const {
TypedChunkLocation<IndexType>* out_chunk_location_vec,
IndexType chunk_hint = 0) const {
if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
// The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
Expand All @@ -188,13 +193,11 @@ struct ARROW_EXPORT ChunkResolver {
// logical index in the chunked array.
using U = std::make_unsigned_t<IndexType>;
ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
reinterpret_cast<U*>(out_chunk_index_vec),
static_cast<U>(chunk_hint),
reinterpret_cast<U*>(out_index_in_chunk_vec));
reinterpret_cast<TypedChunkLocation<U>*>(out_chunk_location_vec),
static_cast<U>(chunk_hint));
} else {
static_assert(std::is_unsigned_v<IndexType>);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_index_vec, chunk_hint,
out_index_in_chunk_vec);
ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec, chunk_hint);
}
return true;
}
Expand Down Expand Up @@ -226,10 +229,14 @@ struct ARROW_EXPORT ChunkResolver {

/// \pre all the pre-conditions of ChunkResolver::ResolveMany()
/// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
void ResolveManyImpl(int64_t, const uint8_t*, uint8_t*, uint8_t, uint8_t*) const;
void ResolveManyImpl(int64_t, const uint16_t*, uint16_t*, uint16_t, uint16_t*) const;
void ResolveManyImpl(int64_t, const uint32_t*, uint32_t*, uint32_t, uint32_t*) const;
void ResolveManyImpl(int64_t, const uint64_t*, uint64_t*, uint64_t, uint64_t*) const;
void ResolveManyImpl(int64_t, const uint8_t*, TypedChunkLocation<uint8_t>*,
uint8_t) const;
void ResolveManyImpl(int64_t, const uint16_t*, TypedChunkLocation<uint16_t>*,
uint16_t) const;
void ResolveManyImpl(int64_t, const uint32_t*, TypedChunkLocation<uint32_t>*,
uint32_t) const;
void ResolveManyImpl(int64_t, const uint64_t*, TypedChunkLocation<uint64_t>*,
uint64_t) const;

public:
/// \brief Find the index of the chunk that contains the logical index.
Expand Down
29 changes: 16 additions & 13 deletions cpp/src/arrow/chunked_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace arrow {

using internal::ChunkLocation;
using internal::ChunkResolver;
using internal::TypedChunkLocation;

class TestChunkedArray : public ::testing::Test {
protected:
Expand Down Expand Up @@ -380,24 +381,26 @@ class TestChunkResolverMany : public ::testing::Test {
Result<std::vector<ChunkLocation>> ResolveMany(
const ChunkResolver& resolver, const std::vector<IndexType>& logical_index_vec) {
const size_t n = logical_index_vec.size();
std::vector<IndexType> chunk_index_vec;
chunk_index_vec.resize(n);
std::vector<IndexType> index_in_chunk_vec;
index_in_chunk_vec.resize(n);
std::vector<TypedChunkLocation<IndexType>> chunk_location_vec;
chunk_location_vec.resize(n);
bool valid = resolver.ResolveMany<IndexType>(
static_cast<int64_t>(n), logical_index_vec.data(), chunk_index_vec.data(), 0,
index_in_chunk_vec.data());
static_cast<int64_t>(n), logical_index_vec.data(), chunk_location_vec.data(), 0);
if (ARROW_PREDICT_FALSE(!valid)) {
return Status::Invalid("index type doesn't fit possible chunk indexes");
}
std::vector<ChunkLocation> locations;
locations.reserve(n);
for (size_t i = 0; i < n; i++) {
auto chunk_index = static_cast<int64_t>(chunk_index_vec[i]);
auto index_in_chunk = static_cast<int64_t>(index_in_chunk_vec[i]);
locations.emplace_back(chunk_index, index_in_chunk);
if constexpr (std::is_same<decltype(ChunkLocation::chunk_index), IndexType>::value) {
return chunk_location_vec;
} else {
std::vector<ChunkLocation> locations;
locations.reserve(n);
for (size_t i = 0; i < n; i++) {
auto loc = chunk_location_vec[i];
auto chunk_index = static_cast<int64_t>(loc.chunk_index);
auto index_in_chunk = static_cast<int64_t>(loc.index_in_chunk);
locations.emplace_back(chunk_index, index_in_chunk);
}
return locations;
}
return locations;
}

void CheckResolveMany(const ChunkResolver& resolver,
Expand Down

0 comments on commit 80f2a32

Please sign in to comment.