diff --git a/cpp/proto/substrait/extension_rels.proto b/cpp/proto/substrait/extension_rels.proto
index 48c5e3a345249..df4d29a0a3836 100644
--- a/cpp/proto/substrait/extension_rels.proto
+++ b/cpp/proto/substrait/extension_rels.proto
@@ -35,8 +35,10 @@ message AsOfJoinRel {
   // As-Of-Join key
   message AsOfJoinKey {
     // A field reference defining the on-key
+    // The type and units of the referenced field must be the same across all inputs
     .substrait.Expression on = 1;
-    // A set of field references defining the by-key
+    // A list of field references defining the by-key
+    // The types corresponding to the referenced fields must be the same across all inputs
    repeated .substrait.Expression by = 2;
  }
}
diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc b/cpp/src/arrow/compute/exec/asof_join_node.cc
index 39ed248840b10..e262a254a2e27 100644
--- a/cpp/src/arrow/compute/exec/asof_join_node.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -86,6 +86,10 @@ class ConcurrentQueue {
   T Pop() {
     std::unique_lock<std::mutex> lock(mutex_);
     cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
     auto item = queue_.front();
     queue_.pop();
     return item;
@@ -93,18 +97,28 @@ class ConcurrentQueue {

   void Push(const T& item) {
     std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
     queue_.push(item);
     cond_.notify_one();
   }

   void Clear() {
     std::unique_lock<std::mutex> lock(mutex_);
-    queue_ = std::queue<T>();
+    ClearUnlocked();
   }

+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
   std::optional<T> TryPop() {
-    // Try to pop the oldest value from the queue (or return nullopt if none)
     std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
     if (queue_.empty()) {
       return std::nullopt;
     } else {
@@ -127,6 +141,9 @@ class ConcurrentQueue {

   size_t UnsyncSize() const { return queue_.size(); }

+ protected:
+  std::mutex& GetMutex() { return mutex_; }
+
  private:
   std::queue<T> queue_;
   mutable std::mutex mutex_;
@@ -242,8 +259,8 @@ class BackpressureController : public BackpressureControl {
                          std::atomic<int32_t>& backpressure_counter)
       : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}

-  void Pause() override { node_->PauseProducing(output_, backpressure_counter_++); }
-  void Resume() override { node_->ResumeProducing(output_, backpressure_counter_++); }
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }

  private:
  ExecNode* node_;
@@ -310,23 +327,27 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
       : handler_(std::move(handler)) {}

   T Pop() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
     DoHandle do_handle(*this);
-    return ConcurrentQueue<T>::Pop();
+    return ConcurrentQueue<T>::PopUnlocked();
   }

   void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
     DoHandle do_handle(*this);
-    ConcurrentQueue<T>::Push(item);
+    ConcurrentQueue<T>::PushUnlocked(item);
   }

   void Clear() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
     DoHandle do_handle(*this);
-    ConcurrentQueue<T>::Clear();
+    ConcurrentQueue<T>::ClearUnlocked();
   }

   std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
     DoHandle do_handle(*this);
-    return ConcurrentQueue<T>::TryPop();
+    return ConcurrentQueue<T>::TryPopUnlocked();
  }

  private:
@@ -914,10 +935,29 @@ class AsofJoinNode : public ExecNode {
     }
   }

-  void Process() {
+  template <typename Callable>
+  struct Defer {
+    Callable callable;
+    explicit Defer(Callable callable) : callable(std::move(callable)) {}
+    ~Defer() noexcept { callable(); }
+  };
+
+  bool CheckEnded() {
+    if (state_.at(0)->Finished()) {
+      ErrorIfNotOk(plan_->ScheduleTask([this] {
+        Defer cleanup([this]() { finished_.MarkFinished(); });
+        outputs_[0]->InputFinished(this, batches_produced_);
+        return Status::OK();
+      }));
+      return false;
+    }
+    return true;
+  }
+
+  bool Process() {
     std::lock_guard<std::mutex> guard(gate_);
-    if (finished_.is_finished()) {
-      return;
+    if (!CheckEnded()) {
+      return false;
     }

     // Process batches while we have data
@@ -935,7 +975,7 @@ class AsofJoinNode : public ExecNode {
         }));
       } else {
         ErrorIfNotOk(result.status());
-        return;
+        return false;
       }
     }

