diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index fa6967078696d..fadbd37c7320c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -768,12 +768,14 @@ endif() set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_SYSTEM_LIBRARY} + ${BOOST_THREAD_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_REGEX_LIBRARY}) list(APPEND ARROW_STATIC_LINK_LIBS ${BOOST_SYSTEM_LIBRARY} + ${BOOST_THREAD_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_REGEX_LIBRARY}) @@ -792,6 +794,11 @@ if (NOT MSVC) ${CMAKE_DL_LIBS}) endif() +if (UNIX AND NOT APPLE) + set(ARROW_LINK_LIBS ${ARROW_LINK_LIBS} -lrt) + set(ARROW_STATIC_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} -lrt) +endif() + set(ARROW_TEST_STATIC_LINK_LIBS arrow_testing_static arrow_static @@ -805,6 +812,7 @@ set(ARROW_TEST_SHARED_LINK_LIBS ${ARROW_LINK_LIBS} double-conversion_static ${BOOST_SYSTEM_LIBRARY} + ${BOOST_THREAD_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_REGEX_LIBRARY} gtest_main_static diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 1668c00cd44d4..6bb15f9ac1a31 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -369,11 +369,14 @@ if (ARROW_BOOST_VENDORED) set(BOOST_BUILD_LINK "static") set(BOOST_STATIC_SYSTEM_LIBRARY "${BOOST_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}boost_system${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(BOOST_STATIC_THREAD_LIBRARY + "${BOOST_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}boost_thread${CMAKE_STATIC_LIBRARY_SUFFIX}") set(BOOST_STATIC_FILESYSTEM_LIBRARY "${BOOST_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}boost_filesystem${CMAKE_STATIC_LIBRARY_SUFFIX}") set(BOOST_STATIC_REGEX_LIBRARY "${BOOST_LIB_DIR}/${CMAKE_STATIC_LIBRARY_PREFIX}boost_regex${CMAKE_STATIC_LIBRARY_SUFFIX}") set(BOOST_SYSTEM_LIBRARY boost_system_static) + set(BOOST_THREAD_LIBRARY boost_thread_static) set(BOOST_FILESYSTEM_LIBRARY boost_filesystem_static) set(BOOST_REGEX_LIBRARY boost_regex_static) @@ -384,12 +387,13 @@ if (ARROW_BOOST_VENDORED) else() set(BOOST_BUILD_PRODUCTS ${BOOST_STATIC_SYSTEM_LIBRARY} + ${BOOST_STATIC_THREAD_LIBRARY} ${BOOST_STATIC_FILESYSTEM_LIBRARY} ${BOOST_STATIC_REGEX_LIBRARY}) set(BOOST_CONFIGURE_COMMAND "./bootstrap.sh" "--prefix=${BOOST_PREFIX}" - "--with-libraries=filesystem,regex,system") + "--with-libraries=filesystem,regex,system,thread") if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") set(BOOST_BUILD_VARIANT "debug") else() @@ -436,17 +440,20 @@ else() if (ARROW_BOOST_HEADER_ONLY) find_package(Boost REQUIRED) else() - find_package(Boost COMPONENTS regex system filesystem REQUIRED) + find_package(Boost COMPONENTS regex system thread filesystem REQUIRED) if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") set(BOOST_SHARED_SYSTEM_LIBRARY ${Boost_SYSTEM_LIBRARY_DEBUG}) + set(BOOST_SHARED_THREAD_LIBRARY ${Boost_THREAD_LIBRARY_DEBUG}) set(BOOST_SHARED_FILESYSTEM_LIBRARY ${Boost_FILESYSTEM_LIBRARY_DEBUG}) set(BOOST_SHARED_REGEX_LIBRARY ${Boost_REGEX_LIBRARY_DEBUG}) else() set(BOOST_SHARED_SYSTEM_LIBRARY ${Boost_SYSTEM_LIBRARY_RELEASE}) + set(BOOST_SHARED_THREAD_LIBRARY ${Boost_THREAD_LIBRARY_RELEASE}) set(BOOST_SHARED_FILESYSTEM_LIBRARY ${Boost_FILESYSTEM_LIBRARY_RELEASE}) set(BOOST_SHARED_REGEX_LIBRARY ${Boost_REGEX_LIBRARY_RELEASE}) endif() set(BOOST_SYSTEM_LIBRARY boost_system_shared) + set(BOOST_THREAD_LIBRARY boost_thread_shared) set(BOOST_FILESYSTEM_LIBRARY boost_filesystem_shared) set(BOOST_REGEX_LIBRARY boost_regex_shared) endif() @@ -457,17 +464,20 @@ else() if (ARROW_BOOST_HEADER_ONLY) find_package(Boost REQUIRED) else() - find_package(Boost COMPONENTS regex system filesystem REQUIRED) + find_package(Boost COMPONENTS regex system thread atomic chrono date_time filesystem REQUIRED) if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG") set(BOOST_STATIC_SYSTEM_LIBRARY ${Boost_SYSTEM_LIBRARY_DEBUG}) + set(BOOST_STATIC_THREAD_LIBRARY ${Boost_THREAD_LIBRARY_DEBUG}) set(BOOST_STATIC_FILESYSTEM_LIBRARY ${Boost_FILESYSTEM_LIBRARY_DEBUG}) set(BOOST_STATIC_REGEX_LIBRARY ${Boost_REGEX_LIBRARY_DEBUG}) else() set(BOOST_STATIC_SYSTEM_LIBRARY ${Boost_SYSTEM_LIBRARY_RELEASE}) + set(BOOST_STATIC_THREAD_LIBRARY ${Boost_THREAD_LIBRARY_RELEASE}) set(BOOST_STATIC_FILESYSTEM_LIBRARY ${Boost_FILESYSTEM_LIBRARY_RELEASE}) set(BOOST_STATIC_REGEX_LIBRARY ${Boost_REGEX_LIBRARY_RELEASE}) endif() set(BOOST_SYSTEM_LIBRARY boost_system_static) + set(BOOST_THREAD_LIBRARY boost_thread_static) set(BOOST_FILESYSTEM_LIBRARY boost_filesystem_static) set(BOOST_REGEX_LIBRARY boost_regex_static) endif() @@ -482,6 +492,10 @@ if (NOT ARROW_BOOST_HEADER_ONLY) STATIC_LIB "${BOOST_STATIC_SYSTEM_LIBRARY}" SHARED_LIB "${BOOST_SHARED_SYSTEM_LIBRARY}") + ADD_THIRDPARTY_LIB(boost_thread + STATIC_LIB "${BOOST_STATIC_THREAD_LIBRARY}" + SHARED_LIB "${BOOST_SHARED_THREAD_LIBRARY}") + ADD_THIRDPARTY_LIB(boost_filesystem STATIC_LIB "${BOOST_STATIC_FILESYSTEM_LIBRARY}" SHARED_LIB "${BOOST_SHARED_FILESYSTEM_LIBRARY}") @@ -490,7 +504,7 @@ if (NOT ARROW_BOOST_HEADER_ONLY) STATIC_LIB "${BOOST_STATIC_REGEX_LIBRARY}" SHARED_LIB "${BOOST_SHARED_REGEX_LIBRARY}") - SET(ARROW_BOOST_LIBS ${BOOST_SYSTEM_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY}) + SET(ARROW_BOOST_LIBS ${BOOST_SYSTEM_LIBRARY} ${BOOST_THREAD_LIBRARY} ${BOOST_FILESYSTEM_LIBRARY}) endif() include_directories(SYSTEM ${Boost_INCLUDE_DIR}) diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc index 898d015431227..06916c7d21378 100644 --- a/cpp/src/arrow/flight/flight-benchmark.cc +++ b/cpp/src/arrow/flight/flight-benchmark.cc @@ -146,7 +146,7 @@ Status RunPerformanceTest(const int port) { std::shared_ptr pool; RETURN_NOT_OK(ThreadPool::Make(FLAGS_num_threads, &pool)); - std::vector> tasks; + std::vector> tasks; for (const auto& endpoint : plan->endpoints()) { tasks.emplace_back(pool->Submit(ConsumeStream, endpoint)); } diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt index 0f037ad4b0571..04bad7ea314e0 100644 --- a/cpp/src/arrow/python/CMakeLists.txt +++ b/cpp/src/arrow/python/CMakeLists.txt @@ -55,6 +55,8 @@ endif() set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS} + ${BOOST_THREAD_LIBRARY} + ${Boost_SYSTEM_LIBRARY} ) if (WIN32) diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h index bc1a526b72e9b..81a298e055c18 100644 --- a/cpp/src/arrow/util/memory.h +++ b/cpp/src/arrow/util/memory.h @@ -60,7 +60,7 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes, // Each thread gets a "chunk" of k blocks. // Start all parallel memcpy tasks and handle leftovers while threads run. - std::vector> futures; + std::vector> futures; for (int i = 0; i < num_threads; i++) { futures.emplace_back(pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size, diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h index 8caba5f1f0d08..330856c1a8a87 100644 --- a/cpp/src/arrow/util/parallel.h +++ b/cpp/src/arrow/util/parallel.h @@ -35,8 +35,7 @@ namespace internal { template Status ParallelFor(int num_tasks, FUNCTION&& func) { auto pool = internal::GetCpuThreadPool(); - std::vector> futures(num_tasks); - + std::vector> futures(num_tasks); for (int i = 0; i < num_tasks; ++i) { futures[i] = pool->Submit(func, i); } diff --git a/cpp/src/arrow/util/thread-pool.h b/cpp/src/arrow/util/thread-pool.h index f18cfeb0deef3..02aeaab18b4c6 100644 --- a/cpp/src/arrow/util/thread-pool.h +++ b/cpp/src/arrow/util/thread-pool.h @@ -33,12 +33,19 @@ #include #include +#define BOOST_THREAD_PROVIDES_FUTURE +#include +#include + #include "arrow/status.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" namespace arrow { +template +using Future = boost::future; + /// \brief Get the capacity of the global thread pool /// /// Return the number of worker threads in the thread pool to which @@ -64,12 +71,10 @@ namespace detail { // to std::function. template struct packaged_task_wrapper { - using PackagedTask = std::packaged_task; - - explicit packaged_task_wrapper(PackagedTask&& task) + using PackagedTask = boost::packaged_task; + explicit packaged_task_wrapper(PackagedTask&& task) : task_(std::make_shared(std::forward(task))) {} - - void operator()(Args&&... args) { return (*task_)(std::forward(args)...); } + void operator()(Args&&... args) { return (*task_)(std::forward(args)...); } std::shared_ptr task_; }; @@ -118,10 +123,10 @@ class ARROW_EXPORT ThreadPool { // only occurs if the ThreadPool is shutting down). template ::type> - std::future Submit(Function&& func, Args&&... args) { + Future Submit(Function&& func, Args&&... args) { // Trying to templatize std::packaged_task with Function doesn't seem // to work, so go through std::bind to simplify the packaged signature - using PackagedTask = std::packaged_task; + using PackagedTask = boost::packaged_task; auto task = PackagedTask(std::bind(std::forward(func), args...)); auto fut = task.get_future(); diff --git a/cpp/src/arrow/util/utf8.cc b/cpp/src/arrow/util/utf8.cc index 75b0979ab0e4d..6bf372c0948d5 100644 --- a/cpp/src/arrow/util/utf8.cc +++ b/cpp/src/arrow/util/utf8.cc @@ -17,6 +17,9 @@ #include +#include +#include + #include "arrow/util/logging.h" #include "arrow/util/utf8.h" @@ -73,10 +76,10 @@ ARROW_EXPORT void CheckUTF8Initialized() { } // namespace internal -static std::once_flag utf8_initialized; +static boost::once_flag utf8_initialized = BOOST_ONCE_INIT; void InitializeUTF8() { - std::call_once(utf8_initialized, internal::InitializeLargeTable); + boost::call_once(internal::InitializeLargeTable, utf8_initialized); } } // namespace util diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 58c703f7fe068..ebccfc4bce3b1 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -53,6 +53,7 @@ using arrow::BooleanArray; using arrow::ChunkedArray; using arrow::Column; using arrow::Field; +using arrow::Future; using arrow::Int32Array; using arrow::ListArray; using arrow::MemoryPool; @@ -504,7 +505,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index, }; if (use_threads_) { - std::vector> futures; + std::vector> futures; auto pool = ::arrow::internal::GetCpuThreadPool(); for (int i = 0; i < num_columns; i++) { futures.push_back(pool->Submit(ReadColumnFunc, i)); @@ -551,7 +552,7 @@ Status FileReader::Impl::ReadTable(const std::vector& indices, }; if (use_threads_) { - std::vector> futures; + std::vector> futures; auto pool = ::arrow::internal::GetCpuThreadPool(); for (int i = 0; i < num_fields; i++) { futures.push_back(pool->Submit(ReadColumnFunc, i)); diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 566066463fffe..1d278918e8aa7 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -79,8 +79,8 @@ set(PLASMA_SRCS protocol.cc thirdparty/ae/ae.c) -set(PLASMA_LINK_LIBS arrow_shared) -set(PLASMA_STATIC_LINK_LIBS arrow_static) +set(PLASMA_LINK_LIBS ${ARROW_LINK_LIBS} ${BOOST_THREAD_LIBRARY} arrow_shared) +set(PLASMA_STATIC_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} ${BOOST_THREAD_LIBRARY} arrow_static) if (ARROW_CUDA) set(PLASMA_LINK_LIBS ${PLASMA_LINK_LIBS} arrow_cuda_shared) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index f08d6efd71ee7..3eadad0639f4c 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -662,7 +662,7 @@ bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state, // | num_threads * chunk_size | suffix |, where chunk_size = k * block_size. // Each thread gets a "chunk" of k blocks, except the suffix thread. - std::vector> futures; + std::vector> futures; for (int i = 0; i < num_threads; i++) { futures.push_back(pool->Submit( ComputeBlockHash, reinterpret_cast(data_address) + i * chunk_size, diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 7d2882ea4febc..fe7e34c3bfd5f 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -202,9 +202,12 @@ find_package(PythonLibsNew REQUIRED) find_package(NumPy REQUIRED) include(UseCython) +find_package(Boost COMPONENTS system thread atomic chrono date_time filesystem regex REQUIRED) + include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS} + ${Boost_INCLUDE_DIR} src) ############################################################ @@ -336,10 +339,14 @@ if (PYARROW_BUNDLE_ARROW_CPP) # disable autolinking in boost add_definitions(-DBOOST_ALL_NO_LIB) endif() - find_package(Boost COMPONENTS system filesystem regex REQUIRED) + find_package(Boost COMPONENTS system thread atomic chrono date_time filesystem regex REQUIRED) bundle_boost_lib(Boost_REGEX_LIBRARY) bundle_boost_lib(Boost_FILESYSTEM_LIBRARY) bundle_boost_lib(Boost_SYSTEM_LIBRARY) + bundle_boost_lib(Boost_THREAD_LIBRARY) + bundle_boost_lib(Boost_ATOMIC_LIBRARY) + bundle_boost_lib(Boost_CHRONO_LIBRARY) + bundle_boost_lib(Boost_DATE_TIME_LIBRARY) endif() bundle_zlib() diff --git a/python/manylinux1/scripts/build_boost.sh b/python/manylinux1/scripts/build_boost.sh index 3fb394d5ab7cc..4e0d6a5d96107 100755 --- a/python/manylinux1/scripts/build_boost.sh +++ b/python/manylinux1/scripts/build_boost.sh @@ -25,13 +25,13 @@ mkdir /arrow_boost pushd /boost_${BOOST_VERSION_UNDERSCORE} ./bootstrap.sh ./b2 tools/bcp -./dist/bin/bcp --namespace=arrow_boost --namespace-alias filesystem date_time system regex build algorithm locale format variant /arrow_boost +./dist/bin/bcp --namespace=arrow_boost --namespace-alias filesystem date_time system thread atomic chrono regex build algorithm locale format variant /arrow_boost popd pushd /arrow_boost ls -l ./bootstrap.sh -./bjam dll-path="'\$ORIGIN/'" cxxflags='-std=c++11 -fPIC' cflags=-fPIC linkflags="-std=c++11" variant=release link=shared --prefix=/arrow_boost_dist --with-filesystem --with-date_time --with-system --with-regex install +./bjam dll-path="'\$ORIGIN/'" cxxflags='-std=c++11 -fPIC' cflags=-fPIC linkflags="-std=c++11" variant=release link=shared --prefix=/arrow_boost_dist --with-filesystem --with-date_time --with-system --with-thread --with-atomic --with-chrono --with-regex install popd rm -rf boost_${BOOST_VERSION_UNDERSCORE}.tar.gz boost_${BOOST_VERSION_UNDERSCORE} arrow_boost # Boost always install header-only parts but they also take up quite some space. diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index dabcdf1813059..e74a9a168fb01 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -44,13 +44,6 @@ def parse_git(root, **kwargs): import pyarrow.compat as compat -# Workaround for https://issues.apache.org/jira/browse/ARROW-2657 -# and https://issues.apache.org/jira/browse/ARROW-2920 -if _sys.platform in ('linux', 'linux2'): - compat.import_tensorflow_extension() - compat.import_pytorch_extension() - - from pyarrow.lib import cpu_count, set_cpu_count from pyarrow.lib import (null, bool_, int8, int16, int32, int64, diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py index ee924ed388ff1..e41de7395914c 100644 --- a/python/pyarrow/compat.py +++ b/python/pyarrow/compat.py @@ -163,112 +163,6 @@ def encode_file_path(path): # will convert utf8 to utf16 return encoded_path -def _iterate_python_module_paths(package_name): - """ - Return an iterator to full paths of a python package. - - This is a best effort and might fail. - It uses the official way of loading modules from - https://docs.python.org/3/library/importlib.html#approximating-importlib-import-module - """ - if PY2: - import imp - try: - _, pathname, _ = imp.find_module(package_name) - except ImportError: - return - else: - yield pathname - else: - try: - import importlib - absolute_name = importlib.util.resolve_name(package_name, None) - except (ImportError, AttributeError): - # Sometimes, importlib is not available (e.g. Python 2) - # or importlib.util is not available (e.g. Python 2.7) - spec = None - else: - import sys - for finder in sys.meta_path: - try: - spec = finder.find_spec(absolute_name, None) - except (AttributeError, TypeError): - # On Travis (Python 3.5) the above produced: - # AttributeError: 'VendorImporter' object has no - # attribute 'find_spec' - # - # ARROW-4117: When running "asv dev", TypeError is raised - # due to the meta-importer - spec = None - - if spec is not None: - break - - if spec: - module = importlib.util.module_from_spec(spec) - for path in module.__path__: - yield path - -def import_tensorflow_extension(): - """ - Load the TensorFlow extension if it exists. - - This is used to load the TensorFlow extension before - pyarrow.lib. If we don't do this there are symbol clashes - between TensorFlow's use of threading and our global - thread pool, see also - https://issues.apache.org/jira/browse/ARROW-2657 and - https://github.com/apache/arrow/pull/2096. - """ - import os - tensorflow_loaded = False - - # Try to load the tensorflow extension directly - # This is a performance optimization, tensorflow will always be - # loaded via the "import tensorflow" statement below if this - # doesn't succeed. - - for path in _iterate_python_module_paths("tensorflow"): - ext = os.path.join(path, "libtensorflow_framework.so") - if os.path.exists(ext): - import ctypes - try: - ctypes.CDLL(ext) - except OSError: - pass - tensorflow_loaded = True - break - - # If the above failed, try to load tensorflow the normal way - # (this is more expensive) - - if not tensorflow_loaded: - try: - import tensorflow - except ImportError: - pass - -def import_pytorch_extension(): - """ - Load the PyTorch extension if it exists. - - This is used to load the PyTorch extension before - pyarrow.lib. If we don't do this there are symbol clashes - between PyTorch's use of threading and our global - thread pool, see also - https://issues.apache.org/jira/browse/ARROW-2920 - """ - import ctypes - import os - - for path in _iterate_python_module_paths("torch"): - try: - ctypes.CDLL(os.path.join(path, "lib/libcaffe2.so")) - except OSError: - # lib/libcaffe2.so only exists in pytorch starting from 0.4.0, - # in older versions of pytorch there are not symbol clashes - pass - integer_types = six.integer_types + (np.integer,) diff --git a/python/setup.py b/python/setup.py index 99883bf44319d..f2183c83dbb9b 100755 --- a/python/setup.py +++ b/python/setup.py @@ -306,6 +306,18 @@ def _run_cmake(self): move_shared_libs( build_prefix, build_lib, "{}_system".format(self.boost_namespace)) + move_shared_libs( + build_prefix, build_lib, + "{}_thread".format(self.boost_namespace)) + move_shared_libs( + build_prefix, build_lib, + "{}_atomic".format(self.boost_namespace)) + move_shared_libs( + build_prefix, build_lib, + "{}_chrono".format(self.boost_namespace)) + move_shared_libs( + build_prefix, build_lib, + "{}_date_time".format(self.boost_namespace)) move_shared_libs( build_prefix, build_lib, "{}_regex".format(self.boost_namespace))