Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41334: [C++][Acero] Use per-node basis temp vector stack to mitigate overflow #41335

Merged
merged 20 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ set(ARROW_COMPUTE_SRCS
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc
compute/util.cc)
compute/util.cc
compute/util_internal.cc)

append_runtime_avx2_src(ARROW_COMPUTE_SRCS compute/key_hash_internal_avx2.cc)
append_runtime_avx2_bmi2_src(ARROW_COMPUTE_SRCS compute/key_map_internal_avx2.cc)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/acero/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ struct ExecPlanImpl : public ExecPlan {
Future<> scheduler_finished = arrow::util::AsyncTaskScheduler::Make(
[this](arrow::util::AsyncTaskScheduler* async_scheduler) {
QueryContext* ctx = query_context();
RETURN_NOT_OK(ctx->Init(ctx->max_concurrency(), async_scheduler));
RETURN_NOT_OK(ctx->Init(async_scheduler));

#ifdef ARROW_WITH_OPENTELEMETRY
if (HasMetadata()) {
Expand Down
38 changes: 26 additions & 12 deletions cpp/src/arrow/acero/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,11 @@ struct BloomFilterPushdownContext {
using BuildFinishedCallback = std::function<Status(size_t, AccumulationQueue)>;
using FiltersReceivedCallback = std::function<Status(size_t)>;
using FilterFinishedCallback = std::function<Status(size_t, AccumulationQueue)>;
void Init(HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter,
bool use_sync_execution);
Status Init(HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
FiltersReceivedCallback on_bloom_filters_received,
bool disable_bloom_filter, bool use_sync_execution);

Status StartProducing(size_t thread_index);

Expand Down Expand Up @@ -559,8 +559,7 @@ struct BloomFilterPushdownContext {
std::vector<uint32_t> hashes(batch.length);
std::vector<uint8_t> bv(bit_vector_bytes);

ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* stack = &tld_[thread_index].stack;

// Start with full selection for the current batch
memset(selected.data(), 0xff, bit_vector_bytes);
Expand Down Expand Up @@ -654,7 +653,17 @@ struct BloomFilterPushdownContext {
FiltersReceivedCallback all_received_callback_;
FilterFinishedCallback on_finished_;
} eval_;

static constexpr auto kTempStackUsage =
Hashing32::kHashBatchTempStackUsage +
(sizeof(uint32_t) + /*extra=*/1) * arrow::util::MiniBatch::kMiniBatchLength;

struct ThreadLocalData {
arrow::util::TempVectorStack stack;
};
std::vector<ThreadLocalData> tld_;
};

bool HashJoinSchema::HasDictionaries() const {
for (int side = 0; side <= 1; ++side) {
for (int icol = 0; icol < proj_maps[side].num_cols(HashJoinProjection::INPUT);
Expand Down Expand Up @@ -930,7 +939,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
// we will change it back to just the CPU's thread pool capacity.
size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1);

pushdown_context_.Init(
RETURN_NOT_OK(pushdown_context_.Init(
this, num_threads,
[ctx](std::function<Status(size_t, int64_t)> fn,
std::function<Status(size_t)> on_finished) {
Expand All @@ -940,7 +949,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
return ctx->StartTaskGroup(task_group_id, num_tasks);
},
[this](size_t thread_index) { return OnFiltersReceived(thread_index); },
disable_bloom_filter_, use_sync_execution);
disable_bloom_filter_, use_sync_execution));

RETURN_NOT_OK(impl_->Init(
ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]),
Expand Down Expand Up @@ -1037,7 +1046,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
BloomFilterPushdownContext pushdown_context_;
};

void BloomFilterPushdownContext::Init(
Status BloomFilterPushdownContext::Init(
HashJoinNode* owner, size_t num_threads,
RegisterTaskGroupCallback register_task_group_callback,
StartTaskGroupCallback start_task_group_callback,
Expand Down Expand Up @@ -1074,6 +1083,12 @@ void BloomFilterPushdownContext::Init(
return eval_.on_finished_(thread_index, std::move(eval_.batches_));
});
start_task_group_callback_ = std::move(start_task_group_callback);
tld_.resize(num_threads);
for (auto& local_data : tld_) {
RETURN_NOT_OK(local_data.stack.Init(ctx_->memory_pool(), kTempStackUsage));
}

return Status::OK();
}

Status BloomFilterPushdownContext::StartProducing(size_t thread_index) {
Expand Down Expand Up @@ -1124,8 +1139,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_inde
}
ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));

ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* stack = &tld_[thread_index].stack;
arrow::util::TempVectorHolder<uint32_t> hash_holder(
stack, arrow::util::MiniBatch::kMiniBatchLength);
uint32_t* hashes = hash_holder.mutable_data();
Expand Down
52 changes: 52 additions & 0 deletions cpp/src/arrow/acero/hash_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/api.h"
#include "arrow/compute/kernels/row_encoder_internal.h"
#include "arrow/compute/kernels/test_util.h"
#include "arrow/compute/light_array_internal.h"
#include "arrow/testing/extension_type.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
Expand All @@ -41,6 +42,7 @@ namespace arrow {

using compute::call;
using compute::default_exec_context;
using compute::ExecBatchBuilder;
using compute::ExecSpan;
using compute::field_ref;
using compute::SortIndices;
Expand Down Expand Up @@ -3201,5 +3203,55 @@ TEST(HashJoin, ChainedIntegerHashJoins) {
}
}

// Test that a large number of joins don't overflow the temp vector stack, like GH-39582
// and GH-39951.
TEST(HashJoin, ManyJoins) {
// The idea of this case is to create many nested join nodes that may possibly cause
// recursive usage of temp vector stack. To make sure that the recursion happens:
// 1. A left-deep join tree is created so that the left-most (the final probe side)
// table will go through all the hash tables from the right side.
// 2. Left-outer join is used so that every join will increase the cardinality.
// 3. The left-most table contains rows of unique integers from 0 to N.
// 4. Each right table at level i contains two rows of integer i, so that the probing of
// each level will increase the result by one row.
// 5. The left-most table is a single batch of enough rows, so that at each level, the
// probing will accumulate enough result rows to have to output to the subsequent level
// before finishing the current batch (releasing the buffer allocated on the temp vector
// stack), which is essentially the recursive usage of the temp vector stack.

// A fair number of joins to guarantee temp vector stack overflow before GH-41335.
const int num_joins = 64;

// `ExecBatchBuilder::num_rows_max()` is the number of rows for swiss join to accumulate
// before outputting.
const int num_left_rows = ExecBatchBuilder::num_rows_max();
ASSERT_OK_AND_ASSIGN(
auto left_batches,
MakeIntegerBatches({[](int row_id) -> int64_t { return row_id; }},
schema({field("l_key", int32())}),
/*num_batches=*/1, /*batch_size=*/num_left_rows));
Declaration root{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(left_batches.schema),
std::move(left_batches.batches))};

HashJoinNodeOptions join_opts(JoinType::LEFT_OUTER, /*left_keys=*/{"l_key"},
/*right_keys=*/{"r_key"});

for (int i = 0; i < num_joins; ++i) {
ASSERT_OK_AND_ASSIGN(auto right_batches,
MakeIntegerBatches({[i](int) -> int64_t { return i; }},
schema({field("r_key", int32())}),
/*num_batches=*/1, /*batch_size=*/2));
Declaration table{"exec_batch_source",
ExecBatchSourceNodeOptions(std::move(right_batches.schema),
std::move(right_batches.batches))};

Declaration new_root{"hashjoin", {std::move(root), std::move(table)}, join_opts};
root = std::move(new_root);
}

ASSERT_OK_AND_ASSIGN(std::ignore, DeclarationToTable(std::move(root)));
}

} // namespace acero
} // namespace arrow
12 changes: 1 addition & 11 deletions cpp/src/arrow/acero/query_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); }
int64_t QueryContext::hardware_flags() const { return cpu_info()->hardware_flags(); }