@@ -944,13 +984,13 @@ class AsofJoinNode : public ExecNode {
     //
     // It may happen here in cases where InputFinished was called before we were finished
     // producing results (so we didn't know the output size at that time)
-    if (state_.at(0)->Finished()) {
-      ErrorIfNotOk(plan_->ScheduleTask([this] {
-        outputs_[0]->InputFinished(this, batches_produced_);
-        return Status::OK();
-      }));
-      finished_.MarkFinished();
+    if (!CheckEnded()) {
+      return false;
     }
+
+    // There is no more we can do now but there is still work remaining for later when
+    // more data arrives.
+    return true;
   }

   void ProcessThread() {
@@ -958,7 +998,9 @@ class AsofJoinNode : public ExecNode {
       if (!process_.Pop()) {
         return;
       }
-      Process();
+      if (!Process()) {
+        return;
+      }
     }
   }

@@ -1165,7 +1207,6 @@ class AsofJoinNode : public ExecNode {
     size_t n_by = 0;
     for (size_t i = 0; i < input_keys.size(); ++i) {
       const auto& by_key = input_keys[i].by_key;
-      std::cout << "Input: " << i << " by_key.size()=" << by_key.size() << std::endl;
       if (i == 0) {
         n_by = by_key.size();
       } else if (n_by != by_key.size()) {
diff --git a/cpp/src/arrow/compute/exec/asof_join_node_test.cc b/cpp/src/arrow/compute/exec/asof_join_node_test.cc
index 36ae480012b84..66ed873620d59 100644
--- a/cpp/src/arrow/compute/exec/asof_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_node_test.cc
@@ -1030,7 +1030,9 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
       schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())}));
 })

-TEST(AsofJoinTest, BackpressureDemo) {
+template <typename BatchesMaker>
+void TestBackpressureDemo(BatchesMaker maker, int num_batches, int batch_size,
+                          double fast_delay, double slow_delay, bool noisy = false) {
   auto l_schema =
       schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
   auto r0_schema =
@@ -1038,34 +1040,76 @@ TEST(AsofJoinTest, BackpressureDemo) {
   auto r1_schema =
       schema({field("time", int32()), field("key", int32()), field("r1_value", int32())});

-  auto make_integer_batches = [](const std::shared_ptr<Schema>& schema, int shift) {
-    constexpr int num_batches = 10, batch_size = 1;
-    return MakeIntegerBatches({[](int row) -> int64_t { return row; },
-                               [](int row) -> int64_t { return row / num_batches; },
-                               [shift](int row) -> int64_t { return row * 10 + shift; }},
-                              schema, num_batches, batch_size);
+  auto make_shift = [&maker, num_batches, batch_size](
+                        const std::shared_ptr<Schema>& schema, int shift) {
+    return maker({[](int row) -> int64_t { return row; },
+                  [num_batches](int row) -> int64_t { return row / num_batches; },
+                  [shift](int row) -> int64_t { return row * 10 + shift; }},
+                 schema, num_batches, batch_size);
   };
-  ASSERT_OK_AND_ASSIGN(auto l_batches, make_integer_batches(l_schema, 0));
-  ASSERT_OK_AND_ASSIGN(auto r0_batches, make_integer_batches(r0_schema, 1));
-  ASSERT_OK_AND_ASSIGN(auto r1_batches, make_integer_batches(r1_schema, 2));
+  ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
+  ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
+  ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));

   compute::Declaration l_src = {
-      "source", SourceNodeOptions(l_batches.schema,
-                                  MakeNoisyDelayedGen(l_batches, "0:fast", 0.01))};
+      "source", SourceNodeOptions(
+                    l_schema, MakeDelayedGen(l_batches, "0:fast", fast_delay, noisy))};
   compute::Declaration r0_src = {
-      "source", SourceNodeOptions(r0_batches.schema,
-                                  MakeNoisyDelayedGen(r0_batches, "1:slow", 0.1))};
+      "source", SourceNodeOptions(
+                    r0_schema, MakeDelayedGen(r0_batches, "1:slow", slow_delay, noisy))};
   compute::Declaration r1_src = {
-      "source", SourceNodeOptions(r1_batches.schema,
-                                  MakeNoisyDelayedGen(r1_batches, "2:fast", 0.1))};
+      "source", SourceNodeOptions(
+                    r1_schema, MakeDelayedGen(r1_batches, "2:fast", fast_delay, noisy))};

   compute::Declaration asofjoin = {
       "asofjoin", {l_src, r0_src, r1_src}, GetRepeatedOptions(3, "time", {"key"}, 1000)};

