Skip to content

Commit

Permalink
fix: improve the backpressure handling in the dataset writer
Browse files · Browse the repository at this point in the history
  • Loading branch information
westonpace authored and pitrou committed Apr 3, 2024
1 parent be3b789 commit fdb625b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
36 changes: 30 additions & 6 deletions cpp/src/arrow/dataset/dataset_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ class DatasetWriter::DatasetWriterImpl {
std::function<void()> finish_callback, uint64_t max_rows_queued)
: scheduler_(scheduler),
write_tasks_(util::MakeThrottledAsyncTaskGroup(
scheduler_, 1, /*queue=*/nullptr,
scheduler_, /*max_concurrent_cost=*/1, /*queue=*/nullptr,
[finish_callback = std::move(finish_callback)] {
finish_callback();
return Status::OK();
Expand All @@ -541,6 +541,23 @@ class DatasetWriter::DatasetWriterImpl {
}
}

/// Lift backpressure (via resume_callback_) if we previously paused and the
/// throttled task group has drained its queue of pending write tasks.
///
/// Called after a write completes; it only resumes once every queued write
/// task has been submitted (QueueSize() == 0) or the task group has already
/// been torn down by Finish (write_tasks_ == nullptr). Otherwise the pending
/// tasks are left to drain under the existing pause.
void ResumeIfNeeded() {
  // Fast path: nothing to do unless a prior write actually paused us.
  if (!paused_) {
    return;
  }
  bool needs_resume = false;
  {
    // mutex_ guards write_tasks_, which Finish resets concurrently.
    std::lock_guard lg(mutex_);
    // nullptr => finishing; QueueSize() == 0 => no queued-but-unsubmitted
    // tasks remain, so it is safe to let the producer push more batches.
    if (!write_tasks_ || write_tasks_->QueueSize() == 0) {
      needs_resume = true;
    }
  }
  if (needs_resume) {
    // NOTE(review): paused_ is read and written outside mutex_ — presumably
    // all callers are serialized by the throttled task group (max concurrent
    // cost 1); confirm before relying on this elsewhere.
    paused_ = false;
    resume_callback_();
  }
}

void WriteRecordBatch(std::shared_ptr<RecordBatch> batch, const std::string& directory,
const std::string& prefix) {
write_tasks_->AddSimpleTask(
Expand All @@ -549,11 +566,14 @@ class DatasetWriter::DatasetWriterImpl {
WriteAndCheckBackpressure(std::move(batch), directory, prefix);
if (!has_room.is_finished()) {
// We don't have to worry about sequencing backpressure here since
// task_group_ serves as our sequencer. If batches continue to arrive after
// we pause they will queue up in task_group_ until we free up and call
// Resume
// task_group_ serves as our sequencer. If batches continue to arrive
// after we pause they will queue up in task_group_ until we free up and
// call Resume
pause_callback_();
return has_room.Then([this] { resume_callback_(); });
paused_ = true;
return has_room.Then([this] { ResumeIfNeeded(); });
} else {
ResumeIfNeeded();
}
return has_room;
},
Expand All @@ -571,6 +591,9 @@ class DatasetWriter::DatasetWriterImpl {
return Future<>::MakeFinished();
},
"DatasetWriter::FinishAll"sv);
// Reset write_tasks_ to signal that we are done adding tasks, this will allow
// us to invoke the finish callback once the tasks wrap up.
std::lock_guard lg(mutex_);
write_tasks_.reset();
}

Expand Down Expand Up @@ -660,7 +683,7 @@ class DatasetWriter::DatasetWriterImpl {
}

util::AsyncTaskScheduler* scheduler_ = nullptr;
std::unique_ptr<util::AsyncTaskScheduler> write_tasks_;
std::unique_ptr<util::ThrottledAsyncTaskScheduler> write_tasks_;
Future<> finish_fut_ = Future<>::Make();
FileSystemDatasetWriteOptions write_options_;
DatasetWriterState writer_state_;
Expand All @@ -670,6 +693,7 @@ class DatasetWriter::DatasetWriterImpl {
std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
directory_queues_;
std::mutex mutex_;
bool paused_ = false;
Status err_;
};

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/util/async_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue {

void Purge() override { tasks_.clear(); }

/// Number of tasks currently held in the FIFO queue.
std::size_t Size() const override { return tasks_.size(); }

private:
std::list<std::unique_ptr<Task>> tasks_;
};
Expand Down Expand Up @@ -332,6 +334,10 @@ class ThrottledAsyncTaskSchedulerImpl

void Pause() override { throttle_->Pause(); }
void Resume() override { throttle_->Resume(); }
/// Report how many tasks are queued behind the throttle but not yet
/// submitted.
///
/// Acquires the scheduler mutex so the count is consistent with concurrent
/// task submission and completion.
std::size_t QueueSize() override {
  std::lock_guard<std::mutex> guard(mutex_);
  return queue_->Size();
}
const util::tracing::Span& span() const override { return target_->span(); }

private:
Expand Down Expand Up @@ -499,6 +505,7 @@ class ThrottledAsyncTaskGroup : public ThrottledAsyncTaskScheduler {
: throttle_(std::move(throttle)), task_group_(std::move(task_group)) {}
void Pause() override { throttle_->Pause(); }
void Resume() override { throttle_->Resume(); }
/// Delegate to the shared throttle, which owns the queue of deferred tasks.
std::size_t QueueSize() override { return throttle_->QueueSize(); }
const util::tracing::Span& span() const override { return task_group_->span(); }
bool AddTask(std::unique_ptr<Task> task) override {
return task_group_->AddTask(std::move(task));
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/util/async_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
virtual bool Empty() = 0;
/// Purge the queue of all items
virtual void Purge() = 0;
virtual std::size_t Size() const = 0;
};

class Throttle {
Expand Down Expand Up @@ -277,6 +278,8 @@ class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
/// Allows task to be submitted again. If there is a max_concurrent_cost limit then
/// it will still apply.
virtual void Resume() = 0;
/// Return the number of tasks queued but not yet submitted
virtual std::size_t QueueSize() = 0;

/// Create a throttled view of a scheduler
///
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/async_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ class PriorityQueue : public ThrottledAsyncTaskScheduler::Queue {
queue_.pop();
}
}
std::size_t Size() const { return queue_.size(); }

private:
std::priority_queue<TaskWithPriority*, std::vector<TaskWithPriority*>,
Expand Down

0 comments on commit fdb625b

Please sign in to comment.