diff --git a/asio/include/asio/experimental/promise.hpp b/asio/include/asio/experimental/promise.hpp index 2d657416b3..655d465151 100644 --- a/asio/include/asio/experimental/promise.hpp +++ b/asio/include/asio/experimental/promise.hpp @@ -48,9 +48,6 @@ struct is_promise> : std::true_type {}; template constexpr bool is_promise_v = is_promise::value; -template -concept is_promise_c = is_promise_v>; - template struct promise_value_type { @@ -84,6 +81,15 @@ struct promise using tuple_type = std::tuple; using executor_type = Executor; + /// Rebinds the promise type to another executor. + template + struct rebind_executor + { + /// The file type when rebound to the specified executor. + typedef promise other; + }; + + /// Get the executor of the promise executor_type get_executor() const { if (impl_) @@ -92,6 +98,7 @@ struct promise return {}; } + /// Cancel the promise. Usually done through the destructor. void cancel(cancellation_type level = cancellation_type::all) { if (impl_ && !impl_->done) @@ -101,13 +108,18 @@ struct promise } } + /// Check if the promise is completed already. bool complete() const noexcept { return impl_ && impl_->done; } - template - auto async_wait(CompletionToken&& token) + /// Wait for the promise to become ready. + template < + ASIO_COMPLETION_TOKEN_FOR(void(Ts...)) CompletionToken + ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)> + auto async_wait(CompletionToken&& token + ASIO_DEFAULT_COMPLETION_TOKEN(executor_type)) { assert(impl_); @@ -121,410 +133,6 @@ struct promise ~promise() { cancel(); } - template - static auto race(Executor1 exec, Ps ... ps) - -> promise), Executor1> - { - using var_t = std::variant; - using pi = detail::promise_impl; - - struct impl_t : pi - { - impl_t(Executor1 exec, Ps&& ... ps) - : pi(std::move(exec)), - tup(std::move(ps)...) - { - this->slot.template emplace(this); - } - - struct cancel_handler - { - impl_t* self; - - cancel_handler(impl_t* self) - : self(self) - { - } - - void operator()(cancellation_type ct) - { - [ct, s=self](std::index_sequence) - { - (std::get(s->tup).cancel(ct), ... ); - }(std::make_index_sequence{}); - } - }; - - std::tuple...> tup; - cancellation_slot slot{this->cancel.slot()}; - }; - - auto impl = std::allocate_shared( - get_associated_allocator(exec), exec, std::move(ps)...); - - impl->executor = exec; - - [impl, exec](std::index_sequence) - { - auto step = - [&](std::integral_constant) - { - return [impl] (Args&& ... args) - { - if (impl->done) - return; - impl->result = var_t(std::in_place_index, - std::forward(args)...); - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - std::apply(std::move(f), std::move(*impl->result)); - - auto cancel = - [&](std::integral_constant) - { - if constexpr (I != Id) - get(impl->tup).cancel(); - }; - - (cancel(std::integral_constant{}), ...); - }; - }; - - ( - std::get(impl->tup).async_wait( - bind_executor(exec, - step(std::integral_constant{}))), - ... - ); - }(std::make_index_sequence{}); - - return {impl}; - } - - template - static auto all(Executor1 exec, Ps ... ps) - -> promise - { - using pi = detail::promise_impl< - void(typename Ps::value_type...), Executor1>; - - struct impl_t : pi - { - impl_t(Executor1 exec, Ps&& ... ps) - : pi(std::move(exec)), - tup(std::move(ps)...) - { - this->slot.template emplace(this); - } - - struct cancel_handler - { - impl_t* self; - - cancel_handler(impl_t* self) - : self(self) - { - } - - void operator()(cancellation_type level) - { - [level, s=self](std::index_sequence) - { - (std::get(s->tup).cancel(level), ... ); - }(std::make_index_sequence{}); - } - }; - - std::tuple...> tup; - std::tuple...> partial_result; - cancellation_slot slot{this->cancel.slot()}; - }; - - auto impl = std::allocate_shared( - get_associated_allocator(exec), exec, std::move(ps)...); - impl->executor = exec; - - [impl, exec](std::index_sequence) - { - auto step = - [&](std::integral_constant) - { - return [impl](Args&& ... args) - { - std::get(impl->partial_result).emplace( - std::forward(args)...); - if ((std::get(impl->partial_result) && ...)) // we're done. - { - impl->result = {*std::get(impl->partial_result)...}; - - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - std::apply(std::move(f), std::move(*impl->result)); - } - }; - }; - - ( - std::get(impl->tup).async_wait( - bind_executor(exec, - step(std::integral_constant{}))), - ... - ); - }(std::make_index_sequence{}); - - return {impl}; - } - - template - static auto race(Ps ... ps) - { - auto exec = get<0>(std::tie(ps...)).get_executor(); - return race(std::move(exec), std::move(ps)...); - } - - template - static auto all(Ps ... ps) - { - auto exec = get<0>(std::tie(ps...)).get_executor(); - return all(std::move(exec), std::move(ps)...); - } - - template -#if !defined(GENERATING_DOCUMENTATION) - requires requires (Range r) - { - {*std::begin(r)} -> is_promise_c; - {*std:: end(r)} -> is_promise_c; - } -#endif // !defined(GENERATING_DOCUMENTATION) - static auto race(Executor1 exec, Range range) - { - using var_t = typename std::decay_t< - decltype(*std::begin(range))>::value_type; - using signature_type = std::conditional_t< - std::is_same_v, - void(std::size_t), - void(std::size_t, var_t)>; - using pi = detail::promise_impl; - using promise_t = promise; - - struct impl_t : pi - { - impl_t(Executor1 exec, Range&& range) - : pi(std::move(exec)), - range(std::move(range)) - { - this->slot.template emplace(this); - } - - struct cancel_handler - { - impl_t* self; - - cancel_handler(impl_t* self) - : self(self) - { - } - - void operator()(asio::cancellation_type ct) - { - for (auto& r : self->range) - r.cancel(ct); - } - }; - - Range range; - cancellation_slot slot{this->cancel.slot()}; - }; - - const auto size = std::distance(std::begin(range), std::end(range)); - auto impl = std::allocate_shared( - get_associated_allocator(exec), exec, std::move(range)); - impl->executor = exec; - - if (size == 0u) - { - if constexpr (std::is_same_v) - impl->result = {-1}; - else - impl->result = {-1, var_t{}}; - - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - { - asio::post(exec, - [impl, f = std::move(f)]() mutable - { - std::apply(std::move(f), std::move(*impl->result)); - }); - } - return promise_t{impl}; - } - auto idx = 0u; - for (auto& val : impl->range) - { - val.async_wait( - bind_executor(exec, - [idx, impl](Args&&... args) - { - if (impl->done) - return; - if constexpr (std::is_same_v) - impl->result = idx; - else - impl->result = std::make_tuple(idx, - var_t(std::forward(args)...)); - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - std::apply(std::move(f), std::move(*impl->result)); - - auto jdx = 0u; - - for (auto &tc : impl->range) - if (jdx++ != idx) - tc.cancel(); - })); - idx++; - } - return promise_t{impl}; - } - - - template -#if !defined(GENERATING_DOCUMENTATION) - requires requires (Range r) - { - {*std::begin(r)} -> is_promise_c; - {*std:: end(r)} -> is_promise_c; - } -#endif // !defined(GENERATING_DOCUMENTATION) - static auto all(Executor1 exec, Range range) - -> promise< - void( - std::vector< - typename std::decay_t< - decltype(*std::begin(range)) - >::value_type - > - ), Executor1> - { - using var_t = typename std::decay_t< - decltype(*std::begin(range))>::value_type; - using pi = detail::promise_impl), Executor1>; - - struct impl_t : pi - { - impl_t(Executor1 exec, Range&& range) - : pi(std::move(exec)), - range(std::move(range)) - { - this->slot.template emplace(this); - } - - struct cancel_handler - { - impl_t* self; - - cancel_handler(impl_t* self) - : self(self) - { - } - - void operator()(cancellation_type ct) - { - for (auto& r : self->range) - r.cancel(ct); - } - }; - - Range range; - std::vector> partial_result; - cancellation_slot slot{this->cancel.slot()}; - }; - - const auto size = std::distance(std::begin(range), std::end(range)); - auto impl = std::allocate_shared( - get_associated_allocator(exec), exec, std::move(range)); - impl->executor = exec; - impl->partial_result.resize(size); - - if (size == 0u) - { - impl->result.emplace(); - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - asio::post(exec, [impl, f = std::move(f)]() mutable - { - std::apply(std::move(f), std::move(*impl->result)); - }); - return {impl}; - } - auto idx = 0u; - for (auto& val : impl->range) { - val.async_wait(bind_executor( - exec, - [idx, impl](Args&&... args) { - - impl->partial_result[idx].emplace(std::forward(args)...); - if (std::all_of(impl->partial_result.begin(), - impl->partial_result.end(), - [](auto &opt) {return opt.has_value();})) - { - impl->result.emplace(); - get<0>(*impl->result).reserve(impl->partial_result.size()); - for (auto& p : impl->partial_result) - get<0>(*impl->result).push_back(std::move(*p)); - - impl->done = true; - if (auto f = std::exchange(impl->completion, nullptr); !!f) - std::apply(std::move(f), std::move(*impl->result)); - } - - })); - idx++; - } - return {impl}; - } - - template -#if !defined(GENERATING_DOCUMENTATION) - requires requires (Range r) - { - {*std::begin(r)} -> is_promise_c; - {*std:: end(r)} -> is_promise_c; - } -#endif // !defined(GENERATING_DOCUMENTATION) - static auto race(Range range) - { - if (std::begin(range) == std::end(range)) - throw std::logic_error( - "Can't use race on an empty range with deduced executor"); - else - { - auto ex = std::begin(range)->get_executor(); - return race(ex, std::move(range)); - } - } - - template -#if !defined(GENERATING_DOCUMENTATION) - requires requires (Range&& r) - { - {*std::begin(r)} -> is_promise_c; - {*std:: end(r)} -> is_promise_c; - } -#endif // !defined(GENERATING_DOCUMENTATION) - static auto all(Range range) - { - if (std::begin(range) == std::end(range)) - throw std::logic_error( - "Can't use all on an empty range with deduced executor"); - else - { - auto ex = std::begin(range)->get_executor(); - return all(ex, std::move(range)); - } - } private: #if !defined(GENERATING_DOCUMENTATION) @@ -597,6 +205,9 @@ struct async_result, R(Args...)> using handler_type = experimental::detail::promise_handler< void(typename decay::type...), Executor>; + using return_type = experimental::promise< + void(typename decay::type...), Executor>; + template static auto initiate(Initiation initiation, experimental::use_promise_t, InitArgs... args) diff --git a/asio/src/doc/overview/promises.qbk b/asio/src/doc/overview/promises.qbk index e3043eb20e..7da28c4886 100644 --- a/asio/src/doc/overview/promises.qbk +++ b/asio/src/doc/overview/promises.qbk @@ -40,14 +40,26 @@ complete: asio::experimental::use_promise); auto promise = - asio::experimental::promise<>::race( - timeout_promise, read_promise); + asio::experimental::make_parallel_group( + timeout_promise.async_wait(asio::deferred), + read_promise.async_wait(asio::deferred) + ).async_wait( + asio::experimental::wait_for_one(), + asio::experimental::use_promise); promise.async_wait( - [](std::variant> v) + [](std::array order, + error_code timer_ec, + error_code read_ec, std::size_t read_size) { - if (v.index() == 0) {} //timed out - else if (v.index() == 1) // completed in time + if (order[0] == 0) + { + // timed out + } + else + { + // completed in time + } }); or to wait for all to complete: @@ -61,12 +73,17 @@ or to wait for all to complete: asio::experimental::use_promise); auto promise = - asio::experimental::promise<>::all( - write_promise, read_promise); + asio::experimental::make_parallel_group( + timeout_promise.async_wait(asio::deferred), + read_promise.async_wait(asio::deferred) + ).async_wait( + asio::experimental::wait_for_all(), + asio::experimental::use_promise); promise.async_wait( - [](std::tuple write_result, - std::tuple read_result) + [](std::array order, + error_code timer_ec, + error_code read_ec, std::size_t read_size) { // ... }); diff --git a/asio/src/tests/unit/experimental/promise.cpp b/asio/src/tests/unit/experimental/promise.cpp index ceedcdc2ff..d93247085a 100644 --- a/asio/src/tests/unit/experimental/promise.cpp +++ b/asio/src/tests/unit/experimental/promise.cpp @@ -19,6 +19,7 @@ #include #include +#include "asio/redirect_error.hpp" #include "asio/steady_timer.hpp" #include "../unit_test.hpp" @@ -27,7 +28,6 @@ namespace promise { void promise_tester() { using namespace asio; - using asio::error_code; using namespace std::chrono; io_context ctx; @@ -40,7 +40,7 @@ void promise_tester() auto p = timer1.async_wait(experimental::use_promise); steady_clock::time_point completed_when; - error_code ec; + asio::error_code ec; bool called = false; p.async_wait( @@ -52,260 +52,37 @@ void promise_tester() }); steady_clock::time_point timer2_done; - timer2.async_wait([&](auto) { - timer2_done = steady_clock::now();; - p.cancel(); - }); - - ctx.run(); - - ASIO_CHECK(timer2_done + milliseconds(1) > completed_when); - ASIO_CHECK(called); - ASIO_CHECK(ec == error::operation_aborted); -} - -void promise_race_tester() -{ - using namespace asio; - using asio::error_code; - using namespace std::chrono; - - io_context ctx; - - steady_timer timer1{ctx}, timer2{ctx}; - - const auto started_when = steady_clock::now(); - timer1.expires_at(started_when + milliseconds(2000)); - timer2.expires_at(started_when + milliseconds(1000)); - - experimental::promise)> p = - experimental::promise<>::race( - timer1.async_wait(experimental::use_promise), - timer2.async_wait(experimental::use_promise)); - - auto called = false; - error_code ec; - steady_clock::time_point completed_when; - p.async_wait( - [&](auto v) + timer2.async_wait( + [&](auto) { - ASIO_CHECK(v.index() == 1); - ec = get<1>(v); - called = true; - completed_when = steady_clock::now(); + timer2_done = steady_clock::now();; + p.cancel(); }); ctx.run(); - ASIO_CHECK(started_when + milliseconds(1000) <= completed_when); - ASIO_CHECK(started_when + milliseconds(1500) > completed_when); - ASIO_CHECK(called); - ASIO_CHECK(!ec); -} - -void promise_all_tester() -{ - using namespace asio; - using asio::error_code; - using namespace std::chrono; - - io_context ctx; - - steady_timer timer1{ctx}, - timer2{ctx}; - - const auto started_when = steady_clock::now(); - timer1.expires_at(started_when + milliseconds(2000)); - timer2.expires_at(started_when + milliseconds(1000)); - - experimental::promise p = - experimental::promise<>::all( - timer1.async_wait(experimental::use_promise), - timer2.async_wait(experimental::use_promise)); - - bool called = false; - steady_clock::time_point completed_when; - - p.async_wait( - [&](auto ec1, auto ec2) - { - ASIO_CHECK(!ec1); - ASIO_CHECK(!ec2); - called = true; - completed_when = steady_clock::now(); - }); - - ctx.run(); - - ASIO_CHECK(started_when + milliseconds(2000) <= completed_when); - ASIO_CHECK(started_when + milliseconds(2500) > completed_when); - ASIO_CHECK(called); -} - -void promise_race_ranged_tester() -{ - using namespace asio; - using asio::error_code; - using namespace std::chrono; - - io_context ctx; - - steady_timer timer1{ctx}, timer2{ctx}; - - const auto started_when = steady_clock::now(); - timer1.expires_at(started_when + milliseconds(2000)); - timer2.expires_at(started_when + milliseconds(1000)); - - // promise< - // std::variant< - // tuple, - // tuple>> - experimental::promise p = - experimental::promise<>::race( - std::array{ - timer1.async_wait(experimental::use_promise), - timer2.async_wait(experimental::use_promise) - }); - - auto called = false; - auto completed_when = steady_clock::time_point(); - - p.async_wait([&](auto idx, auto ec ) - { - ASIO_CHECK(idx == 1); - called = true; - completed_when = steady_clock::now(); - ASIO_CHECK(!ec); - }); - - std::vector> arr; - - experimental::promise<>::race( - ctx.get_executor(), std::move(arr) - ).async_wait( - [](std::size_t idx) {ASIO_CHECK(idx == std::size_t(-1));} - ); - - ctx.run(); - - ASIO_CHECK(started_when + milliseconds(1000) <= completed_when); - ASIO_CHECK(started_when + milliseconds(1500) > completed_when); + ASIO_CHECK(timer2_done + milliseconds(1) > completed_when); ASIO_CHECK(called); - - std::exception_ptr ex; - - try - { - experimental::promise<>::race(std::move(arr)); - } - catch (...) - { - ex = std::current_exception(); - } - - ASIO_CHECK(ex); + ASIO_CHECK(ec == error::operation_aborted); } -void promise_all_ranged_tester() +void test_cancel() { - using namespace asio; - using asio::error_code; - using namespace std::chrono; - - io_context ctx; - - steady_timer timer1{ctx}, timer2{ctx}; - - const auto started_when = steady_clock::now(); - timer1.expires_at(started_when + milliseconds(2000)); - timer2.expires_at(started_when + milliseconds(1000)); - - // promise< - // std::variant< - // tuple, - // tuple>> - experimental::promise)> p = - experimental::promise<>::all( - std::array{ - timer1.async_wait(experimental::use_promise), - timer2.async_wait(experimental::use_promise) - }); - - auto called = false; - auto completed_when = steady_clock::time_point(); - - p.async_wait( - [&](auto v){ - ASIO_CHECK(v.size() == 2u); - completed_when = steady_clock::now(); - ASIO_CHECK(!v[0]); - ASIO_CHECK(!v[1]); - called = true; - }); - - std::vector> arr; - experimental::promise<>::all( - ctx.get_executor(), std::move(arr) - ).async_wait( - [](auto v) {ASIO_CHECK(v.size() == 0);} - ); + asio::io_context ctx; + asio::steady_timer tim{ctx, std::chrono::seconds(10)}; + asio::error_code ec; - ctx.run(); - - ASIO_CHECK(started_when + milliseconds(2000) <= completed_when); - ASIO_CHECK(started_when + milliseconds(2500) > completed_when); - ASIO_CHECK(called == true); - - std::exception_ptr ex; - try { - experimental::promise<>::all(std::move(arr)); + auto p = tim.async_wait( + asio::redirect_error( + asio::experimental::use_promise, ec)); } - catch (...) - { - ex = std::current_exception(); - } - - ASIO_CHECK(ex); -} - -void promise_cancel_tester() -{ - using namespace asio; - using asio::error_code; - using namespace std::chrono; - - io_context ctx; - - steady_timer timer1{ctx}, timer2{ctx}; - - const auto started_when = steady_clock::now(); - timer1.expires_at(started_when + milliseconds(2000)); - timer2.expires_at(started_when + milliseconds(1000)); - - // promise< - // std::variant< - // tuple, - // tuple>> - experimental::promise p = - experimental::promise<>::all( - timer1.async_wait(experimental::use_promise), - timer2.async_wait(experimental::use_promise)); - - bool called = false; - p.async_wait( - [&](auto ec1, auto ec2) - { - called = true; - ASIO_CHECK(ec1 == error::operation_aborted); - ASIO_CHECK(ec2 == error::operation_aborted); - }); - - post(ctx, [&]{p.cancel();}); ctx.run(); - ASIO_CHECK(called); + ASIO_CHECK_MESSAGE( + ec == asio::error::operation_aborted, + ec.message()); } } // namespace promise @@ -314,9 +91,4 @@ ASIO_TEST_SUITE ( "promise", ASIO_TEST_CASE(promise::promise_tester) - ASIO_TEST_CASE(promise::promise_race_tester) - ASIO_TEST_CASE(promise::promise_all_tester) - ASIO_TEST_CASE(promise::promise_race_ranged_tester) - ASIO_TEST_CASE(promise::promise_all_ranged_tester) - ASIO_TEST_CASE(promise::promise_cancel_tester) )