-  ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<RecordBatch>> batches,
-                       DeclarationToBatches(asofjoin));
+  ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatchReader> batch_reader,
+                       DeclarationToReader(asofjoin, /*use_threads=*/false));

-  ASSERT_EQ(l_batches.batches.size(), batches.size());
+  int64_t total_length = 0;
+  for (;;) {
+    ASSERT_OK_AND_ASSIGN(auto batch, batch_reader->Next());
+    if (!batch) {
+      break;
+    }
+    total_length += batch->num_rows();
+  }
+  ASSERT_EQ(static_cast<int64_t>(num_batches * batch_size), total_length);
+}
+
+TEST(AsofJoinTest, BackpressureDemoWithBatches) {
+  return TestBackpressureDemo(MakeIntegerBatches, /*num_batches=*/10, /*batch_size=*/1,
+                              /*fast_delay=*/0.01, /*slow_delay=*/0.1, /*noisy=*/true);
+}
+
+namespace {
+
+Result<AsyncGenerator<std::optional<ExecBatch>>> MakeIntegerBatchGenForTest(
+    const std::vector<std::function<int64_t(int)>>& gens,
+    const std::shared_ptr<Schema>& schema, int num_batches, int batch_size) {
+  return MakeIntegerBatchGen(gens, schema, num_batches, batch_size);
+}
+
+template <typename T>
+T GetEnvValue(const std::string& var, T default_value) {
+  const char* str = std::getenv(var.c_str());
+  if (str == NULLPTR) {
+    return default_value;
+  }
+  std::stringstream s(str);
+  T value = default_value;
+  s >> value;
+  return value;
+}
+
+}  // namespace
+
+TEST(AsofJoinTest, BackpressureDemoWithBatchesGen) {
+  int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 10);
+  int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
+  return TestBackpressureDemo(MakeIntegerBatchGenForTest, num_batches, batch_size,
+                              /*fast_delay=*/0.001, /*slow_delay=*/0.01);
+}

 }  // namespace compute
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 02036c493b368..6de09e89f970b 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -1495,13 +1495,15 @@ TEST(ExecPlanExecution, BackpressureDemo) {
   BatchesWithSchema two = MakeRandomBatches(schema_two, /*num_batches=*/100);
   BatchesWithSchema three = MakeRandomBatches(schema_three, /*num_batches=*/100);

+  constexpr bool noisy = true;
   compute::Declaration src_one = {
-      "source", SourceNodeOptions(one.schema, MakeNoisyDelayedGen(one, "0:fast", 0.01))};
+      "source",
+      SourceNodeOptions(one.schema, MakeDelayedGen(one, "0:fast", 0.01, noisy))};
   compute::Declaration src_two = {
-      "source", SourceNodeOptions(two.schema, MakeNoisyDelayedGen(two, "1:slow", 0.1))};
+      "source", SourceNodeOptions(two.schema, MakeDelayedGen(two, "1:slow", 0.1, noisy))};
   compute::Declaration src_three = {
       "source",
-      SourceNodeOptions(three.schema, MakeNoisyDelayedGen(three, "2:fast", 0.01))};
+      SourceNodeOptions(three.schema, MakeDelayedGen(three, "2:fast", 0.01, noisy))};

   compute::Declaration concat = {
       "concat", {src_one, src_two, src_three}, ConcatNodeOptions()};
diff --git a/cpp/src/arrow/compute/exec/test_nodes.cc b/cpp/src/arrow/compute/exec/test_nodes.cc
index 74d55cf298ea8..2ff411500cca3 100644
--- a/cpp/src/arrow/compute/exec/test_nodes.cc
+++ b/cpp/src/arrow/compute/exec/test_nodes.cc
@@ -28,6 +28,7 @@
 #include "arrow/compute/exec/util.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/iterator.h"

 namespace arrow {

@@ -269,38 +270,60 @@ void RegisterConcatNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("concat", ConcatNode::Make));
 }

