Skip to content

Commit

Permalink
Dispatch co_spawn cancellation through the executor, if the completio…
Browse files Browse the repository at this point in the history
…n handler specifies its own associated executor.
  • Loading branch information
chriskohlhoff committed Apr 5, 2022
1 parent 6a509b2 commit f0a1e1c
Showing 1 changed file with 70 additions and 6 deletions.
76 changes: 70 additions & 6 deletions asio/include/asio/impl/co_spawn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,61 @@ class awaitable_as_function
awaitable<T, Executor> awaitable_;
};

template <typename Handler, typename Executor, typename = void>
class co_spawn_cancellation_handler
{
public:
co_spawn_cancellation_handler(const Handler& handler, const Executor& ex)
: ex_(asio::get_associated_executor(handler, ex))
{
}

cancellation_slot slot()
{
return signal_.slot();
}

void operator()(cancellation_type_t type)
{
cancellation_signal* sig = &signal_;
asio::dispatch(ex_, [sig, type]{ sig->emit(type); });
}

private:
cancellation_signal signal_;
typename associated_executor<Handler, Executor>::type ex_;
};


template <typename Handler, typename Executor>
class co_spawn_cancellation_handler<Handler, Executor,
typename enable_if<
is_same<
typename associated_executor<Handler,
Executor>::asio_associated_executor_is_unspecialised,
void
>::value
>::type>
{
public:
co_spawn_cancellation_handler(const Handler&, const Executor&)
{
}

cancellation_slot slot()
{
return signal_.slot();
}

void operator()(cancellation_type_t type)
{
signal_.emit(type);
}

private:
cancellation_signal signal_;
};

template <typename Executor>
class initiate_co_spawn
{
Expand All @@ -220,17 +275,26 @@ class initiate_co_spawn
void operator()(Handler&& handler, F&& f) const
{
typedef typename result_of<F()>::type awaitable_type;
typedef typename decay<Handler>::type handler_type;
typedef co_spawn_cancellation_handler<
handler_type, Executor> cancel_handler_type;

auto slot = asio::get_associated_cancellation_slot(handler);
cancel_handler_type* cancel_handler = slot.is_connected()
? &slot.template emplace<cancel_handler_type>(handler, ex_)
: nullptr;

cancellation_state proxy_cancel_state(
asio::get_associated_cancellation_slot(handler),
enable_total_cancellation());
cancellation_slot proxy_slot(
cancel_handler
? cancel_handler->slot()
: cancellation_slot());

cancellation_state cancel_state(proxy_cancel_state.slot());
cancellation_state cancel_state(proxy_slot);

auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),
ex_, std::forward<F>(f), std::forward<Handler>(handler));
awaitable_handler<executor_type, void>(std::move(a), ex_,
proxy_cancel_state.slot(), cancel_state).launch();
awaitable_handler<executor_type, void>(std::move(a),
ex_, proxy_slot, cancel_state).launch();
}

private:
Expand Down

0 comments on commit f0a1e1c

Please sign in to comment.