Skip to content

Commit

Permalink
ARROW-?????: [C++] as-of-join backpressure for large sources (#21)
Browse files Browse the repository at this point in the history
* ARROW-?????: [C++] as-of-join backpressure for large sources

* fix test, expose parameters

* Cleaned up the finish handling in the process thread

* fix backpressure counter and demo memory usage

* fix hang

* fix race condition

* better fix of race condition

* Revert "fix hang"

This reverts commit ddd72d9.

* Fix hang introduced earlier.

* Add AssertTablesEqualUnordered as an alias of AssertTablesEqual to make it more obvious what the function does. Fix a Substrait test that relied on an ordering that is not deterministic.

* Minor lint fix

Co-authored-by: Weston Pace <[email protected]>
  • Loading branch information
rtpsw and westonpace authored Nov 1, 2022
1 parent 97d72b0 commit b9cda43
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 102 deletions.
4 changes: 3 additions & 1 deletion cpp/proto/substrait/extension_rels.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ message AsOfJoinRel {
// As-Of-Join key
message AsOfJoinKey {
// A field reference defining the on-key.
// The type and units of the referenced field must be the same across all inputs.
.substrait.Expression on = 1;
// A list of field references defining the by-key.
// The types corresponding to the referenced fields must be the same across all inputs.
repeated .substrait.Expression by = 2;
}
}
81 changes: 61 additions & 20 deletions cpp/src/arrow/compute/exec/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,39 @@ class ConcurrentQueue {
// Blocking pop: acquires the queue mutex, sleeps on the condition variable
// until the queue holds at least one item, then removes and returns the
// oldest item.
T Pop() {
  std::unique_lock<std::mutex> guard(mutex_);
  // Loop form of the predicate wait; re-checks after every wake-up.
  while (queue_.empty()) {
    cond_.wait(guard);
  }
  return PopUnlocked();
}

// Removes and returns the oldest item without taking the mutex.
//
// Preconditions (not checked here): the queue is non-empty, and the caller
// already holds the queue mutex.
T PopUnlocked() {
  // The front element is discarded by pop() right below, so it is safe to
  // move it out instead of copying it.
  T item = std::move(queue_.front());
  queue_.pop();
  return item;
}

// Appends a copy of `item` to the queue under the queue mutex and wakes one
// waiter (via PushUnlocked).
void Push(const T& item) {
  std::lock_guard<std::mutex> guard(mutex_);
  PushUnlocked(item);
}

// Appends a copy of `item` and notifies one thread waiting on the condition
// variable. Does not take the mutex; the caller is expected to hold it.
void PushUnlocked(const T& item) {
  queue_.emplace(item);
  cond_.notify_one();
}

// Discards every queued item while holding the queue mutex.
void Clear() {
  std::unique_lock<std::mutex> lock(mutex_);
  // Single reset through the unlocked helper; the former direct reassignment
  // of queue_ was redundant with this call.
  ClearUnlocked();
}

// Replaces the underlying queue with an empty one. Does not take the mutex;
// the caller is expected to hold it.
void ClearUnlocked() { queue_ = std::queue<T>(); }

// Non-blocking pop: under the queue mutex, returns the oldest queued value,
// or std::nullopt when the queue is empty.
std::optional<T> TryPop() {
  std::lock_guard<std::mutex> guard(mutex_);
  return TryPopUnlocked();
}

std::optional<T> TryPopUnlocked() {
// Try to pop the oldest value from the queue (or return nullopt if none)
if (queue_.empty()) {
return std::nullopt;
} else {
Expand All @@ -127,6 +141,9 @@ class ConcurrentQueue {

// Returns the current number of queued items WITHOUT taking the mutex, so the
// value may be stale if other threads are pushing or popping concurrently.
size_t UnsyncSize() const { return queue_.size(); }

protected:
// Exposes the queue mutex to subclasses so they can take the lock themselves
// and then call the *Unlocked() operations.
std::mutex& GetMutex() { return mutex_; }

private:
std::queue<T> queue_;
mutable std::mutex mutex_;
Expand Down Expand Up @@ -242,8 +259,8 @@ class BackpressureController : public BackpressureControl {
std::atomic<int32_t>& backpressure_counter)
: node_(node), output_(output), backpressure_counter_(backpressure_counter) {}

// Forward pause/resume requests to the controlled node. The shared counter is
// pre-incremented, so every call passes a freshly incremented value.
// NOTE(review): presumably the receiver uses the counter to order pause and
// resume requests -- confirm against ExecNode::PauseProducing.
void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }

private:
ExecNode* node_;
Expand Down Expand Up @@ -310,23 +327,27 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
: handler_(std::move(handler)) {}

// Removes and returns the oldest item while holding the base queue's mutex.
//
// Unlike ConcurrentQueue<T>::Pop(), this does NOT wait for an item: it goes
// straight to PopUnlocked(), so the queue must be non-empty when called.
T Pop() {
  std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
  // Declared after the lock, so it is destroyed before the lock is released.
  DoHandle do_handle(*this);
  // Must use the unlocked variant: the mutex is already held above, and the
  // locking base Pop() would try to acquire it a second time.
  return ConcurrentQueue<T>::PopUnlocked();
}

// Appends a copy of `item` while holding the base queue's mutex.
void Push(const T& item) {
  std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
  // Declared after the lock, so it is destroyed before the lock is released.
  DoHandle do_handle(*this);
  // Must use the unlocked variant: the mutex is already held above, and the
  // locking base Push() would try to acquire it a second time.
  ConcurrentQueue<T>::PushUnlocked(item);
}

// Discards every queued item while holding the base queue's mutex.
void Clear() {
  std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
  // Declared after the lock, so it is destroyed before the lock is released.
  DoHandle do_handle(*this);
  // Must use the unlocked variant: the mutex is already held above, and the
  // locking base Clear() would try to acquire it a second time.
  ConcurrentQueue<T>::ClearUnlocked();
}

// Non-blocking pop while holding the base queue's mutex: returns the oldest
// value, or std::nullopt when the queue is empty.
std::optional<T> TryPop() {
  std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
  // Declared after the lock, so it is destroyed before the lock is released.
  DoHandle do_handle(*this);
  // Must use the unlocked variant: the mutex is already held above, and the
  // locking base TryPop() would try to acquire it a second time.
  return ConcurrentQueue<T>::TryPopUnlocked();
}

private:
Expand Down Expand Up @@ -914,10 +935,29 @@ class AsofJoinNode : public ExecNode {
}
}

void Process() {
// Scope guard: invokes `callable` exactly once, when the guard is destroyed.
//
// Copying is disabled because every copy would invoke the callable again from
// its own destructor.
template <typename Callable>
struct Defer {
  Callable callable;
  explicit Defer(Callable callable) : callable(std::move(callable)) {}
  Defer(const Defer&) = delete;
  Defer& operator=(const Defer&) = delete;
  ~Defer() noexcept { callable(); }
};

// Checks whether the first input (state_.at(0)) has finished.
//
// Returns true while it has NOT finished, i.e. processing should go on.
// Once it has finished, schedules a plan task that reports InputFinished to
// the first output with the number of batches produced and then marks
// finished_, and returns false.
bool CheckEnded() {
  if (!state_.at(0)->Finished()) {
    return true;
  }
  auto notify_output = [this] {
    // The guard marks finished_ when this task body exits, i.e. after the
    // InputFinished call below.
    Defer cleanup([this]() { finished_.MarkFinished(); });
    outputs_[0]->InputFinished(this, batches_produced_);
    return Status::OK();
  };
  ErrorIfNotOk(plan_->ScheduleTask(notify_output));
  return false;
}

bool Process() {
std::lock_guard<std::mutex> guard(gate_);
if (finished_.is_finished()) {
return;
if (!CheckEnded()) {
return false;
}

// Process batches while we have data
Expand All @@ -935,7 +975,7 @@ class AsofJoinNode : public ExecNode {
}));
} else {
ErrorIfNotOk(result.status());
return;
return false;
}
}

Expand All @@ -944,21 +984,23 @@ class AsofJoinNode : public ExecNode {
//
// It may happen here in cases where InputFinished was called before we were finished
// producing results (so we didn't know the output size at that time)
if (state_.at(0)->Finished()) {
ErrorIfNotOk(plan_->ScheduleTask([this] {
outputs_[0]->InputFinished(this, batches_produced_);
return Status::OK();
}));
finished_.MarkFinished();
if (!CheckEnded()) {
return false;
}

// There is no more we can do now but there is still work remaining for later when
// more data arrives.
return true;
}

// Worker loop: blocks on the process_ queue and runs one Process() pass per
// popped token.
//
// The loop exits when a false token is popped, or when Process() returns
// false.
void ProcessThread() {
  for (;;) {
    if (!process_.Pop()) {
      return;
    }
    // Process() runs exactly once per token and its result decides whether
    // the loop continues.
    if (!Process()) {
      return;
    }
  }
}

Expand Down Expand Up @@ -1165,7 +1207,6 @@ class AsofJoinNode : public ExecNode {
size_t n_by = 0;
for (size_t i = 0; i < input_keys.size(); ++i) {
const auto& by_key = input_keys[i].by_key;
std::cout << "Input: " << i << " by_key.size()=" << by_key.size() << std::endl;
if (i == 0) {
n_by = by_key.size();
} else if (n_by != by_key.size()) {
Expand Down
82 changes: 63 additions & 19 deletions cpp/src/arrow/compute/exec/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1030,42 +1030,86 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
})

TEST(AsofJoinTest, BackpressureDemo) {
template <typename BatchesMaker>
void TestBackpressureDemo(BatchesMaker maker, int num_batches, int batch_size,
double fast_delay, double slow_delay, bool noisy = false) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
schema({field("time", int32()), field("key", int32()), field("r0_value", int32())});
auto r1_schema =
schema({field("time", int32()), field("key", int32()), field("r1_value", int32())});

auto make_integer_batches = [](const std::shared_ptr<Schema>& schema, int shift) {
constexpr int num_batches = 10, batch_size = 1;
return MakeIntegerBatches({[](int row) -> int64_t { return row; },
[](int row) -> int64_t { return row / num_batches; },
[shift](int row) -> int64_t { return row * 10 + shift; }},
schema, num_batches, batch_size);
auto make_shift = [&maker, num_batches, batch_size](
const std::shared_ptr<Schema>& schema, int shift) {
return maker({[](int row) -> int64_t { return row; },
[num_batches](int row) -> int64_t { return row / num_batches; },
[shift](int row) -> int64_t { return row * 10 + shift; }},
schema, num_batches, batch_size);
};
ASSERT_OK_AND_ASSIGN(auto l_batches, make_integer_batches(l_schema, 0));
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_integer_batches(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_integer_batches(r1_schema, 2));
ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

compute::Declaration l_src = {
"source", SourceNodeOptions(l_batches.schema,
MakeNoisyDelayedGen(l_batches, "0:fast", 0.01))};
"source", SourceNodeOptions(
l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
compute::Declaration r0_src = {
"source", SourceNodeOptions(r0_batches.schema,
MakeNoisyDelayedGen(r0_batches, "1:slow", 0.1))};
"source", SourceNodeOptions(
r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
compute::Declaration r1_src = {
"source", SourceNodeOptions(r1_batches.schema,
MakeNoisyDelayedGen(r1_batches, "2:fast", 0.1))};
"source", SourceNodeOptions(
r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};

compute::Declaration asofjoin = {
"asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};

ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
DeclarationToBatches(asofjoin));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
DeclarationToReader(asofjoin, /*use_threads=*/false));

ASSERT_EQ(l_batches.batches.size(), batches.size());
int64_t total_length = 0;
for (;;) {
ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
if (!batch) {
break;
}
total_length += batch->num_rows();
}
ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
}

// Backpressure demo using MakeIntegerBatches as the batch maker, with noisy
// sources.
TEST(AsofJoinTest, BackpressureDemoWithBatches) {
  constexpr int kNumBatches = 10;
  constexpr int kBatchSize = 1;
  constexpr double kFastDelay = 0.01;
  constexpr double kSlowDelay = 0.1;
  TestBackpressureDemo(MakeIntegerBatches, kNumBatches, kBatchSize, kFastDelay,
                       kSlowDelay, /*noisy=*/true);
}

namespace {

// Thin forwarding wrapper around MakeIntegerBatchGen with a fixed
// four-parameter signature.
// NOTE(review): presumably exists so it can be passed by name as the `maker`
// argument of TestBackpressureDemo (e.g. if MakeIntegerBatchGen is overloaded
// or has defaulted parameters) -- confirm.
Result<AsyncGenerator<std::optional<ExecBatch>>> MakeIntegerBatchGenForTest(
const std::vector<std::function<int64_t(int)>>& gens,
const std::shared_ptr<Schema>& schema, int num_batches, int batch_size) {
return MakeIntegerBatchGen(gens, schema, num_batches, batch_size);
}

// Reads environment variable `var` and parses it as a T using stream
// extraction.
//
// Returns `default_value` when the variable is unset or when its text cannot
// be parsed as a T.
template <typename T>
T GetEnvValue(const std::string& var, T default_value) {
  const char* str = std::getenv(var.c_str());
  if (str == nullptr) {
    return default_value;
  }
  std::stringstream s(str);
  T value = default_value;
  // A failed numeric extraction overwrites `value` with zero (C++11 and
  // later), so the stream state must be checked to really fall back to the
  // default.
  if (!(s >> value)) {
    return default_value;
  }
  return value;
}

} // namespace

// Backpressure demo using MakeIntegerBatchGenForTest as the batch maker. The
// batch count and batch size can be overridden through the
// ARROW_BACKPRESSURE_DEMO_NUM_BATCHES and ARROW_BACKPRESSURE_DEMO_BATCH_SIZE
// environment variables.
TEST(AsofJoinTest, BackpressureDemoWithBatchesGen) {
  const int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
  const int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
  constexpr double kFastDelay = 0.001;
  constexpr double kSlowDelay = 0.01;
  TestBackpressureDemo(MakeIntegerBatchGenForTest, num_batches, batch_size, kFastDelay,
                       kSlowDelay);
}

} // namespace compute
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1495,13 +1495,15 @@ TEST(ExecPlanExecution, BackpressureDemo) {
BatchesWithSchema two = MakeRandomBatches(schema_two, /*num_batches=*/100);
BatchesWithSchema three = MakeRandomBatches(schema_three, /*num_batches=*/100);

constexpr bool noisy = true;
compute::Declaration src_one = {
"source", SourceNodeOptions(one.schema, MakeNoisyDelayedGen(one, "0:fast", 0.01))};
"source",
SourceNodeOptions(one.schema, MakeDelayedGen(one, "0:fast", 0.01, noisy))};
compute::Declaration src_two = {
"source", SourceNodeOptions(two.schema, MakeNoisyDelayedGen(two, "1:slow", 0.1))};
"source", SourceNodeOptions(two.schema, MakeDelayedGen(two, "1:slow", 0.1, noisy))};
compute::Declaration src_three = {
"source",
SourceNodeOptions(three.schema, MakeNoisyDelayedGen(three, "2:fast", 0.01))};
SourceNodeOptions(three.schema, MakeDelayedGen(three, "2:fast", 0.01, noisy))};

compute::Declaration concat = {
"concat", {src_one, src_two, src_three}, ConcatNodeOptions()};
Expand Down
55 changes: 39 additions & 16 deletions cpp/src/arrow/compute/exec/test_nodes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/compute/exec/util.h"
#include "arrow/io/interfaces.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"

namespace arrow {

Expand Down Expand Up @@ -269,38 +270,60 @@ void RegisterConcatNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("concat", ConcatNode::Make));
}

// Make a generator that is slowed by some delay and, when `noisy` is set,
// prints a line each time it emits a batch.
//
// src        iterator supplying the batches; an empty optional ends the stream
// label      prefix used in the printed messages
// delay_sec  value passed to SleepFor before each batch is returned
// noisy      whether to print a message per batch
//
// Each call to the returned generator submits one DelayedIoGenState::Next()
// call to the default IO context's executor.
// NOTE(review): Next() mutates shared state without synchronization; this
// presumably relies on the consumer not requesting batches concurrently --
// confirm.
AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
    Iterator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
    bool noisy) {
  struct DelayedIoGenState {
    DelayedIoGenState(Iterator<std::optional<ExecBatch>> batch_it, double delay_sec,
                      std::string label, bool noisy)
        : batch_it(std::move(batch_it)),
          delay_sec(delay_sec),
          label(std::move(label)),
          noisy(noisy) {}

    std::optional<ExecBatch> Next() {
      Result<std::optional<ExecBatch>> opt_batch_res = batch_it.Next();
      // NOTE(review): an iterator error is reported as end-of-stream here, so
      // the failure itself is not propagated to the consumer.
      if (!opt_batch_res.ok()) {
        return std::nullopt;
      }
      std::optional<ExecBatch> opt_batch = opt_batch_res.ValueOrDie();
      if (!opt_batch) {
        return std::nullopt;
      }
      if (noisy) {
        std::cout << label + ": asking for batch(" + std::to_string(index) + ")\n";
      }
      SleepFor(delay_sec);
      ++index;
      // Hand the optional back as-is instead of copying the contained batch.
      return opt_batch;
    }

    Iterator<std::optional<ExecBatch>> batch_it;
    double delay_sec;
    std::string label;
    bool noisy;
    // Number of batches returned so far; only used in the printed message.
    std::size_t index = 0;
  };
  auto state = std::make_shared<DelayedIoGenState>(std::move(src), delay_sec,
                                                   std::move(label), noisy);
  return [state]() {
    return DeferNotOk(::arrow::io::default_io_context().executor()->Submit(
        [state]() { return state->Next(); }));
  };
}

// Overload taking an async generator: wraps `src` with MakeGeneratorIterator
// and forwards to the Iterator-based MakeDelayedGen.
AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
    AsyncGenerator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
    bool noisy) {
  // `src` and `label` are by-value parameters that are not used again, so
  // move them rather than copying.
  return MakeDelayedGen(MakeGeneratorIterator(std::move(src)), std::move(label),
                        delay_sec, noisy);
}

// Overload taking materialized batches: wraps each batch of `src.batches` in
// an optional, builds a vector iterator over them and forwards to the
// Iterator-based MakeDelayedGen.
AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(BatchesWithSchema src,
                                                        std::string label,
                                                        double delay_sec, bool noisy) {
  std::vector<std::optional<ExecBatch>> opt_batches = ::arrow::internal::MapVector(
      [](ExecBatch batch) { return std::make_optional(std::move(batch)); }, src.batches);
  // `opt_batches` and `label` are not used again, so move them rather than
  // copying.
  return MakeDelayedGen(MakeVectorIterator(std::move(opt_batches)), std::move(label),
                        delay_sec, noisy);
}

} // namespace compute
} // namespace arrow
Loading

0 comments on commit b9cda43

Please sign in to comment.