-// Make a source that is both noisy (prints when it emits)
-// and slowed by some delay
-AsyncGenerator<std::optional<ExecBatch>> MakeNoisyDelayedGen(BatchesWithSchema src,
-                                                             std::string label,
-                                                             double delay_sec) {
-  std::vector<std::optional<ExecBatch>> opt_batches = ::arrow::internal::MapVector(
-      [](ExecBatch batch) { return std::make_optional(std::move(batch)); }, src.batches);
+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
+    Iterator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
+    bool noisy) {
   struct DelayedIoGenState {
-    DelayedIoGenState(std::vector<std::optional<ExecBatch>> batches, double delay_sec,
-                      std::string label)
-        : batches(std::move(batches)), delay_sec(delay_sec), label(std::move(label)) {}
+    DelayedIoGenState(Iterator<std::optional<ExecBatch>> batch_it, double delay_sec,
+                      std::string label, bool noisy)
+        : batch_it(std::move(batch_it)),
+          delay_sec(delay_sec),
+          label(std::move(label)),
+          noisy(noisy) {}
     std::optional<ExecBatch> Next() {
-      if (index == batches.size()) {
+      Result<std::optional<ExecBatch>> opt_batch_res = batch_it.Next();
+      if (!opt_batch_res.ok()) {
+        return std::nullopt;
+      }
+      std::optional<ExecBatch> opt_batch = opt_batch_res.ValueOrDie();
+      if (!opt_batch) {
         return std::nullopt;
       }
-      std::cout << label + ": asking for batch(" + std::to_string(index) + ")\n";
+      if (noisy) {
+        std::cout << label + ": asking for batch(" + std::to_string(index) + ")\n";
+      }
       SleepFor(delay_sec);
-      return batches[index++];
+      ++index;
+      return *opt_batch;
     }

-    std::vector<std::optional<ExecBatch>> batches;
+    Iterator<std::optional<ExecBatch>> batch_it;
     double delay_sec;
     std::string label;
+    bool noisy;
     std::size_t index = 0;
   };
-  auto state = std::make_shared<DelayedIoGenState>(std::move(opt_batches), delay_sec,
-                                                   std::move(label));
+  auto state = std::make_shared<DelayedIoGenState>(std::move(src), delay_sec,
+                                                   std::move(label), noisy);
   return [state]() {
     return DeferNotOk(::arrow::io::default_io_context().executor()->Submit(
         [state]() { return state->Next(); }));
   };
 }

+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
+    AsyncGenerator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
+    bool noisy) {
+  return MakeDelayedGen(MakeGeneratorIterator(src), label, delay_sec, noisy);
+}
+
+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(BatchesWithSchema src,
+                                                        std::string label,
+                                                        double delay_sec, bool noisy) {
+  std::vector<std::optional<ExecBatch>> opt_batches = ::arrow::internal::MapVector(
+      [](ExecBatch batch) { return std::make_optional(std::move(batch)); }, src.batches);
+  return MakeDelayedGen(MakeVectorIterator(opt_batches), label, delay_sec, noisy);
+}
+
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/test_nodes.h b/cpp/src/arrow/compute/exec/test_nodes.h
index 6665db3e84561..a117df0c46005 100644
--- a/cpp/src/arrow/compute/exec/test_nodes.h
+++ b/cpp/src/arrow/compute/exec/test_nodes.h
@@ -32,9 +32,21 @@ struct ConcatNodeOptions : public ExecNodeOptions {

 void RegisterConcatNode(ExecFactoryRegistry* registry);

-AsyncGenerator<std::optional<ExecBatch>> MakeNoisyDelayedGen(BatchesWithSchema src,
-                                                             std::string label,
-                                                             double delay_sec);
+// \brief Make a delaying source that is optionally noisy (prints when it emits)
+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
+    Iterator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
+    bool noisy = false);
+
+// \brief Make a delaying source that is optionally noisy (prints when it emits)
+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(
+    AsyncGenerator<std::optional<ExecBatch>> src, std::string label, double delay_sec,
+    bool noisy = false);
+
+// \brief Make a delaying source that is optionally noisy (prints when it emits)
+AsyncGenerator<std::optional<ExecBatch>> MakeDelayedGen(BatchesWithSchema src,
+                                                        std::string label,
+                                                        double delay_sec,
+                                                        bool noisy = false);

 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 79b83a2ce17d5..d388b3d33f723 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -38,6 +38,7 @@
 #include "arrow/compute/exec/util.h"
 #include "arrow/compute/function_internal.h"
 #include "arrow/datum.h"
+#include "arrow/io/interfaces.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/testing/builder.h"
@@ -194,6 +195,91 @@ Future<std::vector<ExecBatch>> StartAndCollect(
   });
 }

