Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-35176: [C++] Add support for disabling threading for emscripten #35672

Merged
merged 93 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 92 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
de0ed5f
Option to disable threading in arrow
joemarshall Apr 21, 2023
30fe815
Merge branch 'main' of github.com:apache/arrow into main
joemarshall Apr 21, 2023
1c62fc1
added threading disable to CI build script
joemarshall Apr 22, 2023
872d317
disable threading test in github actions
joemarshall Apr 22, 2023
4507c31
hopefully fixed thread library errors in cmake on CI
joemarshall Apr 22, 2023
91b9ce3
tying to get threading flag passed through correctly
joemarshall Apr 22, 2023
e9d912f
fix to docker launch command
joemarshall Apr 22, 2023
3c8421d
removed define that goes into config.h
joemarshall Apr 23, 2023
2bc5f98
Merge branch 'main' of github.com:apache/arrow into main
joemarshall Apr 23, 2023
a8e8e1a
added explicit include of config.h
joemarshall Apr 24, 2023
e0d2a12
Merge branch 'main' of github.com:apache/arrow into main
joemarshall Apr 24, 2023
1260f51
typo fix
joemarshall Apr 24, 2023
10b8b1a
removed debug out
joemarshall Apr 24, 2023
6712962
reverted thirdpartytoolchain
joemarshall Apr 24, 2023
4bf128e
link to pthreads even if disabled threading in core
joemarshall Apr 26, 2023
399d5eb
fix cast warning
joemarshall Apr 26, 2023
6555d02
test fixes
joemarshall Apr 26, 2023
d941d12
replaced missing lock
joemarshall Apr 26, 2023
a78d887
fixed missing method in threadpool with threads disabled
joemarshall Apr 26, 2023
9a429eb
Merge branch 'main' of github.com:apache/arrow into main
joemarshall Apr 30, 2023
6da6814
put lock in correct place
joemarshall Apr 30, 2023
8496ead
removed override
joemarshall Apr 30, 2023
9acbf79
made future::wait check other tasks
joemarshall May 1, 2023
3821207
made task group run execution tasks while waiting to finish
joemarshall May 1, 2023
5946cc7
track current executor without threads
joemarshall May 1, 2023
3c5a78c
forgot namespace - fixed
joemarshall May 1, 2023
5380141
fixes to futures for more reliable behaviour
joemarshall May 1, 2023
f0f0565
future test fixes
joemarshall May 1, 2023
313489b
fix error in threadpool.h
joemarshall May 1, 2023
ce25af7
made tasks only launch when waited for
joemarshall May 2, 2023
9e3598d
made runloop quit correctly on pause
joemarshall May 2, 2023
a500fbc
test fixes
joemarshall May 3, 2023
5b2f752
test fixes
joemarshall May 5, 2023
f7b8de2
async generator test fixes
joemarshall May 6, 2023
19c9a9a
test fixes
joemarshall May 6, 2023
ee62ebc
removed asof join benchmark in threads disabled mode
joemarshall May 6, 2023
c50f415
more fixes to makefiles
joemarshall May 6, 2023
28cc947
Merge branch 'apache:main' into main
joemarshall May 6, 2023
d4ec8c8
test fixes for single threading being slower
joemarshall May 7, 2023
e66dffe
Merge branch 'main' of github.com:joemarshall/arrow into main
joemarshall May 7, 2023
85a4b5a
Make no threads test work on Ubuntu-cpp image
joemarshall May 7, 2023
1668254
update cpp_test.sh
joemarshall May 7, 2023
ffa1bdb
Increase timeout for testing
joemarshall May 7, 2023
bc26756
changed to enable_threding
joemarshall May 9, 2023
43cdde4
Merge branch 'main' of github.com:joemarshall/arrow into main
joemarshall May 9, 2023
ef4bf2e
define threading option in cmake file
joemarshall May 9, 2023
8fdacfe
disable = enable
joemarshall May 9, 2023
406d1f4
line too long fixed
joemarshall May 9, 2023
e57b313
Delete cmdline-working.txt
joemarshall May 10, 2023
b46ef4f
Update cpp/src/arrow/acero/bloom_filter_test.cc
joemarshall May 10, 2023
8706022
review fixes
joemarshall May 10, 2023
6d3a7d3
Merge branch 'main' of github.com:joemarshall/arrow into main
joemarshall May 10, 2023
84a8485
reverted github workflow
joemarshall May 10, 2023
ddf0cc4
Merge branch 'apache:main' into disable-threading
joemarshall May 23, 2023
3557514
lint fixes
joemarshall May 24, 2023
17c6b18
flipped #ifndef #else
joemarshall May 24, 2023
d644b96
lint fixes
joemarshall May 24, 2023
d819478
made sure everything that uses ENABLE_THREADING includes util/config.h
joemarshall May 24, 2023
f17eb67
cat typing
joemarshall May 24, 2023
4cba874
restored ThirdpartyToolchain correctly
joemarshall May 24, 2023
b8ceb23
removed blankspace lint change
joemarshall May 25, 2023
d2b9a30
reverted whitespace changes
joemarshall May 25, 2023
c93b788
fixed alphabetical order of includes
joemarshall May 25, 2023
bd96259
cmake lint fixes
joemarshall May 31, 2023
e32299c
Merge remote-tracking branch 'upstream/main' into disable-threading
joemarshall Jun 1, 2023
4c4f43b
Update cpp/cmake_modules/DefineOptions.cmake
joemarshall Jun 2, 2023
8f9305f
Update cpp/src/arrow/dataset/dataset_writer_test.cc
joemarshall Jun 2, 2023
8e101be
review fixes
joemarshall Jun 2, 2023
c64497f
comment ordering (from review)
joemarshall Jun 2, 2023
162bde0
lint fixes
joemarshall Jun 6, 2023
90bdfd6
Update cpp/src/arrow/testing/gtest_util.cc
joemarshall Jun 6, 2023
94ce7bc
Update cpp/src/arrow/testing/gtest_util.cc
joemarshall Jun 6, 2023
c1420e0
review fixes
joemarshall Jun 7, 2023
5dab2ea
review updates
joemarshall Jun 7, 2023
9a43d87
Merge branch 'disable-threading' of github.com:joemarshall/arrow into…
joemarshall Jun 7, 2023
a5a14bb
lint
joemarshall Jun 8, 2023
5224e4d
review fixes
joemarshall Jun 23, 2023
5fb29c7
moved global state into thread_pool.cc
joemarshall Jun 23, 2023
4e73799
typo in gtest_util removed
joemarshall Jun 23, 2023
18ebc4e
updated threadpool to use static local for global state
joemarshall Jun 26, 2023
d9e997c
removed leading underscore on method
joemarshall Jun 26, 2023
798b7cb
lint
joemarshall Jun 30, 2023
85c4e7e
fixes for review:
joemarshall Jul 13, 2023
6147185
warning on too long wait on future with no tasks
joemarshall Jul 14, 2023
bc88243
lint
joemarshall Jul 14, 2023
3629002
Merge branch 'main' of github.com:apache/arrow into disable-threading
joemarshall Jul 21, 2023
e223f05
Fix a few compiler errors introduced during rebase
westonpace Jul 25, 2023
b3b3512
Merge branch 'main' of github.com:apache/arrow into disable-threading
joemarshall Jul 21, 2023
4144352
Merge branch 'disable-threading' of github.com:joemarshall/arrow into…
joemarshall Aug 2, 2023
702d454
removed unneeded typedef
joemarshall Aug 2, 2023
e5601ba
disable test using threads if threads are disabled
joemarshall Aug 2, 2023
a4bfd80
lint fixes
joemarshall Aug 2, 2023
6534a74
Revert a needless change
kou Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/cpp_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ cmake \
-DARROW_C_FLAGS_RELWITHDEBINFO="${ARROW_C_FLAGS_RELWITHDEBINFO:-}" \
-DARROW_DATASET=${ARROW_DATASET:-ON} \
-DARROW_DEPENDENCY_SOURCE=${ARROW_DEPENDENCY_SOURCE:-AUTO} \
-DARROW_ENABLE_THREADING=${ARROW_ENABLE_THREADING:-ON} \
-DARROW_ENABLE_TIMING_TESTS=${ARROW_ENABLE_TIMING_TESTS:-ON} \
-DARROW_EXTRA_ERROR_CONTEXT=${ARROW_EXTRA_ERROR_CONTEXT:-OFF} \
-DARROW_FILESYSTEM=${ARROW_FILESYSTEM:-ON} \
Expand Down
2 changes: 2 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ takes precedence over ccache if a storage backend is configured" ON)

