Skip to content

Commit

Permalink
ARROW-17762: Add ordering information to exec nodes and add an index …
Browse files Browse the repository at this point in the history
…to exec batches
  • Loading branch information
westonpace committed Sep 19, 2022
1 parent 5f65bce commit 467f26f
Show file tree
Hide file tree
Showing 23 changed files with 285 additions and 38 deletions.
1 change: 1 addition & 0 deletions cpp/examples/arrow/compute_register_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ExampleNode : public cp::ExecNode {
/*output_schema=*/input->output_schema(), /*num_outputs=*/1) {}

const char* kind_name() const override { return "ExampleNode"; }
const std::vector<int>& ordering() override { return ExecNode::kNoOrdering; }

arrow::Status StartProducing() override {
outputs_[0]->InputFinished(this, 0);
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ExecBatch::ExecBatch(const RecordBatch& batch)
}

bool ExecBatch::Equals(const ExecBatch& other) const {
return guarantee == other.guarantee && values == other.values;
return index == other.index && guarantee == other.guarantee && values == other.values;
}

void PrintTo(const ExecBatch& batch, std::ostream* os) {
Expand All @@ -83,6 +83,9 @@ void PrintTo(const ExecBatch& batch, std::ostream* os) {
if (batch.guarantee != literal(true)) {
*os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
}
if (batch.index != ExecBatch::kNoOrdering) {
*os << indent << "Index: " << batch.index << "\n";
}

int i = 0;
for (const Datum& value : batch.values) {
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,26 @@ struct ARROW_EXPORT ExecBatch {
/// whether any values are Scalar.
int64_t length = 0;

/// Indicates a batch is not part of an ordered stream
static constexpr int32_t kNoOrdering = -1;
/// The index of the exec batch in an ordered stream of batches
///
/// Several operations can impose an ordering on their output. Because
/// batches travel through the execution graph at different speeds there
/// is no guarantee those batches will arrive in the same order they are
/// emitted.
///
/// If there is no ordering then the index should be kNoOrdering. If a node rearranges
/// rows within a batch it will destroy the ordering (e.g. a hash-join node) and should
/// set the index of output batches to kNoOrdering. Other nodes which leave
/// row-in-batch ordering alone should maintain the index on their output batches.
/// Nodes that impose an ordering (e.g. sort) should assign index appropriately.
///
/// An ordering must be monotonic and have no gaps. This can be somewhat tricky to
/// maintain. For example, when filtering, an implementation may need to emit empty
/// batches to maintain correct ordering.
int32_t index = kNoOrdering;

/// \brief The sum of bytes in each buffer referenced by the batch
///
/// Note: Scalars are not counted
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class ScalarAggregateNode : public ExecNode {

const char* kind_name() const override { return "ScalarAggregateNode"; }

// There is currently no meaningful ordering to the output of the scalar aggregate
// although in the future we may want to allow sorting here since we will have already
// gathered all the data
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

Status DoConsume(const ExecSpan& batch, size_t thread_index) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
Expand Down Expand Up @@ -360,6 +365,10 @@ class GroupByNode : public ExecNode {

const char* kind_name() const override { return "GroupByNode"; }

// There is currently no ordering assigned to the output although we may want
// to consider a future addition to allow ordering by grouping keys
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

Status Consume(ExecSpan batch) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ class AsofJoinNode : public ExecNode {
return indices_of_by_key_;
}

const std::vector<int>& ordering() override { return indices_of_on_key_; }

static Status is_valid_on_field(const std::shared_ptr<Field>& field) {
switch (field->type()->id()) {
case Type::INT8:
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ ExecNode::ExecNode(ExecPlan* plan, NodeVector inputs,
}
}

const std::vector<int32_t> ExecNode::kImplicitOrdering = {
ExecNode::kImplicitOrderingColumn};

const std::vector<int32_t> ExecNode::kNoOrdering = {};

Status ExecNode::Init() { return Status::OK(); }

Status ExecNode::Validate() const {
Expand Down Expand Up @@ -536,10 +541,12 @@ void MapNode::SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn,
}
auto task = [this, map_fn, batch]() {
auto guarantee = batch.guarantee;
auto index = batch.index;
auto output_batch = map_fn(std::move(batch));
if (ErrorIfNotOk(output_batch.status())) {
return output_batch.status();
}
output_batch->index = index;
output_batch->guarantee = guarantee;
outputs_[0]->InputReceived(this, output_batch.MoveValueUnsafe());
return Status::OK();
Expand All @@ -562,6 +569,8 @@ void MapNode::Finish(Status finish_st /*= Status::OK()*/) {
this->finished_.MarkFinished(finish_st);
}

const std::vector<int32_t>& MapNode::ordering() { return inputs_[0]->ordering(); }

std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
std::shared_ptr<Schema> schema, std::function<Future<std::optional<ExecBatch>>()> gen,
MemoryPool* pool) {
Expand Down
37 changes: 37 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,41 @@ class ARROW_EXPORT ExecNode {
/// \brief Stop producing definitively to all outputs
virtual void StopProducing() = 0;

static constexpr int32_t kImplicitOrderingColumn = -1;
static const std::vector<int32_t> kImplicitOrdering;
static const std::vector<int32_t> kNoOrdering;

/// \brief The ordering of the node
///
/// If a node has an ordering then output batches will be labeled with an `index`
/// which should determine the position of the batch in the stream according to
/// the ordering.
///
/// An empty vector indicates that there is no ordering for the node.
///
/// The ordering is a list of column indices which the data is sorted by. For
/// example, an ordering of {1, 0} would mean the data is first sorted by column
/// 1 and then by column 0.
///
/// Nodes which impose an ordering will typically determine their ordering from
/// node options and should return that here. Nodes which pass data through will
/// typically forward the ordering of their input. Nodes which rearrange data and
/// destroy any ordering should return an empty vector.
///
/// There is a special ordering case, represnted by the vector {kImplicitOrderingColumn}
/// which is used to represent the "implicit order" of the data. For example, if the
/// input to Acero is an in-memory table then the implicit ordering is the order of the
/// data according to the row number of the table (even though row number may not be a
/// column). If the input is a dataset then the "implicit order" is the ordering
/// (fragment_index,batch_index). Effectively, the implicit order can be any ordering
/// which is not represented by columns in the dataset.
///
/// The implict ordering cannot be used, for example, to implement an ordered streaming
/// group by or an ordered streaming join. However, it can be used for things like a
/// fetch node or to guaranteed the output of the plan is reassambled in the same input
/// order.
virtual const std::vector<int32_t>& ordering() = 0;

/// \brief A future which will be marked finished when this node has stopped producing.
virtual Future<> finished() { return finished_; }

Expand Down Expand Up @@ -396,6 +431,8 @@ class ARROW_EXPORT MapNode : public ExecNode {

void StopProducing() override;

const std::vector<int32_t>& ordering() override;

protected:
void SubmitTask(std::function<Result<ExecBatch>(ExecBatch)> map_fn, ExecBatch batch);

Expand Down
12 changes: 7 additions & 5 deletions cpp/src/arrow/compute/exec/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,14 @@ Result<Expression> Expression::Bind(const Schema& in_schema,
}

Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
Expression guarantee) {
int32_t index, Expression guarantee) {
ExecBatch out;

if (partial.kind() == Datum::RECORD_BATCH) {
const auto& partial_batch = *partial.record_batch();
out.guarantee = std::move(guarantee);
out.length = partial_batch.num_rows();
out.index = index;

ARROW_ASSIGN_OR_RAISE(auto known_field_values,
ExtractKnownFieldValues(out.guarantee));
Expand Down Expand Up @@ -511,14 +512,14 @@ Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
ARROW_ASSIGN_OR_RAISE(auto partial_batch,
RecordBatch::FromStructArray(partial.make_array()));

return MakeExecBatch(full_schema, partial_batch, std::move(guarantee));
return MakeExecBatch(full_schema, partial_batch, index, std::move(guarantee));
}

if (partial.is_scalar()) {
ARROW_ASSIGN_OR_RAISE(auto partial_array,
MakeArrayFromScalar(*partial.scalar(), 1));
ARROW_ASSIGN_OR_RAISE(
auto out, MakeExecBatch(full_schema, partial_array, std::move(guarantee)));
ARROW_ASSIGN_OR_RAISE(auto out, MakeExecBatch(full_schema, partial_array, index,
std::move(guarantee)));

for (Datum& value : out.values) {
if (value.is_scalar()) continue;
Expand All @@ -534,7 +535,8 @@ Result<ExecBatch> MakeExecBatch(const Schema& full_schema, const Datum& partial,
Result<Datum> ExecuteScalarExpression(const Expression& expr, const Schema& full_schema,
const Datum& partial_input,
compute::ExecContext* exec_context) {
ARROW_ASSIGN_OR_RAISE(auto input, MakeExecBatch(full_schema, partial_input));
ARROW_ASSIGN_OR_RAISE(
auto input, MakeExecBatch(full_schema, partial_input, ExecBatch::kNoOrdering));
return ExecuteScalarExpression(expr, input, exec_context);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ Result<Expression> SimplifyWithGuarantee(Expression,
/// RecordBatch which may have missing or incorrectly ordered columns.
/// Missing fields will be replaced with null scalars.
ARROW_EXPORT Result<ExecBatch> MakeExecBatch(const Schema& full_schema,
const Datum& partial,
const Datum& partial, int32_t index,
Expression guarantee = literal(true));

/// Execute a scalar expression against the provided state and input ExecBatch. This
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/exec/expression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ TEST(ExpressionUtils, StripOrderPreservingCasts) {
TEST(ExpressionUtils, MakeExecBatch) {
auto Expect = [](std::shared_ptr<RecordBatch> partial_batch) {
SCOPED_TRACE(partial_batch->ToString());
ASSERT_OK_AND_ASSIGN(auto batch, MakeExecBatch(*kBoringSchema, partial_batch));
ASSERT_OK_AND_ASSIGN(
auto batch, MakeExecBatch(*kBoringSchema, partial_batch, ExecBatch::kNoOrdering));

ASSERT_EQ(batch.num_values(), kBoringSchema->num_fields());
for (int i = 0; i < kBoringSchema->num_fields(); ++i) {
Expand Down Expand Up @@ -218,7 +219,8 @@ TEST(ExpressionUtils, MakeExecBatch) {

auto duplicated_names =
RecordBatch::Make(schema({GetField("i32"), GetField("i32")}), kNumRows, {i32, i32});
ASSERT_RAISES(Invalid, MakeExecBatch(*kBoringSchema, duplicated_names));
ASSERT_RAISES(Invalid,
MakeExecBatch(*kBoringSchema, duplicated_names, ExecBatch::kNoOrdering));
}

class WidgetifyOptions : public compute::FunctionOptions {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ class HashJoinNode : public ExecNode {
}

const char* kind_name() const override { return "HashJoinNode"; }
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

Status OnBuildSideBatch(size_t thread_index, ExecBatch batch) {
std::lock_guard<std::mutex> guard(build_side_mutex_);
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,20 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
static Result<std::shared_ptr<SourceNodeOptions>> FromTable(const Table& table,
arrow::internal::Executor*);

/// \brief The schema of the data generated by generator
std::shared_ptr<Schema> output_schema;
/// \brief A (potentially asynchronous) source of data
std::function<Future<std::optional<ExecBatch>>()> generator;
/// \brief The ordering of the data
///
/// This can be set if `generator` is guaranteed to generate data according to some kind
/// of ordering. The source node will make no attempt to verify this fact but will
/// assign batch indices as if the data is ordered in this way.
///
/// Data from a source node always has some kind of ordering. The default (an empty
/// vector) will actually assign the implicit ordering to outgoing data. \see
/// ExecNode::ordering for more details
std::vector<int32_t> asserted_ordering;
};

/// \brief An extended Source node which accepts a table
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <functional>
#include <memory>
#include <unordered_set>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
Expand Down Expand Up @@ -1481,5 +1482,44 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}
}

TEST(BatchOrder, FilterNode) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
std::shared_ptr<Schema> test_schema =
schema({field("a", int32()), field("b", boolean())});
auto random_data =
MakeRandomBatches(test_schema,
/*num_batches=*/5, /*batch_size=*/10, /*ordered=*/true);

// Insert a batch that starts empty
random_data.batches[1] = ExecBatchFromJSON({int32(), boolean()}, "[]");
random_data.batches[1].values.emplace_back(1);
random_data.batches[1].index = 1;

// Insert a batch that will be completely filtered
random_data.batches[2] = ExecBatchFromJSON({int32(), boolean()}, "[[-1, false]]");
random_data.batches[2].values.emplace_back(2);
random_data.batches[2].index = 2;

ASSERT_OK_AND_ASSIGN(
std::vector<ExecBatch> filtered_batches,
DeclarationToExecBatches(Declaration::Sequence(
{{"source",
SourceNodeOptions{random_data.schema, random_data.gen(/*parallel=*/true,
/*slow=*/true)}},
{"filter", FilterNodeOptions(greater(field_ref("a"), literal(0)))}})));

std::unordered_set<int32_t> filtered_batch_indices;
int32_t num_rows = 0;
for (const auto& batch : filtered_batches) {
filtered_batch_indices.insert(batch.index);
num_rows += batch.length;
if (batch.index == 2) {
// Sanity check that the filter is actually applied
ASSERT_EQ(0, batch.length);
}
}
ASSERT_EQ(std::unordered_set<int32_t>({0, 1, 2, 3, 4}), filtered_batch_indices);
}

} // namespace compute
} // namespace arrow
6 changes: 6 additions & 0 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ class SinkNode : public ExecNode {
}
[[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); }

// There is no output and so there is no ordering
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

void StopProducing() override {
EVENT(span_, "StopProducing");

Expand Down Expand Up @@ -319,6 +322,9 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
}
[[noreturn]] void StopProducing(ExecNode* output) override { NoOutputs(); }

// sink nodes have no output and no ordering
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

void Pause() override { inputs_[0]->PauseProducing(this, ++backpressure_counter_); }

void Resume() override { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); }
Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/compute/exec/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,25 @@ namespace {

struct SourceNode : ExecNode {
SourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
AsyncGenerator<std::optional<ExecBatch>> generator)
AsyncGenerator<std::optional<ExecBatch>> generator,
std::vector<int32_t> asserted_ordering)
: ExecNode(plan, {}, {}, std::move(output_schema),
/*num_outputs=*/1),
generator_(std::move(generator)) {}
generator_(std::move(generator)) {
if (asserted_ordering.size() > 0) {
ordering_ = std::move(asserted_ordering);
} else {
ordering_ = {ExecNode::kImplicitOrderingColumn};
}
}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "SourceNode"));
const auto& source_options = checked_cast<const SourceNodeOptions&>(options);
return plan->EmplaceNode<SourceNode>(plan, source_options.output_schema,
source_options.generator);
source_options.generator,
source_options.asserted_ordering);
}

const char* kind_name() const override { return "SourceNode"; }
Expand Down Expand Up @@ -214,6 +222,8 @@ struct SourceNode : ExecNode {
}
}

const std::vector<int>& ordering() override { return ordering_; }

private:
std::mutex mutex_;
int32_t backpressure_counter_{0};
Expand All @@ -222,11 +232,13 @@ struct SourceNode : ExecNode {
bool started_ = false;
int batch_count_{0};
AsyncGenerator<std::optional<ExecBatch>> generator_;
std::vector<int32_t> ordering_;
};

struct TableSourceNode : public SourceNode {
TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
: SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}
: SourceNode(plan, table->schema(), TableGenerator(*table, batch_size),
ExecNode::kImplicitOrdering) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
Loading

0 comments on commit 467f26f

Please sign in to comment.