+namespace {
+
+Result<ExecBatch> MakeIntegerBatch(const std::vector<std::function<int64_t(int)>>& gens,
+                                   const std::shared_ptr<Schema>& schema,
+                                   int batch_start_row, int batch_size) {
+  int n_fields = schema->num_fields();
+  if (gens.size() != static_cast<size_t>(n_fields)) {
+    return Status::Invalid("mismatching generator-vector and schema size");
+  }
+  auto memory_pool = default_memory_pool();
+  std::vector<Datum> values(n_fields);
+  for (int f = 0; f < n_fields; f++) {
+    std::shared_ptr<Array> array;
+    auto type = schema->field(f)->type();
+
+#define ARROW_TEST_INT_BUILD_CASE(id)                                         \
+  case Type::id: {                                                            \
+    using T = typename TypeIdTraits<Type::id>::Type;                          \
+    using CType = typename TypeTraits<T>::CType;                              \
+    using Builder = typename TypeTraits<T>::BuilderType;                      \
+    ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool));    \
+    Builder& builder = *checked_cast<Builder*>(a_builder.get());              \
+    ARROW_RETURN_NOT_OK(builder.Reserve(batch_size));                         \
+    for (int j = 0; j < batch_size; j++) {                                    \
+      builder.UnsafeAppend(static_cast<CType>(gens[f](batch_start_row + j))); \
+    }                                                                         \
+    ARROW_RETURN_NOT_OK(builder.Finish(&array));                              \
+    break;                                                                    \
+  }
+
+    switch (type->id()) {
+      ARROW_TEST_INT_BUILD_CASE(INT8)
+      ARROW_TEST_INT_BUILD_CASE(INT16)
+      ARROW_TEST_INT_BUILD_CASE(INT32)
+      ARROW_TEST_INT_BUILD_CASE(INT64)
+      default:
+        return Status::TypeError("building ", type->ToString());
+    }
+
+#undef ARROW_TEST_INT_BUILD_CASE
+
+    values[f] = Datum(array);
+  }
+  return ExecBatch(std::move(values), batch_size);
+}
+
+}  // namespace
+
+AsyncGenerator<std::optional<ExecBatch>> MakeIntegerBatchGen(
+    const std::vector<std::function<int64_t(int)>>& gens,
+    const std::shared_ptr<Schema>& schema, int num_batches, int batch_size) {
+  struct IntegerBatchGenState {
+    IntegerBatchGenState(const std::vector<std::function<int64_t(int)>>& gens,
+                         const std::shared_ptr<Schema>& schema, int num_batches,
+                         int batch_size)
+        : gens(gens), schema(schema), num_batches(num_batches), batch_size(batch_size) {}
+
+    std::optional<ExecBatch> Next() {
+      if (batch_index >= num_batches) {
+        return std::nullopt;
+      }
+      Result<ExecBatch> batch_res = MakeIntegerBatch(gens, schema, batch_row, batch_size);
+      if (!batch_res.ok()) {
+        return std::nullopt;
+      }
+      ++batch_index;
+      batch_row += batch_size;
+      return batch_res.ValueOrDie();
+    }
+
+    std::vector<std::function<int64_t(int)>> gens;
+    std::shared_ptr<Schema> schema;
+    int num_batches;
+    int batch_size;
+    int batch_index = 0;
+    int batch_row = 0;
+  };
+  auto state =
+      std::make_shared<IntegerBatchGenState>(gens, schema, num_batches, batch_size);
+  return [state]() {
+    return DeferNotOk(::arrow::io::default_io_context().executor()->Submit(
+        [state]() { return state->Next(); }));
+  };
+}
+
 BatchesWithSchema MakeBasicBatches() {
   BatchesWithSchema out;
   out.batches = {
@@ -237,49 +323,12 @@ BatchesWithSchema MakeRandomBatches(const std::shared_ptr<Schema>& schema,

 Result<BatchesWithSchema> MakeIntegerBatches(
     const std::vector<std::function<int64_t(int)>>& gens,
     const std::shared_ptr<Schema>& schema, int num_batches, int batch_size) {
-  int n_fields = schema->num_fields();
-  if (gens.size() != static_cast<size_t>(n_fields)) {
-    return Status::Invalid("mismatching generator-vector and schema size");
-  }
-  auto memory_pool = default_memory_pool();
   BatchesWithSchema out;
   out.schema = schema;
   int row = 0;
   for (int i = 0; i < num_batches; i++) {
-    std::vector<Datum> values(n_fields);
-    for (int f = 0; f < n_fields; f++) {
-      std::shared_ptr<Array> array;
-      auto type = schema->field(f)->type();
-
-#define ARROW_TEST_INT_BUILD_CASE(id)                                      \
-  case Type::id: {                                                         \
-    using T = typename TypeIdTraits<Type::id>::Type;                       \
-    using CType = typename TypeTraits<T>::CType;                           \
-    using Builder = typename TypeTraits<T>::BuilderType;                   \
-    ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool)); \
-    Builder& builder = *checked_cast<Builder*>(a_builder.get());           \
-    ARROW_RETURN_NOT_OK(builder.Reserve(batch_size));                      \
-    for (int j = 0; j < batch_size; j++) {                                 \
-      builder.UnsafeAppend(static_cast<CType>(gens[f](row + j)));          \
-    }                                                                      \
-    ARROW_RETURN_NOT_OK(builder.Finish(&array));                           \
-    break;                                                                 \
-  }
-
-      switch (type->id()) {
-        ARROW_TEST_INT_BUILD_CASE(INT8)
-        ARROW_TEST_INT_BUILD_CASE(INT16)
-        ARROW_TEST_INT_BUILD_CASE(INT32)
-        ARROW_TEST_INT_BUILD_CASE(INT64)
-        default:
-          return Status::TypeError("building ", type->ToString());
-      }
-
-#undef ARROW_TEST_INT_BUILD_CASE
-
-      values[f] = Datum(array);
-    }
-    out.batches.push_back(ExecBatch(std::move(values), batch_size));
+    ARROW_ASSIGN_OR_RAISE(auto batch, MakeIntegerBatch(gens, schema, row, batch_size));
+    out.batches.push_back(std::move(batch));
     row += batch_size;
   }
   return out;
@@ -333,6 +382,15 @@ void AssertTablesEqual(const std::shared_ptr<Table>& exp,
   }
 }

+// TODO(weston) This is just an alias at the moment for AssertTablesEqual
+// I think we should rename AssertTablesEqual to this name as it's been a year and I never
+// realized this function existed because I always just assumed we were referencing
+// ::arrow::AssertTablesEqual. Not critical now. FIXME before merging to master
+void AssertTablesEqualUnordered(const std::shared_ptr<Table>& exp,
+                                const std::shared_ptr<Table>& act) {
+  return AssertTablesEqual(exp, act);
+}
+
 void AssertExecBatchesEqual(const std::shared_ptr<Schema>& schema,
                             const std::vector<ExecBatch>& exp,
                             const std::vector<ExecBatch>& act) {
diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h
index 639708098c20e..2b83ffb3492bf 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -99,6 +99,10 @@ Future<std::vector<ExecBatch>> StartAndCollect(
     ExecPlan* plan, AsyncGenerator<std::optional<ExecBatch>> gen,
     bool use_threads = true);

+AsyncGenerator<std::optional<ExecBatch>> MakeIntegerBatchGen(
+    const std::vector<std::function<int64_t(int)>>& gens,
+    const std::shared_ptr<Schema>& schema, int num_batches, int batch_size);
+
 ARROW_TESTING_EXPORT
 BatchesWithSchema MakeBasicBatches();

@@ -126,6 +130,10 @@ ARROW_TESTING_EXPORT
 void AssertTablesEqual(const std::shared_ptr<Table>& exp,
                        const std::shared_ptr<Table>& act);

+ARROW_TESTING_EXPORT
+void AssertTablesEqualUnordered(const std::shared_ptr<Table>& exp,
+                                const std::shared_ptr<Table>& act);
+
 ARROW_TESTING_EXPORT
 void AssertExecBatchesEqual(const std::shared_ptr<Schema>& schema,
                             const std::vector<ExecBatch>& exp,
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index dcc13d5a20772..a5db342b2f650 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -23,6 +23,7 @@
 #include "arrow/compute/exec/asof_join_node.h"
 #include "arrow/compute/exec/exec_plan.h"
 #include "arrow/compute/exec/expression_internal.h"
+#include "arrow/compute/exec/test_util.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/file_ipc.h"
 #include "arrow/dataset/file_parquet.h"
@@ -2252,7 +2253,7 @@ TEST(SubstraitRoundTrip, BasicPlanEndToEnd) {
   }
   ASSERT_OK_AND_ASSIGN(auto rnd_trp_table,
                        GetTableFromPlan(roundtripped_filter, exec_context, dummy_schema));
-  EXPECT_TRUE(expected_table->Equals(*rnd_trp_table));
+  compute::AssertTablesEqualUnordered(expected_table, rnd_trp_table);
 }

 NamedTableProvider ProvideMadeTable(