define_option(ARROW_WITH_MUSL "Whether the system libc is musl or not" OFF)

define_option(ARROW_ENABLE_THREADING "Enable threading in Arrow core" ON)

#----------------------------------------------------------------------
set_option_category("Test and benchmark")

Expand Down
21 changes: 17 additions & 4 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,14 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
bloom_filter_test.cc)
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
test_nodes.cc)
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)

# asof_join_node uses std::thread internally
# and doesn't use ThreadPool so it will
# be broken if threading is turned off
if(ARROW_ENABLE_THREADING)
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
endif()

add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
Expand Down Expand Up @@ -221,7 +228,9 @@ if(ARROW_BUILD_BENCHMARKS)
add_arrow_acero_benchmark(project_benchmark SOURCES benchmark_util.cc
project_benchmark.cc)

add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
if(ARROW_ENABLE_THREADING)
add_arrow_acero_benchmark(asof_join_benchmark SOURCES asof_join_benchmark.cc)
endif()

add_arrow_acero_benchmark(tpch_benchmark SOURCES tpch_benchmark.cc)

Expand All @@ -244,7 +253,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_static)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
if(ARROW_ENABLE_THREADING)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_static)
endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_static)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_static)
Expand All @@ -253,7 +264,9 @@ if(ARROW_BUILD_BENCHMARKS)
target_link_libraries(arrow-acero-expression-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-filter-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-project-benchmark PUBLIC arrow_acero_shared)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
if(ARROW_ENABLE_THREADING)
target_link_libraries(arrow-acero-asof-join-benchmark PUBLIC arrow_acero_shared)
endif()
target_link_libraries(arrow-acero-tpch-benchmark PUBLIC arrow_acero_shared)
if(ARROW_BUILD_OPENMP_BENCHMARKS)
target_link_libraries(arrow-acero-hash-join-benchmark PUBLIC arrow_acero_shared)
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/string.h"

