Skip to content

Commit

Permalink
GH-40224: [C++] Fix: improve the backpressure handling in the dataset…
Browse files Browse the repository at this point in the history
… writer (#40722)

### Rationale for this change

The dataset writer would fire the resume callback as soon as the underlying dataset writer's queues freed up, even if there were pending tasks.  Backpressure is not applied immediately and so a few tasks will always trickle in.  If backpressure is pausing and then resuming frequently this can lead to a buildup of pending tasks and uncontrolled memory growth.

### What changes are included in this PR?

The resume callback is not called until all pending write tasks have completed.

### Are these changes tested?

There is quite an extensive set of tests for the dataset writer already and they continue to pass.  I ran them on repeat, with and without stress, and did not see any issues.

However, the underlying problem (the dataset writer can exhibit uncontrolled memory growth) is still not covered by a test, as it is quite difficult to test directly.  I was able to run the setup described in the issue to reproduce the problem.  With this fix the repartitioning task completes for me.

### Are there any user-facing changes?

No
* GitHub Issue: #40224

Authored-by: Weston Pace <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
westonpace authored Apr 4, 2024
1 parent 26631d7 commit 640c101
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
36 changes: 30 additions & 6 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ class DatasetWriter::DatasetWriterImpl {
std::function<void()> finish_callback, uint64_t max_rows_queued)
: scheduler_(scheduler),
write_tasks_(util::MakeThrottledAsyncTaskGroup(
scheduler_, 1, /*queue=*/nullptr,
scheduler_, /*max_concurrent_cost=*/1, /*queue=*/nullptr,
[finish_callback = std::move(finish_callback)] {
finish_callback();
return Status::OK();
Expand All @@ -541,6 +541,23 @@ class DatasetWriter::DatasetWriterImpl {
}
}

// Fire the resume callback, but only once every pending write task has
// drained.  Resuming as soon as the underlying queues free up would let
// in-flight tasks keep piling in during rapid pause/resume cycles, causing
// unbounded memory growth (GH-40224).
void ResumeIfNeeded() {
  if (!paused_) {
    return;
  }
  bool needs_resume = false;
  {
    std::lock_guard lg(mutex_);
    // write_tasks_ is reset in FinishAll(); treat a missing task group the
    // same as an empty queue.
    if (!write_tasks_ || write_tasks_->QueueSize() == 0) {
      needs_resume = true;
    }
  }
  // NOTE(review): paused_ is read/written outside mutex_ — presumably all
  // callers are serialized by the throttled task group (max cost 1); confirm.
  if (needs_resume) {
    paused_ = false;
    // Invoke outside the lock in case the callback schedules more writes.
    resume_callback_();
  }
}

void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory,
const std::string& prefix) {
write_tasks_->AddSimpleTask(
Expand All @@ -549,11 +566,14 @@ class DatasetWriter::DatasetWriterImpl {
WriteAndCheckBackpressure(std::move(batch), directory, prefix);
if (!has_room.is_finished()) {
// We don't have to worry about sequencing backpressure here since
// task_group_ serves as our sequencer. If batches continue to arrive after
// we pause they will queue up in task_group_ until we free up and call
// Resume
// task_group_ serves as our sequencer. If batches continue to arrive
// after we pause they will queue up in task_group_ until we free up and
// call Resume
pause_callback_();
return has_room.Then([this] { resume_callback_(); });
paused_ = true;
return has_room.Then([this] { ResumeIfNeeded(); });
} else {
ResumeIfNeeded();
}
return has_room;
},
Expand All @@ -571,6 +591,9 @@ class DatasetWriter::DatasetWriterImpl {
return Future<>::MakeFinished();
},
"DatasetWriter::FinishAll"sv);
// Reset write_tasks_ to signal that we are done adding tasks, this will allow
// us to invoke the finish callback once the tasks wrap up.
std::lock_guard lg(mutex_);
write_tasks_.reset();
}

Expand Down Expand Up @@ -660,7 +683,7 @@ class DatasetWriter::DatasetWriterImpl {
}

util::AsyncTaskScheduler* scheduler_ = nullptr;
std::unique_ptr<util::AsyncTaskScheduler> write_tasks_;
std::unique_ptr<util::ThrottledAsyncTaskScheduler> write_tasks_;
Future<> finish_fut_ = Future<>::Make();
FileSystemDatasetWriteOptions write_options_;
DatasetWriterState writer_state_;
Expand All @@ -670,6 +693,7 @@ class DatasetWriter::DatasetWriterImpl {
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
directory_queues_;
std::mutex mutex_;
bool paused_ = false;
Status err_;
};

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/async_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue {

void Purge() override { tasks_.clear(); }

// Number of tasks currently held in the queue.
std::size_t Size() const override { return tasks_.size(); }

private:
std::list<std::unique_ptr<Task>> tasks_;
};
Expand Down Expand Up @@ -332,6 +334,10 @@ class ThrottledAsyncTaskSchedulerImpl

void Pause() override { throttle_->Pause(); }
void Resume() override { throttle_->Resume(); }
// Number of tasks queued behind the throttle but not yet submitted.
std::size_t QueueSize() override {
  // queue_ is shared with task submission/completion paths; guard the read.
  std::lock_guard lk(mutex_);
  return queue_->Size();
}
const util::tracing::Span& span() const override { return target_->span(); }

private:
Expand Down Expand Up @@ -499,6 +505,7 @@ class ThrottledAsyncTaskGroup : public ThrottledAsyncTaskScheduler {
: throttle_(std::move(throttle)), task_group_(std::move(task_group)) {}
void Pause() override { throttle_->Pause(); }
void Resume() override { throttle_->Resume(); }
// Delegate the queue-size query to the wrapped throttle.
std::size_t QueueSize() override { return throttle_->QueueSize(); }
const util::tracing::Span& span() const override { return task_group_->span(); }
bool AddTask(std::unique_ptr<Task> task) override {
return task_group_->AddTask(std::move(task));
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/async_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
virtual bool Empty() = 0;
/// Purge the queue of all items
virtual void Purge() = 0;
virtual std::size_t Size() const = 0;
};

class Throttle {
Expand Down Expand Up @@ -277,6 +278,8 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
/// Allows task to be submitted again. If there is a max_concurrent_cost limit then
/// it will still apply.
virtual void Resume() = 0;
/// Return the number of tasks queued but not yet submitted
virtual std::size_t QueueSize() = 0;

/// Create a throttled view of a scheduler
///
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/async_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ class PriorityQueue : public ThrottledAsyncTaskScheduler::Queue {
queue_.pop();
}
}
std::size_t Size() const { return queue_.size(); }

private:
std::priority_queue<TaskWithPriority*, std::vector<TaskWithPriority*>,
Expand Down

0 comments on commit 640c101

Please sign in to comment.