Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the S/R versions of the for loop algorithms #6529

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .circleci/tests.unit1.algorithms
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,18 @@ tests.unit.modules.algorithms.algorithms.foreachn_bad_alloc
tests.unit.modules.algorithms.algorithms.for_loop
tests.unit.modules.algorithms.algorithms.for_loop_exception
tests.unit.modules.algorithms.algorithms.for_loop_induction
tests.unit.modules.algorithms.algorithms.for_loop_induction_sender
tests.unit.modules.algorithms.algorithms.for_loop_induction_async
tests.unit.modules.algorithms.algorithms.for_loop_n
tests.unit.modules.algorithms.algorithms.for_loop_n_sender
tests.unit.modules.algorithms.algorithms.for_loop_n_strided
tests.unit.modules.algorithms.algorithms.for_loop_n_strided_sender
tests.unit.modules.algorithms.algorithms.for_loop_reduction
tests.unit.modules.algorithms.algorithms.for_loop_reduction_sender
tests.unit.modules.algorithms.algorithms.for_loop_reduction_async
tests.unit.modules.algorithms.algorithms.for_loop_sender
tests.unit.modules.algorithms.algorithms.for_loop_strided
tests.unit.modules.algorithms.algorithms.for_loop_strided_sender
tests.unit.modules.algorithms.algorithms.generate
tests.unit.modules.algorithms.algorithms.generaten
tests.unit.modules.algorithms.algorithms.is_heap
Expand Down
70 changes: 27 additions & 43 deletions libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1154,13 +1154,13 @@ namespace hpx::parallel {

template <typename ExPolicy, typename IterOrR, typename Size,
typename F, typename... Ts>
static auto parallel(ExPolicy&& policy, IterOrR iter_or_r,
static decltype(auto) parallel(ExPolicy&& policy, IterOrR iter_or_r,
Size size, F&& f, Ts&&... ts)
{
constexpr bool is_scheduler_policy =
constexpr bool has_scheduler_executor =
hpx::execution_policy_has_scheduler_executor_v<ExPolicy>;

if constexpr (!is_scheduler_policy)
if constexpr (!has_scheduler_executor)
{
if (size == 0)
{
Expand All @@ -1171,7 +1171,7 @@ namespace hpx::parallel {
if constexpr (sizeof...(Ts) == 0)
{
if constexpr (hpx::is_async_execution_policy_v<ExPolicy> ||
is_scheduler_policy)
has_scheduler_executor)
{
return util::detail::algorithm_result<ExPolicy>::get(
util::partitioner<ExPolicy>::call(
Expand All @@ -1190,14 +1190,6 @@ namespace hpx::parallel {
}
else
{
// any of the induction or reduction operations prevent us
// from sharing the part_iteration between threads
decltype(auto) hinted_policy =
parallel::util::adapt_sharing_mode(
HPX_FORWARD(ExPolicy, policy),
hpx::threads::thread_sharing_hint::
do_not_share_function);

using policy_type = std::decay_t<decltype(policy)>;

// we need to decay copy here to properly transport
Expand All @@ -1209,10 +1201,10 @@ namespace hpx::parallel {

return util::detail::algorithm_result<policy_type>::get(
util::partitioner<policy_type>::call_with_index(
hinted_policy, iter_or_r, size, 1,
HPX_FORWARD(ExPolicy, policy), iter_or_r, size, 1,
part_iterations<policy_type, F, void, args_type>{
HPX_FORWARD(F, f), args},
[=](auto&&) mutable {
[=](auto&&...) mutable {
auto pack =
hpx::util::make_index_pack_t<sizeof...(
Ts)>();
Expand Down Expand Up @@ -1335,10 +1327,10 @@ namespace hpx::parallel {
static auto parallel(ExPolicy&& policy, B first, Size size,
S stride, F&& f, Ts&&... ts)
{
constexpr bool is_scheduler_policy =
constexpr bool has_scheduler_executor =
hpx::execution_policy_has_scheduler_executor_v<ExPolicy>;

if constexpr (!is_scheduler_policy)
if constexpr (!has_scheduler_executor)
{
if (size == 0)
{
Expand All @@ -1348,7 +1340,7 @@ namespace hpx::parallel {

if constexpr (sizeof...(Ts) == 0)
{
if constexpr (!is_scheduler_policy)
if constexpr (!has_scheduler_executor)
{
if (stride == 1)
{
Expand All @@ -1357,20 +1349,22 @@ namespace hpx::parallel {
HPX_FORWARD(ExPolicy, policy), first, size,
part_iterations<ExPolicy, F, S>{
HPX_FORWARD(F, f)},
[](auto&&) { return hpx::util::unused; }));
[](auto&&...) {
return hpx::util::unused;
}));
}
}

if constexpr (hpx::is_async_execution_policy_v<ExPolicy> ||
is_scheduler_policy)
has_scheduler_executor)
{
return util::detail::algorithm_result<ExPolicy>::get(
util::partitioner<ExPolicy>::call_with_index(
HPX_FORWARD(ExPolicy, policy), first, size,
stride,
part_iterations<ExPolicy, F, S>{
HPX_FORWARD(F, f), stride},
[](auto&&) { return hpx::util::unused; }));
[](auto&&...) { return hpx::util::unused; }));
}
else
{
Expand All @@ -1384,14 +1378,6 @@ namespace hpx::parallel {
}
else
{
// any of the induction or reduction operations prevent us
// from sharing the part_iteration between threads
decltype(auto) hinted_policy =
parallel::util::adapt_sharing_mode(
HPX_FORWARD(ExPolicy, policy),
hpx::threads::thread_sharing_hint::
do_not_share_function);

using policy_type = std::decay_t<decltype(policy)>;

// we need to decay copy here to properly transport
Expand All @@ -1403,10 +1389,10 @@ namespace hpx::parallel {

return util::detail::algorithm_result<policy_type>::get(
util::partitioner<policy_type>::call_with_index(
hinted_policy, first, size, stride,
HPX_FORWARD(ExPolicy, policy), first, size, stride,
part_iterations<policy_type, F, S, args_type>{
HPX_FORWARD(F, f), stride, args},
[=](auto&&) mutable {
[=](auto&&...) mutable {
auto pack =
hpx::util::make_index_pack_t<sizeof...(
Ts)>();
Expand Down Expand Up @@ -1564,9 +1550,8 @@ namespace hpx::parallel {
// reshuffle arguments, last argument is function object, will go first
template <typename ExPolicy, typename B, typename Size, typename S,
std::size_t... Is, typename... Args>
util::detail::algorithm_result_t<ExPolicy> for_loop_n(ExPolicy&& policy,
B first, Size size, S stride, hpx::util::index_pack<Is...>,
Args&&... args)
decltype(auto) for_loop_n(ExPolicy&& policy, B first, Size size,
S stride, hpx::util::index_pack<Is...>, Args&&... args)
{
// stride shall not be zero
HPX_ASSERT(stride != 0);
Expand Down Expand Up @@ -1653,10 +1638,9 @@ namespace hpx::experimental {
(hpx::traits::is_iterator_v<I> || std::is_integral_v<I>)
)>
// clang-format on
friend hpx::parallel::util::detail::algorithm_result_t<ExPolicy>
tag_fallback_invoke(hpx::experimental::for_loop_strided_t,
ExPolicy&& policy, std::decay_t<I> first, I last, S stride,
Args&&... args)
friend decltype(auto) tag_fallback_invoke(
hpx::experimental::for_loop_strided_t, ExPolicy&& policy,
std::decay_t<I> first, I last, S stride, Args&&... args)
{
static_assert(sizeof...(Args) >= 1,
"for_loop_strided must be called with at least a function "
Expand Down Expand Up @@ -1704,9 +1688,9 @@ namespace hpx::experimental {
(hpx::traits::is_iterator_v<I> || std::is_integral_v<I>)
)>
// clang-format on
friend hpx::parallel::util::detail::algorithm_result_t<ExPolicy>
tag_fallback_invoke(hpx::experimental::for_loop_n_t, ExPolicy&& policy,
I first, Size size, Args&&... args)
friend decltype(auto) tag_fallback_invoke(
hpx::experimental::for_loop_n_t, ExPolicy&& policy, I first,
Size size, Args&&... args)
{
static_assert(sizeof...(Args) >= 1,
"for_loop_n must be called with at least a function object");
Expand Down Expand Up @@ -1753,9 +1737,9 @@ namespace hpx::experimental {
(hpx::traits::is_iterator_v<I> || std::is_integral_v<I>)
)>
// clang-format on
friend hpx::parallel::util::detail::algorithm_result_t<ExPolicy>
tag_fallback_invoke(hpx::experimental::for_loop_n_strided_t,
ExPolicy&& policy, I first, Size size, S stride, Args&&... args)
friend decltype(auto) tag_fallback_invoke(
hpx::experimental::for_loop_n_strided_t, ExPolicy&& policy, I first,
Size size, S stride, Args&&... args)
{
static_assert(sizeof...(Args) >= 1,
"for_loop_n_strided must be called with at least a function "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,84 @@
#pragma once

#include <hpx/config.hpp>

#include <hpx/concurrency/cache_line_data.hpp>
#include <hpx/execution/algorithms/detail/predicates.hpp>
#include <hpx/execution/detail/execution_parameter_callbacks.hpp>
#include <hpx/threading_base/thread_num_tss.hpp>

#if !defined(HPX_HAVE_CXX17_SHARED_PTR_ARRAY)
#include <boost/shared_array.hpp>
#else
#include <memory>
#endif

#include <algorithm>
#include <cstddef>
#include <type_traits>
#include <utility>

#include "hpx/concepts/concepts.hpp"

namespace hpx::parallel::detail {
/// \cond NOINTERNAL

template <typename T>
struct hpx_thread_local
{
private:
using element_type = hpx::util::cache_line_data<T>;

#if defined(HPX_HAVE_CXX17_SHARED_PTR_ARRAY)
using array_type = std::shared_ptr<element_type[]>;
#else
using array_type = boost::shared_array<element_type>;
#endif

public:
constexpr explicit hpx_thread_local(T const& init)
{
const std::size_t threads =
hpx::parallel::execution::detail::get_os_thread_count();
data_.reset(new element_type[threads]);
std::fill_n(data_.get(), threads, element_type{init});
}

// clang-format off
template<typename O,
HPX_CONCEPT_REQUIRES_(
std::is_assignable_v<T&, O const&>
)>
// clang-format on
constexpr hpx_thread_local& operator=(O const& other)
{
data_[hpx::get_worker_thread_num()].data_ = other;
return *this;
}

// clang-format off
constexpr operator T const&() const
// clang-format on
{
return data_[hpx::get_worker_thread_num()].data_;
}

constexpr operator T&()
{
return data_[hpx::get_worker_thread_num()].data_;
}

private:
array_type data_;
};

/// Overload of next() for values wrapped in hpx_thread_local.
///
/// Reads the calling thread's copy of the wrapped value (through the
/// wrapper's const conversion operator) and forwards it, together with
/// \a offset, to the next() overload taking a plain \a Iterable. The
/// wrapper itself is not modified; the result is returned by value.
template <typename Iterable, typename Stride>
HPX_HOST_DEVICE HPX_FORCEINLINE constexpr Iterable next(
    const hpx_thread_local<Iterable>& val, Stride offset)
{
    // binding to a plain reference selects the non-wrapper overload below
    Iterable const& current = val;
    return hpx::parallel::detail::next(current, offset);
}

///////////////////////////////////////////////////////////////////////
template <typename T>
struct induction_helper
Expand Down Expand Up @@ -54,7 +122,7 @@ namespace hpx::parallel::detail {

private:
std::decay_t<T> var_;
T curr_;
hpx_thread_local<T> curr_;
};

template <typename T>
Expand Down Expand Up @@ -94,7 +162,7 @@ namespace hpx::parallel::detail {
private:
T& live_out_var_;
T var_;
T curr_;
hpx_thread_local<T> curr_;
};

///////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -123,6 +191,8 @@ namespace hpx::parallel::detail {
HPX_HOST_DEVICE
constexpr void next_iteration() noexcept
{
/*for (std::size_t i{}; i < stride_; ++i)
++curr_;*/
curr_ = parallel::detail::next(curr_, stride_);
}

Expand All @@ -131,7 +201,7 @@ namespace hpx::parallel::detail {

private:
std::decay_t<T> var_;
T curr_;
hpx_thread_local<T> curr_;
std::size_t stride_;
};

Expand Down Expand Up @@ -174,7 +244,7 @@ namespace hpx::parallel::detail {
private:
T& live_out_var_;
T var_;
T curr_;
hpx_thread_local<T> curr_;
std::size_t stride_;
};

Expand Down
7 changes: 6 additions & 1 deletion libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,20 @@ set(tests
foreachn_exception
foreachn_bad_alloc
for_loop
for_loop_sender
for_loop_exception
for_loop_induction
for_loop_induction_sender
for_loop_induction_async
for_loop_n
for_loop_n_sender
for_loop_n_strided
for_loop_n_strided_sender
for_loop_reduction
for_loop_reduction_sender
for_loop_reduction_async
for_loop_sender
for_loop_strided
for_loop_strided_sender
generate
generaten
is_heap
Expand Down
Loading
Loading