From fdb625bb8bf80a868ce8f7dfbf824049bea307f5 Mon Sep 17 00:00:00 2001
From: Weston Pace <weston.pace@gmail.com>
Date: Thu, 21 Mar 2024 08:25:27 -0700
Subject: [PATCH] fix: improve the backpressure handling in the dataset writer

---
 cpp/src/arrow/dataset/dataset_writer.cc | 36 ++++++++++++++++++++------
 cpp/src/arrow/util/async_util.cc        |  7 +++++
 cpp/src/arrow/util/async_util.h         |  3 +++
 cpp/src/arrow/util/async_util_test.cc   |  1 +
 4 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
index 34731d19ab3eb..754386275d60c 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -515,7 +515,7 @@ class DatasetWriter::DatasetWriterImpl {
                     std::function<void()> finish_callback, uint64_t max_rows_queued)
       : scheduler_(scheduler),
         write_tasks_(util::MakeThrottledAsyncTaskGroup(
-            scheduler_, 1, /*queue=*/nullptr,
+            scheduler_, /*max_concurrent_cost=*/1, /*queue=*/nullptr,
             [finish_callback = std::move(finish_callback)] {
               finish_callback();
               return Status::OK();
@@ -541,6 +541,23 @@ class DatasetWriter::DatasetWriterImpl {
     }
   }
 
+  void ResumeIfNeeded() {
+    if (!paused_) {
+      return;
+    }
+    bool needs_resume = false;
+    {
+      std::lock_guard<std::mutex> lg(mutex_);
+      if (!write_tasks_ || write_tasks_->QueueSize() == 0) {
+        needs_resume = true;
+      }
+    }
+    if (needs_resume) {
+      paused_ = false;
+      resume_callback_();
+    }
+  }
+
   void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory,
                         const std::string& prefix) {
     write_tasks_->AddSimpleTask(
@@ -549,11 +566,14 @@ class DatasetWriter::DatasetWriterImpl {
               WriteAndCheckBackpressure(std::move(batch), directory, prefix);
           if (!has_room.is_finished()) {
             // We don't have to worry about sequencing backpressure here since
-            // task_group_ serves as our sequencer. If batches continue to arrive after
-            // we pause they will queue up in task_group_ until we free up and call
-            // Resume
+            // task_group_ serves as our sequencer. If batches continue to arrive
+            // after we pause they will queue up in task_group_ until we free up and
+            // call Resume
             pause_callback_();
-            return has_room.Then([this] { resume_callback_(); });
+            paused_ = true;
+            return has_room.Then([this] { ResumeIfNeeded(); });
+          } else {
+            ResumeIfNeeded();
           }
           return has_room;
         },
@@ -571,6 +591,9 @@ class DatasetWriter::DatasetWriterImpl {
           return Future<>::MakeFinished();
         },
         "DatasetWriter::FinishAll"sv);
+    // Reset write_tasks_ to signal that we are done adding tasks, this will allow
+    // us to invoke the finish callback once the tasks wrap up.
+    std::lock_guard<std::mutex> lg(mutex_);
     write_tasks_.reset();
   }
 
@@ -660,7 +683,7 @@ class DatasetWriter::DatasetWriterImpl {
   }
 
   util::AsyncTaskScheduler* scheduler_ = nullptr;
-  std::unique_ptr<util::AsyncTaskScheduler> write_tasks_;
+  std::unique_ptr<util::ThrottledAsyncTaskScheduler> write_tasks_;
   Future<> finish_fut_ = Future<>::Make();
   FileSystemDatasetWriteOptions write_options_;
   DatasetWriterState writer_state_;
@@ -670,6 +693,7 @@ class DatasetWriter::DatasetWriterImpl {
   std::unordered_map<std::string, std::unique_ptr<DatasetWriterDirectoryQueue>>
       directory_queues_;
 
   std::mutex mutex_;
+  bool paused_ = false;
   Status err_;
 };
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index 63e27bfbe5773..fbd45eadac2cd 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -118,6 +118,8 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue {
 
   void Purge() override { tasks_.clear(); }
 
+  std::size_t Size() const override { return tasks_.size(); }
+
  private:
   std::list<std::unique_ptr<Task>> tasks_;
 };
@@ -332,6 +334,10 @@ class ThrottledAsyncTaskSchedulerImpl
 
   void Pause() override { throttle_->Pause(); }
   void Resume() override { throttle_->Resume(); }
+  std::size_t QueueSize() override {
+    std::lock_guard<std::mutex> lk(mutex_);
+    return queue_->Size();
+  }
   const util::tracing::Span& span() const override { return target_->span(); }
 
  private:
@@ -499,6 +505,7 @@ class ThrottledAsyncTaskGroup : public ThrottledAsyncTaskScheduler {
       : throttle_(std::move(throttle)), task_group_(std::move(task_group)) {}
   void Pause() override { throttle_->Pause(); }
   void Resume() override { throttle_->Resume(); }
+  std::size_t QueueSize() override { return throttle_->QueueSize(); }
   const util::tracing::Span& span() const override { return task_group_->span(); }
   bool AddTask(std::unique_ptr<Task> task) override {
     return task_group_->AddTask(std::move(task));
diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h
index 7a675da59facd..d9ed63bdbce22 100644
--- a/cpp/src/arrow/util/async_util.h
+++ b/cpp/src/arrow/util/async_util.h
@@ -226,6 +226,7 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
     virtual bool Empty() = 0;
     /// Purge the queue of all items
     virtual void Purge() = 0;
+    virtual std::size_t Size() const = 0;
   };
 
   class Throttle {
@@ -277,6 +278,8 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
   /// Allows task to be submitted again. If there is a max_concurrent_cost limit then
   /// it will still apply.
   virtual void Resume() = 0;
+  /// Return the number of tasks queued but not yet submitted
+  virtual std::size_t QueueSize() = 0;
 
   /// Create a throttled view of a scheduler
   ///
diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc
index 313ca91912335..1f9aad453e9c4 100644
--- a/cpp/src/arrow/util/async_util_test.cc
+++ b/cpp/src/arrow/util/async_util_test.cc
@@ -680,6 +680,7 @@ class PriorityQueue : public ThrottledAsyncTaskScheduler::Queue {
       queue_.pop();
     }
   }
+  std::size_t Size() const { return queue_.size(); }
 
  private:
   std::priority_queue<std::unique_ptr<TaskWithPriority>,