
Preserve order when writing dataset multi-threaded
EnricoMi committed Oct 31, 2024
1 parent 567f9c5 commit 0377ef9
Showing 10 changed files with 211 additions and 12 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/acero/options.h
@@ -103,8 +103,8 @@ class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::shared_ptr<Schema> output_schema;
/// \brief an asynchronous stream of batches ending with std::nullopt
std::function<Future<std::optional<ExecBatch>>()> generator;

Ordering ordering = Ordering::Unordered();
/// \brief the order of the data, defaults to Ordering::Unordered
Ordering ordering;
};

/// \brief a node that generates data from a table already loaded in memory
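
For reference, this ordering field is what downstream sequencing keys on: Ordering::Implicit() tags each batch with its arrival index without requiring a sort key. A minimal sketch of a source declaration that advertises implicit ordering, mirroring what MakeScanNode does below (MakeImplicitlyOrderedSource is a hypothetical helper, not part of this commit):

// Hypothetical helper, for illustration only: a "source" declaration whose
// batches are tagged with their arrival order (Ordering::Implicit()), so a
// downstream node that sequences its input can restore that order later.
#include <functional>
#include <memory>
#include <optional>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/ordering.h"
#include "arrow/util/future.h"

arrow::acero::Declaration MakeImplicitlyOrderedSource(
    std::shared_ptr<arrow::Schema> schema,
    std::function<arrow::Future<std::optional<arrow::compute::ExecBatch>>()> gen) {
  return {"source",
          arrow::acero::SourceNodeOptions{std::move(schema), std::move(gen),
                                          arrow::compute::Ordering::Implicit()}};
}
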
12 changes: 10 additions & 2 deletions cpp/src/arrow/dataset/file_base.cc
@@ -471,9 +471,12 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio

WriteNodeOptions write_node_options(write_options);
write_node_options.custom_schema = custom_schema;
// preserve the existing order in the dataset by setting implicit_ordering=true
bool implicit_ordering = write_node_options.write_options.preserve_order;

acero::Declaration plan = acero::Declaration::Sequence({
{"scan", ScanNodeOptions{dataset, scanner->options()}},
{"scan", ScanNodeOptions{dataset, scanner->options(),
/*require_sequenced_output=*/false, implicit_ordering}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"write", std::move(write_node_options)},
@@ -539,8 +542,13 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,

ARROW_ASSIGN_OR_RAISE(
auto node,
// to preserve order, explicitly sequence the exec batches;
// this requires the exec batch index to be set upstream (e.g. by the SourceNode)
acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
acero::ConsumingSinkNodeOptions{std::move(consumer)}));
acero::ConsumingSinkNodeOptions{
std::move(consumer),
{},
/*sequence_output=*/write_options.preserve_order}));

return node;
}
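
The write node is only one consumer of this mechanism; the same sequence_output flag applies to any custom consumer. A sketch assuming the three-argument ConsumingSinkNodeOptions shape used above (OrderedCollector and MakeSequencedSinkOptions are illustrative, not from this commit):

// Illustrative only: a custom SinkNodeConsumer paired with
// sequence_output=true, so Consume() is invoked in the batches' implicit
// order even when upstream nodes run multi-threaded.
#include <memory>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/compute/exec.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/future.h"

class OrderedCollector : public arrow::acero::SinkNodeConsumer {
 public:
  arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                     arrow::acero::BackpressureControl*,
                     arrow::acero::ExecPlan*) override {
    schema_ = schema;
    return arrow::Status::OK();
  }

  arrow::Status Consume(arrow::compute::ExecBatch batch) override {
    // With sequence_output=true the sink buffers out-of-order batches and
    // calls this method by ascending batch index.
    ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(schema_));
    batches_.push_back(std::move(record_batch));
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return arrow::Future<>::MakeFinished(); }

 private:
  std::shared_ptr<arrow::Schema> schema_;
  std::vector<std::shared_ptr<arrow::RecordBatch>> batches_;
};

arrow::acero::ConsumingSinkNodeOptions MakeSequencedSinkOptions(
    std::shared_ptr<arrow::acero::SinkNodeConsumer> consumer) {
  return arrow::acero::ConsumingSinkNodeOptions{std::move(consumer), /*names=*/{},
                                                /*sequence_output=*/true};
}
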
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -399,6 +399,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// Partitioning used to generate fragment paths.
std::shared_ptr<Partitioning> partitioning;

/// If true, the order of rows in the dataset is preserved when writing with
/// multiple threads. This may cause notable performance degradation.
bool preserve_order = false;

/// Maximum number of partitions any batch may be written into, default is 1K.
int max_partitions = 1024;

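
A condensed usage sketch of the new flag, modeled on the WritePersistOrder test added below (the helper name, format, and paths are illustrative):

// Illustrative usage of preserve_order: scan multi-threaded, but keep the
// original row order in the written IPC dataset.
#include <memory>

#include "arrow/dataset/api.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status WritePreservingOrder(
    std::shared_ptr<arrow::dataset::Dataset> dataset,
    std::shared_ptr<arrow::fs::FileSystem> filesystem) {
  auto format = std::make_shared<arrow::dataset::IpcFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = std::move(filesystem);
  write_options.base_dir = "root";
  write_options.partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(arrow::schema({}));
  write_options.basename_template = "{i}.feather";
  write_options.preserve_order = true;  // the option added by this commit

  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
  return arrow::dataset::FileSystemDataset::Write(write_options, scanner);
}
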
86 changes: 86 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include <arrow/compute/function.h>
#include <arrow/compute/registry.h>
#include <cstdint>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <vector>

@@ -351,6 +354,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}

// this kernel delays execution for some specific scalar values
Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
compute::ExecResult* out) {
const ArraySpan& input = batch[0].array;
const uint32_t* input_values = input.GetValues<uint32_t>(1);
uint8_t* output_values = out->array_span()->buffers[1].data;

// Boolean data is stored in 1 bit per value
for (int64_t i = 0; i < input.length; ++i) {
if (input_values[i] % 16 == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
bit_util::SetBitTo(output_values, i, true);
}

return Status::OK();
}

TEST_F(TestFileSystemDataset, WritePersistOrder) {
// Test for GH-26818
auto format = std::make_shared<IpcFileFormat>();
FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = format->DefaultWriteOptions();
write_options.base_dir = "root";
write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
write_options.basename_template = "{i}.feather";

auto table = gen::Gen({gen::Step()})->FailOnError()->Table(2, 1024);
auto dataset = std::make_shared<InMemoryDataset>(table);

// register the scalar function that delays execution for some rows
// this guarantees the writing phase sees out-of-order exec batches
auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1),
compute::FunctionDoc());
compute::ScalarKernel delay_kernel;
delay_kernel.exec = delay;
delay_kernel.signature = compute::KernelSignature::Make({uint32()}, boolean());
ARROW_CHECK_OK(delay_func->AddKernel(delay_kernel));
ARROW_CHECK_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));

for (bool preserve_order : {true, false}) {
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ARROW_CHECK_OK(scanner_builder->UseThreads(true));
ARROW_CHECK_OK(
scanner_builder->Filter(compute::call("delay", {compute::field_ref("f0")})));
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
write_options.filesystem = fs;
write_options.preserve_order = preserve_order;

ASSERT_OK(FileSystemDataset::Write(write_options, scanner));

// Read the file back out and verify the order
ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
fs, {"root/0.feather"}, format, {}));
ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
ASSERT_OK(scanner_builder->UseThreads(false));
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
TableBatchReader reader(*actual);
std::shared_ptr<RecordBatch> batch;
ABORT_NOT_OK(reader.ReadNext(&batch));
int32_t prev = -1;
int out_of_order = 0;
while (batch != nullptr) {
for (int row = 0; row < batch->num_rows(); ++row) {
auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
auto numeric_scalar =
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
int32_t value = numeric_scalar->value;
if (value <= prev) {
out_of_order++;
}
prev = value;
}
ABORT_NOT_OK(reader.ReadNext(&batch));
}
ASSERT_EQ(out_of_order > 0, !preserve_order);
}
}

class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
using PlanFactory = std::function<std::vector<acero::Declaration>(
const FileSystemDatasetWriteOptions&,
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/scanner.cc
@@ -1000,6 +1000,7 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
auto scan_options = scan_node_options.scan_options;
auto dataset = scan_node_options.dataset;
bool require_sequenced_output = scan_node_options.require_sequenced_output;
bool implicit_ordering = scan_node_options.implicit_ordering;

RETURN_NOT_OK(NormalizeScanOptions(scan_options, dataset->schema()));

@@ -1070,6 +1071,11 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
}
}

Ordering ordering = Ordering::Unordered();
if (implicit_ordering) {
ordering = Ordering::Implicit();
}

return acero::MakeExecNode(
"source", plan, {},
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), ordering});
7 changes: 5 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
@@ -563,14 +563,17 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
public:
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
bool require_sequenced_output = false)
bool require_sequenced_output = false,
bool implicit_ordering = false)
: dataset(std::move(dataset)),
scan_options(std::move(scan_options)),
require_sequenced_output(require_sequenced_output) {}
require_sequenced_output(require_sequenced_output),
implicit_ordering(implicit_ordering) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
bool require_sequenced_output;
bool implicit_ordering;
};

/// @}
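
Putting the two new parameters together at the exec-plan level: a scan can advertise implicit ordering without forcing sequenced output at the scan itself, leaving the reordering to a downstream sequencing sink. A sketch (MakeOrderedScan is a hypothetical helper; the SequenceOutput test below builds exactly this shape of plan):

// Sketch: a scan declaration whose batches carry an implicit order, ready to
// feed a write node (or any sequencing sink) that restores row order.
#include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/scanner.h"

arrow::acero::Declaration MakeOrderedScan(
    std::shared_ptr<arrow::dataset::Dataset> dataset,
    std::shared_ptr<arrow::dataset::ScanOptions> scan_options) {
  return {"scan",
          arrow::dataset::ScanNodeOptions{std::move(dataset), std::move(scan_options),
                                          /*require_sequenced_output=*/false,
                                          /*implicit_ordering=*/true}};
}
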
84 changes: 84 additions & 0 deletions cpp/src/arrow/dataset/write_node_test.cc
@@ -18,10 +18,13 @@
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include <arrow/compute/api_scalar.h>
#include <memory>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/acero/test_nodes.h"
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/partition.h"
@@ -33,12 +36,15 @@
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"

#include "arrow/table.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {

using internal::checked_cast;

namespace dataset {

class SimpleWriteNodeTest : public ::testing::Test {
@@ -170,5 +176,83 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
}

TEST_F(SimpleWriteNodeTest, SequenceOutput) {
// Test for GH-26818
auto format = std::make_shared<IpcFileFormat>();
constexpr int kRowsPerBatch = 16;
constexpr int kNumBatches = 32;
constexpr random::SeedType kSeed = 42;
constexpr int kJitterMod = 4;
acero::RegisterTestNodes();

// Create an input table
std::shared_ptr<Table> table =
gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches);
auto dataset = std::make_shared<InMemoryDataset>(table);
auto scan_options = std::make_shared<ScanOptions>();
scan_options->use_threads = true;

for (bool preserve_order : {true, false}) {
auto scanner_builder = std::make_shared<ScannerBuilder>(dataset, scan_options);
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
auto exprs = scan_options->projection.call()->arguments;
auto names = checked_cast<const compute::MakeStructOptions*>(
scan_options->projection.call()->options.get())
->field_names;

auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
dataset::WriteNodeOptions write_options(fs_write_options_);
write_options.write_options.file_write_options = format->DefaultWriteOptions();
write_options.write_options.base_dir = "root";
write_options.write_options.partitioning =
std::make_shared<HivePartitioning>(schema({}));
write_options.write_options.basename_template = "{i}.feather";
write_options.write_options.filesystem = fs;
write_options.write_options.preserve_order = preserve_order;

// exercise the plan built by FileSystemDataset::Write, with a jitter node that
// guarantees exec batches arrive out of order
acero::Declaration plan = acero::Declaration::Sequence({
{"scan",
ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false,
/*implicit_ordering=*/true}},
{"filter", acero::FilterNodeOptions{scanner->options()->filter}},
{"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
{"write", write_options},
});

ASSERT_OK(DeclarationToStatus(plan));

// Read the file back out and verify the order
ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
fs, {"root/0.feather"}, format, {}));
ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
ASSERT_OK(scanner_builder->UseThreads(false));
ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
TableBatchReader reader(*actual);
std::shared_ptr<RecordBatch> batch;
ABORT_NOT_OK(reader.ReadNext(&batch));
int32_t prev = -1;
auto out_of_order = false;
while (batch != nullptr) {
for (int row = 0; row < batch->num_rows(); ++row) {
auto scalar = batch->column(0)->GetScalar(row).ValueOrDie();
auto numeric_scalar =
std::static_pointer_cast<arrow::NumericScalar<arrow::Int32Type>>(scalar);
int32_t value = numeric_scalar->value;
if (value <= prev) {
out_of_order = true;
}
prev = value;
}
ABORT_NOT_OK(reader.ReadNext(&batch));
}
ASSERT_EQ(out_of_order, !preserve_order);
}
}

} // namespace dataset
} // namespace arrow
2 changes: 2 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -4007,6 +4007,7 @@ def _filesystemdataset_write(
str basename_template not None,
FileSystem filesystem not None,
Partitioning partitioning not None,
bool preserve_order,
FileWriteOptions file_options not None,
int max_partitions,
object file_visitor,
@@ -4029,6 +4030,7 @@ def _filesystemdataset_write(
c_options.filesystem = filesystem.unwrap()
c_options.base_dir = tobytes(_stringify_path(base_dir))
c_options.partitioning = partitioning.unwrap()
c_options.preserve_order = preserve_order
c_options.max_partitions = max_partitions
c_options.max_open_files = max_open_files
c_options.max_rows_per_file = max_rows_per_file
17 changes: 11 additions & 6 deletions python/pyarrow/dataset.py
@@ -848,9 +848,9 @@ def _ensure_write_partitioning(part, schema, flavor):


def write_dataset(data, base_dir, *, basename_template=None, format=None,
partitioning=None, partitioning_flavor=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
max_partitions=None, max_open_files=None,
partitioning=None, partitioning_flavor=None,
schema=None, filesystem=None, file_options=None, use_threads=True,
preserve_order=False, max_partitions=None, max_open_files=None,
max_rows_per_file=None, min_rows_per_group=None,
max_rows_per_group=None, file_visitor=None,
existing_data_behavior='error', create_dir=True):
@@ -893,7 +893,12 @@ def write_dataset(data, base_dir, *, basename_template=None, format=None,
``FileFormat.make_write_options()`` function.
use_threads : bool, default True
Write files in parallel. If enabled, then maximum parallelism will be
used determined by the number of available CPU cores.
used determined by the number of available CPU cores. Using multiple
threads may change the order of rows in the written dataset.
preserve_order : bool, default False
Preserve the order of rows. If enabled, the order of rows in the dataset is
guaranteed to be preserved even if use_threads is enabled. This may cause
notable performance degradation.
max_partitions : int, default 1024
Maximum number of partitions any batch may be written into.
max_open_files : int, default 1024
@@ -1029,7 +1034,7 @@ def file_visitor(written_file):

_filesystemdataset_write(
scanner, base_dir, basename_template, filesystem, partitioning,
file_options, max_partitions, file_visitor, existing_data_behavior,
max_open_files, max_rows_per_file,
preserve_order, file_options, max_partitions, file_visitor,
existing_data_behavior, max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group, create_dir
)
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -219,6 +219,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CFileSystem] filesystem
c_string base_dir
shared_ptr[CPartitioning] partitioning
c_bool preserve_order
int max_partitions
c_string basename_template
function[cb_writer_finish_internal] writer_pre_finish