Expand Down Expand Up @@ -1707,6 +1708,10 @@ class AsofJoinNode : public ExecNode {
}

Status StartProducing() override {
#ifndef ARROW_ENABLE_THREADING
return Status::NotImplemented("ASOF join requires threading enabled");
#endif

ARROW_ASSIGN_OR_RAISE(process_task_, plan_->query_context()->BeginExternalTask(
"AsofJoinNode::ProcessThread"));
if (!process_task_.is_valid()) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "arrow/acero/util.h" // PREFETCH
#include "arrow/util/bit_util.h" // Log2
#include "arrow/util/bitmap_ops.h" // CountSetBits
#include "arrow/util/config.h"

namespace arrow {
namespace acero {
Expand Down Expand Up @@ -426,6 +427,9 @@ void BloomFilterBuilder_Parallel::CleanUp() {

std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
BloomFilterBuildStrategy strategy) {
#ifndef ARROW_ENABLE_THREADING
strategy = BloomFilterBuildStrategy::SINGLE_THREADED;
#endif
switch (strategy) {
case BloomFilterBuildStrategy::SINGLE_THREADED: {
std::unique_ptr<BloomFilterBuilder> impl{new BloomFilterBuilder_SingleThreaded()};
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/arrow/acero/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "arrow/acero/util.h"
#include "arrow/compute/key_hash.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/config.h"
#include "arrow/util/cpu_info.h"

namespace arrow {

Expand Down Expand Up @@ -468,7 +470,7 @@ TEST(BloomFilter, Basic) {

std::vector<BloomFilterBuildStrategy> strategies;
strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);
#ifndef ARROW_VALGRIND
#if defined(ARROW_ENABLE_THREADING) && !defined(ARROW_VALGRIND)
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
#endif

Expand Down Expand Up @@ -501,7 +503,10 @@ TEST(BloomFilter, Scaling) {
num_build.push_back(4000000);

std::vector<BloomFilterBuildStrategy> strategies;
#ifdef ARROW_ENABLE_THREADING
strategies.push_back(BloomFilterBuildStrategy::PARALLEL);
#endif
strategies.push_back(BloomFilterBuildStrategy::SINGLE_THREADED);

for (const auto hardware_flags : HardwareFlagsForTesting()) {
for (const auto& strategy : strategies) {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/acero/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/testing/matchers.h"
#include "arrow/testing/random.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"
Expand Down Expand Up @@ -1619,6 +1620,9 @@ TEST(ExecPlan, SourceEnforcesBatchLimit) {
}

TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading enabled";
#endif
BatchesWithSchema data;
data.batches = {ExecBatchFromJSON({int32()}, "[[1]]")};
data.schema = schema({field("i32", int32())});
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/task_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <mutex>

#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand Down Expand Up @@ -316,7 +317,11 @@ Status TaskSchedulerImpl::StartScheduling(size_t thread_id, ScheduleImpl schedul
int num_concurrent_tasks,
bool use_sync_execution) {
schedule_impl_ = std::move(schedule_impl);
#ifdef ARROW_ENABLE_THREADING
use_sync_execution_ = use_sync_execution;
#else
use_sync_execution_ = true;
#endif
num_concurrent_tasks_ = num_concurrent_tasks;
num_tasks_to_schedule_.value += num_concurrent_tasks;
return ScheduleMore(thread_id);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/acero/visibility.h"
#include "arrow/status.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/acero/task_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "arrow/acero/util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "arrow/util/thread_pool.h"

namespace arrow {
Expand Down Expand Up @@ -101,6 +102,9 @@ TaskScheduler::TaskGroupContinuationImpl MakeFinalContinuation(
// concurrently. When all groups in that stage finish the next
// stage is started.
TEST(TaskScheduler, Stress) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
#endif
constexpr int kNumThreads = 8;
constexpr int kNumGroups = 8;
constexpr int kGroupsPerStage = 3;
Expand Down Expand Up @@ -176,6 +180,9 @@ TEST(TaskScheduler, Stress) {
// thread starts a task group while another thread is finishing
// the last of its tasks.
TEST(TaskScheduler, StressTwo) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading support";
#endif
constexpr int kNumThreads = 16;
constexpr int kNumGroups = 8;
constexpr int kTasksPerGroup = 1;
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/dataset_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "gtest/gtest.h"

using namespace std::string_view_literals; // NOLINT
Expand Down Expand Up @@ -380,6 +381,9 @@ TEST_F(DatasetWriterTestFixture, MinRowGroupBackpressure) {
}

TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
// Use a gated filesystem to queue up many writes behind a file open to make sure the
// file isn't opened multiple times.
auto gated_fs = UseGatedFs();
Expand All @@ -394,6 +398,9 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
}

TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
// NBATCHES must be less than I/O executor concurrency to avoid deadlock / test failure
constexpr int NBATCHES = 6;
auto gated_fs = UseGatedFs();
Expand All @@ -412,6 +419,9 @@ TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
}

TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Concurrent writes tests need threads";
#endif
auto gated_fs = UseGatedFs();
std::atomic<bool> paused = false;
write_options_.max_open_files = 2;
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/decimal.h"
#include "arrow/util/future.h"
#include "arrow/util/hash_util.h"
Expand Down Expand Up @@ -4458,6 +4459,9 @@ TEST(Substrait, SetRelationBasic) {
}

TEST(Substrait, PlanWithAsOfJoinExtension) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif
// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
Expand Down Expand Up @@ -5477,6 +5481,10 @@ TEST(Substrait, MixedSort) {
}

TEST(Substrait, PlanWithExtension) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif

// This demos an extension relation
std::string substrait_json = R"({
"extensionUris": [],
Expand Down Expand Up @@ -5665,6 +5673,9 @@ TEST(Substrait, PlanWithExtension) {
}

TEST(Substrait, AsOfJoinDefaultEmit) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "ASOF join requires threading";
#endif
std::string substrait_json = R"({
"extensionUris": [],
"extensions": [],
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/testing/util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -918,6 +919,9 @@ TEST(CacheOptions, Basics) {
}

TEST(IOThreadPool, Capacity) {
#ifndef ARROW_ENABLE_THREADING
GTEST_SKIP() << "Test requires threading enabled";
#endif
// Simple sanity check
auto pool = internal::GetIOThreadPool();
int capacity = pool->GetCapacity();
Expand Down
Loading
Loading