Status QueryContext::Init(size_t max_num_threads, util::AsyncTaskScheduler* scheduler) {
tld_.resize(max_num_threads);
Status QueryContext::Init(util::AsyncTaskScheduler* scheduler) {
async_scheduler_ = scheduler;
return Status::OK();
}
Expand All @@ -50,15 +49,6 @@ size_t QueryContext::GetThreadIndex() { return thread_indexer_(); }

size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); }

Result<util::TempVectorStack*> QueryContext::GetTempStack(size_t thread_index) {
if (!tld_[thread_index].is_init) {
RETURN_NOT_OK(tld_[thread_index].stack.Init(
memory_pool(), 32 * util::MiniBatch::kMiniBatchLength * sizeof(uint64_t)));
tld_[thread_index].is_init = true;
}
return &tld_[thread_index].stack;
}

Result<Future<>> QueryContext::BeginExternalTask(std::string_view name) {
Future<> completion_future = Future<>::Make();
if (async_scheduler_->AddSimpleTask([completion_future] { return completion_future; },
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/arrow/acero/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ARROW_ACERO_EXPORT QueryContext {
QueryContext(QueryOptions opts = {},
ExecContext exec_context = *default_exec_context());

Status Init(size_t max_num_threads, arrow::util::AsyncTaskScheduler* scheduler);
Status Init(arrow::util::AsyncTaskScheduler* scheduler);

const ::arrow::internal::CpuInfo* cpu_info() const;
int64_t hardware_flags() const;
Expand All @@ -52,7 +52,6 @@ class ARROW_ACERO_EXPORT QueryContext {

size_t GetThreadIndex();
size_t max_concurrency() const;
Result<arrow::util::TempVectorStack*> GetTempStack(size_t thread_index);

/// \brief Start an external task
///
Expand Down Expand Up @@ -145,11 +144,6 @@ class ARROW_ACERO_EXPORT QueryContext {
std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();

ThreadIndexer thread_indexer_;
struct ThreadLocalData {
bool is_init = false;
arrow::util::TempVectorStack stack;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this was the last place in a non-internal header file where the full definition of TempVectorStack is required, so, perhaps we can now move TempVectorStack to a arrow/compute/util_internal.h header file?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very thoughtful suggestion. Will do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. And reduced some more (maybe unrelated) includings.

};
std::vector<ThreadLocalData> tld_;

std::atomic<size_t> in_flight_bytes_to_disk_{0};
};
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/acero/swiss_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2470,6 +2470,8 @@ Status JoinProbeProcessor::OnFinished() {

class SwissJoin : public HashJoinImpl {
public:
static constexpr auto kTempStackUsage = 64 * arrow::util::MiniBatch::kMiniBatchLength;

Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads,
const HashJoinProjectionMaps* proj_map_left,
const HashJoinProjectionMaps* proj_map_right,
Expand Down Expand Up @@ -2513,6 +2515,7 @@ class SwissJoin : public HashJoinImpl {

local_states_.resize(num_threads_);
for (int i = 0; i < num_threads_; ++i) {
RETURN_NOT_OK(local_states_[i].stack.Init(pool_, kTempStackUsage));
local_states_[i].hash_table_ready = false;
local_states_[i].num_output_batches = 0;
local_states_[i].materialize.Init(pool_, proj_map_left, proj_map_right);
Expand Down Expand Up @@ -2566,8 +2569,7 @@ class SwissJoin : public HashJoinImpl {

ExecBatch keypayload_batch;
ARROW_ASSIGN_OR_RAISE(keypayload_batch, KeyPayloadFromInput(/*side=*/0, &batch));
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_index));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_index].stack;

return CancelIfNotOK(
probe_processor_.OnNextBatch(thread_index, keypayload_batch, temp_stack,
Expand Down Expand Up @@ -2679,8 +2681,7 @@ class SwissJoin : public HashJoinImpl {
input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol];
}
}
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;
RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.PushNextBatch(
static_cast<int64_t>(thread_id), key_batch, no_payload ? nullptr : &payload_batch,
temp_stack)));
Expand Down Expand Up @@ -2715,8 +2716,7 @@ class SwissJoin : public HashJoinImpl {

Status MergeFinished(size_t thread_id) {
RETURN_NOT_OK(status());
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;
hash_table_build_.FinishPrtnMerge(temp_stack);
return CancelIfNotOK(OnBuildHashTableFinished(static_cast<int64_t>(thread_id)));
}
Expand Down Expand Up @@ -2771,8 +2771,7 @@ class SwissJoin : public HashJoinImpl {
std::min((task_id + 1) * kNumRowsPerScanTask, hash_table_.num_rows());
// Get thread index and related temp vector stack
//
ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack,
ctx_->GetTempStack(thread_id));
arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack;

// Split into mini-batches
//
Expand Down Expand Up @@ -2949,6 +2948,7 @@ class SwissJoin : public HashJoinImpl {
FinishedCallback finished_callback_;

struct ThreadLocalState {
arrow::util::TempVectorStack stack;
JoinResultMaterialize materialize;
std::vector<KeyColumnArray> temp_column_arrays;
int64_t num_output_batches;
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/compute/key_hash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ class ARROW_EXPORT Hashing32 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint32_t* out_hash);

// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint32 temp vector as a buffer for hash, one uint16
// temp vector as a buffer for null indices and one uint32 temp vector as a buffer for
// null hash, all are of size kMiniBatchLength. Plus extra kMiniBatchLength to cope with
// stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) *
util::MiniBatch::kMiniBatchLength;

static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
Expand Down Expand Up @@ -161,6 +171,15 @@ class ARROW_EXPORT Hashing64 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint64_t* hashes);

// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint16 temp vector as a buffer for null indices and
// one uint64 temp vector as a buffer for null hash, all are of size kMiniBatchLength.
// Plus extra kMiniBatchLength to cope with stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) *
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit: but why does this constant have 2 sizeof calls in the calculation but the previous constant has 3 sizeof calls?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something else I should've made the comment more verbose. Each sizeof operator corresponds to a specific stack usage (a TempVectorHolder in the implementation):

auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, max_batch_size);
uint16_t* null_indices = null_indices_buf.mutable_data();
int num_null_indices;
auto null_hash_temp_buf = util::TempVectorHolder<uint64_t>(ctx->stack, max_batch_size);
uint64_t* null_hash_temp = null_hash_temp_buf.mutable_data();

The previous three-sizeof corresponds here:

auto hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, max_batch_size);
uint32_t* hash_temp = hash_temp_buf.mutable_data();
auto null_indices_buf = util::TempVectorHolder<uint16_t>(ctx->stack, max_batch_size);
uint16_t* null_indices = null_indices_buf.mutable_data();
int num_null_indices;
auto null_hash_temp_buf = util::TempVectorHolder<uint32_t>(ctx->stack, max_batch_size);
uint32_t* null_hash_temp = null_hash_temp_buf.mutable_data();

I'll update the comment later.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More comments added.

util::MiniBatch::kMiniBatchLength;

static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
std::vector<KeyColumnArray>& column_arrays,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
Expand Down
Loading
Loading