LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 99.4 % 157 156
Test Date: 2026-02-03 19:59:28 Functions: 93.5 % 353 330

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Michael Vandeberg
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      11              : #define BOOST_CAPY_WHEN_ANY_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/concept/executor.hpp>
      15              : #include <boost/capy/concept/io_awaitable.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <boost/capy/ex/frame_allocator.hpp>
      19              : #include <boost/capy/task.hpp>
      20              : 
      21              : #include <array>
      22              : #include <atomic>
      23              : #include <exception>
      24              : #include <optional>
      25              : #include <ranges>
      26              : #include <stdexcept>
      27              : #include <stop_token>
      28              : #include <tuple>
      29              : #include <type_traits>
      30              : #include <utility>
      31              : #include <variant>
      32              : #include <vector>
      33              : 
      34              : /*
      35              :    when_any - Race multiple tasks, return first completion
      36              :    ========================================================
      37              : 
      38              :    OVERVIEW:
      39              :    ---------
      40              :    when_any launches N tasks concurrently and completes when the FIRST task
      41              :    finishes (success or failure). It then requests stop for all siblings and
      42              :    waits for them to acknowledge before returning.
      43              : 
      44              :    ARCHITECTURE:
      45              :    -------------
      46              :    The design mirrors when_all but with inverted completion semantics:
      47              : 
      48              :      when_all:  complete when remaining_count reaches 0 (all done)
      49              :      when_any:  complete when has_winner becomes true (first done)
      50              :                 BUT still wait for remaining_count to reach 0 for cleanup
      51              : 
      52              :    Key components:
      53              :      - when_any_state:    Shared state tracking winner and completion
      54              :      - when_any_runner:   Wrapper coroutine for each child task
      55              :      - when_any_launcher: Awaitable that starts all runners concurrently
      56              : 
      57              :    CRITICAL INVARIANTS:
      58              :    --------------------
      59              :    1. Exactly one task becomes the winner (via atomic compare_exchange)
      60              :    2. All tasks must complete before parent resumes (cleanup safety)
      61              :    3. Stop is requested immediately when winner is determined
      62              :    4. Only the winner's result/exception is stored
      63              : 
      64              :    TYPE DEDUPLICATION:
      65              :    -------------------
      66              :    std::variant requires unique alternative types. Since when_any can race
      67              :    tasks with identical return types (e.g., three task<int>), we must
      68              :    deduplicate types before constructing the variant.
      69              : 
      70              :    Example: when_any(task<int>, task<string>, task<int>)
      71              :      - Raw types after void->monostate: int, string, int
      72              :      - Deduplicated variant: std::variant<int, string>
      73              :      - Return: pair<size_t, variant<int, string>>
      74              : 
      75              :    The winner_index tells you which task won (0, 1, or 2), while the variant
      76              :    holds the result. Use the index to determine how to interpret the variant.
      77              : 
      78              :    VOID HANDLING:
      79              :    --------------
      80              :    void tasks contribute std::monostate to the variant (then deduplicated).
      81              :    All-void tasks result in: pair<size_t, variant<monostate>>
      82              : 
      83              :    MEMORY MODEL:
      84              :    -------------
      85              :    Synchronization chain from winner's write to parent's read:
      86              : 
      87              :    1. Winner thread writes result_/winner_exception_ (non-atomic)
      88              :    2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
      89              :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      90              :       → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      91              :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      92              :    5. Parent coroutine resumes and reads result_/winner_exception_
      93              : 
      94              :    Synchronization analysis:
      95              :    - All fetch_sub operations on remaining_count_ form a release sequence
      96              :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      97              :      in the modification order of remaining_count_
      98              :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      99              :      modification order, establishing happens-before from winner's writes
     100              :    - Executor dispatch() is expected to provide queue-based synchronization
     101              :      (release-on-post, acquire-on-execute) completing the chain to parent
     102              :    - Even inline executors work (same thread = sequenced-before)
     103              : 
     104              :    Alternative considered: Adding winner_ready_ atomic (set with release after
     105              :    storing winner data, acquired before reading) would make synchronization
     106              :    self-contained and not rely on executor implementation details. Current
     107              :    approach is correct but requires careful reasoning about release sequences
     108              :    and executor behavior.
     109              : 
     110              :    EXCEPTION SEMANTICS:
     111              :    --------------------
     112              :    Unlike when_all (which captures first exception, discards others), when_any
     113              :    treats exceptions as valid completions. If the winning task threw, that
     114              :    exception is rethrown. Exceptions from non-winners are silently discarded.
     115              : */
     116              : 
     117              : namespace boost {
     118              : namespace capy {
     119              : 
     120              : namespace detail {
     121              : 
     122              : /** Convert void to monostate for variant storage.
     123              : 
     124              :     std::variant<void, ...> is ill-formed, so void tasks contribute
     125              :     std::monostate to the result variant instead. Non-void types
     126              :     pass through unchanged.
     127              : 
     128              :     @tparam T The type to potentially convert (void becomes monostate).
     129              : */
     130              : template<typename T>
     131              : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
     132              : 
     133              : // Type deduplication: std::variant requires unique alternative types.
     134              : // Fold left over the type list, appending each type only if not already present.
     135              : template<typename Variant, typename T>
     136              : struct variant_append_if_unique;
     137              : 
     138              : template<typename... Vs, typename T>
     139              : struct variant_append_if_unique<std::variant<Vs...>, T>
     140              : {
     141              :     using type = std::conditional_t<
     142              :         (std::is_same_v<T, Vs> || ...),
     143              :         std::variant<Vs...>,
     144              :         std::variant<Vs..., T>>;
     145              : };
     146              : 
     147              : template<typename Accumulated, typename... Remaining>
     148              : struct deduplicate_impl;
     149              : 
     150              : template<typename Accumulated>
     151              : struct deduplicate_impl<Accumulated>
     152              : {
     153              :     using type = Accumulated;
     154              : };
     155              : 
     156              : template<typename Accumulated, typename T, typename... Rest>
     157              : struct deduplicate_impl<Accumulated, T, Rest...>
     158              : {
     159              :     using next = typename variant_append_if_unique<Accumulated, T>::type;
     160              :     using type = typename deduplicate_impl<next, Rest...>::type;
     161              : };
     162              : 
     163              : // Deduplicated variant; void types become monostate before deduplication
     164              : template<typename T0, typename... Ts>
     165              : using unique_variant_t = typename deduplicate_impl<
     166              :     std::variant<void_to_monostate_t<T0>>,
     167              :     void_to_monostate_t<Ts>...>::type;
     168              : 
     169              : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
     170              : // when multiple tasks share the same return type.
     171              : template<typename T0, typename... Ts>
     172              : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
     173              : 
     174              : // Extract result type from any awaitable via await_resume()
     175              : template<typename A>
     176              : using awaitable_result_t = decltype(std::declval<std::decay_t<A>&>().await_resume());
     177              : 
     178              : /** Core shared state for when_any operations.
     179              : 
     180              :     Contains all members and methods common to both heterogeneous (variadic)
     181              :     and homogeneous (range) when_any implementations. State classes embed
     182              :     this via composition to avoid CRTP destructor ordering issues.
     183              : 
     184              :     @par Thread Safety
     185              :     Atomic operations protect winner selection and completion count.
     186              : */
     187              : struct when_any_core
     188              : {
     189              :     std::atomic<std::size_t> remaining_count_;
     190              :     std::size_t winner_index_{0};
     191              :     std::exception_ptr winner_exception_;
     192              :     std::stop_source stop_source_;
     193              : 
     194              :     // Bridges parent's stop token to our stop_source
     195              :     struct stop_callback_fn
     196              :     {
     197              :         std::stop_source* source_;
     198            6 :         void operator()() const noexcept { source_->request_stop(); }
     199              :     };
     200              :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     201              :     std::optional<stop_callback_t> parent_stop_callback_;
     202              : 
     203              :     coro continuation_;
     204              :     executor_ref caller_ex_;
     205              : 
     206              :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     207              :     std::atomic<bool> has_winner_{false};
     208              : 
     209           52 :     explicit when_any_core(std::size_t count) noexcept
     210           52 :         : remaining_count_(count)
     211              :     {
     212           52 :     }
     213              : 
     214              :     /** Atomically claim winner status; exactly one task succeeds. */
     215          145 :     bool try_win(std::size_t index) noexcept
     216              :     {
     217          145 :         bool expected = false;
     218          145 :         if(has_winner_.compare_exchange_strong(
     219              :             expected, true, std::memory_order_acq_rel))
     220              :         {
     221           52 :             winner_index_ = index;
     222           52 :             stop_source_.request_stop();
     223           52 :             return true;
     224              :         }
     225           93 :         return false;
     226              :     }
     227              : 
     228              :     /** @pre try_win() returned true. */
     229            8 :     void set_winner_exception(std::exception_ptr ep) noexcept
     230              :     {
     231            8 :         winner_exception_ = ep;
     232            8 :     }
     233              : 
     234              :     /** Last task to complete resumes the parent via symmetric transfer. */
     235          145 :     coro signal_completion()
     236              :     {
     237          145 :         auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
     238          145 :         if(remaining == 1)
     239           52 :             caller_ex_.dispatch(continuation_);
     240          145 :         return std::noop_coroutine();
     241              :     }
     242              : };
     243              : 
     244              : /** Shared state for heterogeneous when_any operation.
     245              : 
     246              :     Coordinates winner selection, result storage, and completion tracking
     247              :     for all child tasks in a when_any operation. Uses composition with
     248              :     when_any_core for shared functionality.
     249              : 
     250              :     @par Lifetime
     251              :     Allocated on the parent coroutine's frame, outlives all runners.
     252              : 
     253              :     @tparam T0 First task's result type.
     254              :     @tparam Ts Remaining tasks' result types.
     255              : */
     256              : template<typename T0, typename... Ts>
     257              : struct when_any_state
     258              : {
     259              :     static constexpr std::size_t task_count = 1 + sizeof...(Ts);
     260              :     using variant_type = unique_variant_t<T0, Ts...>;
     261              : 
     262              :     when_any_core core_;
     263              :     std::optional<variant_type> result_;
     264              :     std::array<coro, task_count> runner_handles_{};
     265              : 
     266           37 :     when_any_state()
     267           37 :         : core_(task_count)
     268              :     {
     269           37 :     }
     270              : 
     271           37 :     ~when_any_state()
     272              :     {
     273          130 :         for(auto h : runner_handles_)
     274           93 :             if(h)
     275           93 :                 h.destroy();
     276           37 :     }
     277              : 
     278              :     /** @pre core_.try_win() returned true.
     279              :         @note Uses in_place_type (not index) because variant is deduplicated.
     280              :     */
     281              :     template<typename T>
     282           30 :     void set_winner_result(T value)
     283              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     284              :     {
     285           30 :         result_.emplace(std::in_place_type<T>, std::move(value));
     286           30 :     }
     287              : 
     288              :     /** @pre core_.try_win() returned true. */
     289            2 :     void set_winner_void() noexcept
     290              :     {
     291            2 :         result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
     292            2 :     }
     293              : };
     294              : 
     295              : /** Wrapper coroutine that runs a single child task for when_any.
     296              : 
     297              :     Propagates executor/stop_token to the child, attempts to claim winner
     298              :     status on completion, and signals completion for cleanup coordination.
     299              : 
     300              :     @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
     301              : */
     302              : template<typename StateType>
     303              : struct when_any_runner
     304              : {
     305              :     struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
     306              :     {
     307              :         StateType* state_ = nullptr;
     308              :         std::size_t index_ = 0;
     309              :         executor_ref ex_;
     310              :         std::stop_token stop_token_;
     311              : 
     312          145 :         when_any_runner get_return_object() noexcept
     313              :         {
     314          145 :             return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
     315              :         }
     316              : 
     317              :         // Starts suspended; launcher sets up state/ex/token then resumes
     318          145 :         std::suspend_always initial_suspend() noexcept
     319              :         {
     320          145 :             return {};
     321              :         }
     322              : 
     323          145 :         auto final_suspend() noexcept
     324              :         {
     325              :             struct awaiter
     326              :             {
     327              :                 promise_type* p_;
     328          145 :                 bool await_ready() const noexcept { return false; }
     329          145 :                 coro await_suspend(coro) noexcept { return p_->state_->core_.signal_completion(); }
     330            0 :                 void await_resume() const noexcept {}
     331              :             };
     332          145 :             return awaiter{this};
     333              :         }
     334              : 
     335          133 :         void return_void() noexcept {}
     336              : 
     337              :         // Exceptions are valid completions in when_any (unlike when_all)
     338           12 :         void unhandled_exception()
     339              :         {
     340           12 :             if(state_->core_.try_win(index_))
     341            8 :                 state_->core_.set_winner_exception(std::current_exception());
     342           12 :         }
     343              : 
     344              :         /** Injects executor and stop token into child awaitables. */
     345              :         template<class Awaitable>
     346              :         struct transform_awaiter
     347              :         {
     348              :             std::decay_t<Awaitable> a_;
     349              :             promise_type* p_;
     350              : 
     351          145 :             bool await_ready() { return a_.await_ready(); }
     352          145 :             auto await_resume() { return a_.await_resume(); }
     353              : 
     354              :             template<class Promise>
     355          145 :             auto await_suspend(std::coroutine_handle<Promise> h)
     356              :             {
     357          145 :                 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
     358              :             }
     359              :         };
     360              : 
     361              :         template<class Awaitable>
     362          145 :         auto await_transform(Awaitable&& a)
     363              :         {
     364              :             using A = std::decay_t<Awaitable>;
     365              :             if constexpr (IoAwaitable<A>)
     366              :             {
     367              :                 return transform_awaiter<Awaitable>{
     368          290 :                     std::forward<Awaitable>(a), this};
     369              :             }
     370              :             else
     371              :             {
     372              :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     373              :             }
     374          145 :         }
     375              :     };
     376              : 
     377              :     std::coroutine_handle<promise_type> h_;
     378              : 
     379          145 :     explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
     380          145 :         : h_(h)
     381              :     {
     382          145 :     }
     383              : 
     384              :     // Enable move for all clang versions - some versions need it
     385              :     when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
     386              : 
     387              :     // Non-copyable
     388              :     when_any_runner(when_any_runner const&) = delete;
     389              :     when_any_runner& operator=(when_any_runner const&) = delete;
     390              :     when_any_runner& operator=(when_any_runner&&) = delete;
     391              : 
     392          145 :     auto release() noexcept
     393              :     {
     394          145 :         return std::exchange(h_, nullptr);
     395              :     }
     396              : };
     397              : 
     398              : /** Wraps a child awaitable, attempts to claim winner on completion.
     399              : 
     400              :     Uses requires-expressions to detect state capabilities:
     401              :     - set_winner_void(): for heterogeneous void tasks (stores monostate)
     402              :     - set_winner_result(): for non-void tasks
     403              :     - Neither: for homogeneous void tasks (no result storage)
     404              : */
     405              : template<IoAwaitable Awaitable, typename StateType>
     406              : when_any_runner<StateType>
     407          145 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
     408              : {
     409              :     using T = awaitable_result_t<Awaitable>;
     410              :     if constexpr (std::is_void_v<T>)
     411              :     {
     412              :         co_await std::move(inner);
     413              :         if(state->core_.try_win(index))
     414              :         {
     415              :             // Heterogeneous void tasks store monostate in the variant
     416              :             if constexpr (requires { state->set_winner_void(); })
     417              :                 state->set_winner_void();
     418              :             // Homogeneous void tasks have no result to store
     419              :         }
     420              :     }
     421              :     else
     422              :     {
     423              :         auto result = co_await std::move(inner);
     424              :         if(state->core_.try_win(index))
     425              :         {
     426              :             // Defensive: move should not throw (already moved once), but we
     427              :             // catch just in case since an uncaught exception would be devastating.
     428              :             try
     429              :             {
     430              :                 state->set_winner_result(std::move(result));
     431              :             }
     432              :             catch(...)
     433              :             {
     434              :                 state->core_.set_winner_exception(std::current_exception());
     435              :             }
     436              :         }
     437              :     }
     438          290 : }
     439              : 
     440              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     441              : template<IoAwaitable... Awaitables>
     442              : class when_any_launcher
     443              : {
     444              :     using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
     445              : 
     446              :     std::tuple<Awaitables...>* tasks_;
     447              :     state_type* state_;
     448              : 
     449              : public:
     450           37 :     when_any_launcher(
     451              :         std::tuple<Awaitables...>* tasks,
     452              :         state_type* state)
     453           37 :         : tasks_(tasks)
     454           37 :         , state_(state)
     455              :     {
     456           37 :     }
     457              : 
     458           37 :     bool await_ready() const noexcept
     459              :     {
     460           37 :         return sizeof...(Awaitables) == 0;
     461              :     }
     462              : 
     463              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     464              :         destroys this object before await_suspend returns. Must not reference
     465              :         `this` after the final launch_one call.
     466              :     */
     467              :     template<Executor Ex>
     468           37 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     469              :     {
     470           37 :         state_->core_.continuation_ = continuation;
     471           37 :         state_->core_.caller_ex_ = caller_ex;
     472              : 
     473           37 :         if(parent_token.stop_possible())
     474              :         {
     475           16 :             state_->core_.parent_stop_callback_.emplace(
     476              :                 parent_token,
     477            8 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     478              : 
     479            8 :             if(parent_token.stop_requested())
     480            2 :                 state_->core_.stop_source_.request_stop();
     481              :         }
     482              : 
     483           37 :         auto token = state_->core_.stop_source_.get_token();
     484           74 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     485           37 :             (..., launch_one<Is>(caller_ex, token));
     486           37 :         }(std::index_sequence_for<Awaitables...>{});
     487              : 
     488           74 :         return std::noop_coroutine();
     489           37 :     }
     490              : 
     491           37 :     void await_resume() const noexcept
     492              :     {
     493           37 :     }
     494              : 
     495              : private:
     496              :     /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
     497              :     template<std::size_t I, Executor Ex>
     498           93 :     void launch_one(Ex const& caller_ex, std::stop_token token)
     499              :     {
     500           93 :         auto runner = make_when_any_runner(
     501           93 :             std::move(std::get<I>(*tasks_)), state_, I);
     502              : 
     503           93 :         auto h = runner.release();
     504           93 :         h.promise().state_ = state_;
     505           93 :         h.promise().index_ = I;
     506           93 :         h.promise().ex_ = caller_ex;
     507           93 :         h.promise().stop_token_ = token;
     508              : 
     509           93 :         coro ch{h};
     510           93 :         state_->runner_handles_[I] = ch;
     511           93 :         caller_ex.dispatch(ch);
     512           93 :     }
     513              : };
     514              : 
     515              : } // namespace detail
     516              : 
     517              : /** Wait for the first awaitable to complete.
     518              : 
     519              :     Races multiple heterogeneous awaitables concurrently and returns when the
     520              :     first one completes. The result includes the winner's index and a
     521              :     deduplicated variant containing the result value.
     522              : 
     523              :     @par Suspends
     524              :     The calling coroutine suspends when co_await is invoked. All awaitables
     525              :     are launched concurrently and execute in parallel. The coroutine resumes
     526              :     only after all awaitables have completed, even though the winner is
     527              :     determined by the first to finish.
     528              : 
     529              :     @par Completion Conditions
     530              :     @li Winner is determined when the first awaitable completes (success or exception)
     531              :     @li Only one task can claim winner status via atomic compare-exchange
     532              :     @li Once a winner exists, stop is requested for all remaining siblings
     533              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     534              :     @li The winner's result is returned; if the winner threw, the exception is rethrown
     535              : 
     536              :     @par Cancellation Semantics
     537              :     Cancellation is supported via stop_token propagated through the
     538              :     IoAwaitable protocol:
     539              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     540              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     541              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     542              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     543              :     @li Stop requests are cooperative; tasks must check and respond to them
     544              : 
     545              :     @par Concurrency/Overlap
     546              :     All awaitables are launched concurrently before any can complete.
     547              :     The launcher iterates through the arguments, starting each task on the
     548              :     caller's executor. Tasks may execute in parallel on multi-threaded
     549              :     executors or interleave on single-threaded executors. There is no
     550              :     guaranteed ordering of task completion.
     551              : 
     552              :     @par Notable Error Conditions
     553              :     @li Winner exception: if the winning task threw, that exception is rethrown
     554              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     555              :     @li Cancellation: tasks may complete via cancellation without throwing
     556              : 
     557              :     @par Example
     558              :     @code
     559              :     task<void> example() {
     560              :         auto [index, result] = co_await when_any(
     561              :             fetch_from_primary(),   // task<Response>
     562              :             fetch_from_backup()     // task<Response>
     563              :         );
     564              :         // index is 0 or 1, result holds the winner's Response
     565              :         auto response = std::get<Response>(result);
     566              :     }
     567              :     @endcode
     568              : 
     569              :     @par Example with Heterogeneous Types
     570              :     @code
     571              :     task<void> mixed_types() {
     572              :         auto [index, result] = co_await when_any(
     573              :             fetch_int(),      // task<int>
     574              :             fetch_string()    // task<std::string>
     575              :         );
     576              :         if (index == 0)
     577              :             std::cout << "Got int: " << std::get<int>(result) << "\n";
     578              :         else
     579              :             std::cout << "Got string: " << std::get<std::string>(result) << "\n";
     580              :     }
     581              :     @endcode
     582              : 
     583              :     @tparam A0 First awaitable type (must satisfy IoAwaitable).
     584              :     @tparam As Remaining awaitable types (must satisfy IoAwaitable).
     585              :     @param a0 The first awaitable to race.
     586              :     @param as Additional awaitables to race concurrently.
     587              :     @return A task yielding a pair of (winner_index, result_variant).
     588              : 
     589              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     590              : 
     591              :     @par Remarks
     592              :     Awaitables are moved into the coroutine frame; original objects become
     593              :     empty after the call. When multiple awaitables share the same return type,
     594              :     the variant is deduplicated to contain only unique types. Use the winner
     595              :     index to determine which awaitable completed first. Void awaitables
     596              :     contribute std::monostate to the variant.
     597              : 
     598              :     @see when_all, IoAwaitable
     599              : */
     600              : template<IoAwaitable A0, IoAwaitable... As>
     601           37 : [[nodiscard]] auto when_any(A0 a0, As... as)
     602              :     -> task<detail::when_any_result_t<
     603              :         detail::awaitable_result_t<A0>,
     604              :         detail::awaitable_result_t<As>...>>
     605              : {
     606              :     using result_type = detail::when_any_result_t<
     607              :         detail::awaitable_result_t<A0>,
     608              :         detail::awaitable_result_t<As>...>;
     609              : 
     610              :     detail::when_any_state<
     611              :         detail::awaitable_result_t<A0>,
     612              :         detail::awaitable_result_t<As>...> state;
     613              :     std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
     614              : 
     615              :     co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
     616              : 
     617              :     if(state.core_.winner_exception_)
     618              :         std::rethrow_exception(state.core_.winner_exception_);
     619              : 
     620              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     621           74 : }
     622              : 
     623              : /** Concept for ranges of full I/O awaitables.
     624              : 
     625              :     A range satisfies `IoAwaitableRange` if it is a sized input range
     626              :     whose value type satisfies @ref IoAwaitable. This enables when_any
     627              :     to accept any container or view of awaitables, not just std::vector.
     628              : 
     629              :     @tparam R The range type.
     630              : 
     631              :     @par Requirements
     632              :     @li `R` must satisfy `std::ranges::input_range`
     633              :     @li `R` must satisfy `std::ranges::sized_range`
     634              :     @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
     635              : 
     636              :     @par Syntactic Requirements
     637              :     Given `r` of type `R`:
     638              :     @li `std::ranges::begin(r)` is valid
     639              :     @li `std::ranges::end(r)` is valid
     640              :     @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
     641              :     @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
     642              : 
     643              :     @par Example
     644              :     @code
     645              :     template<IoAwaitableRange R>
     646              :     task<void> race_all(R&& awaitables) {
     647              :         auto winner = co_await when_any(std::forward<R>(awaitables));
     648              :         // Process winner...
     649              :     }
     650              :     @endcode
     651              : 
     652              :     @see when_any, IoAwaitable
     653              : */
     654              : template<typename R>
     655              : concept IoAwaitableRange =
     656              :     std::ranges::input_range<R> &&
     657              :     std::ranges::sized_range<R> &&
     658              :     IoAwaitable<std::ranges::range_value_t<R>>;
     659              : 
     660              : namespace detail {
     661              : 
     662              : /** Shared state for homogeneous when_any (range overload).
     663              : 
     664              :     Uses composition with when_any_core for shared functionality.
     665              :     Simpler than heterogeneous: optional<T> instead of variant, vector
     666              :     instead of array for runner handles.
     667              : */
     668              : template<typename T>
     669              : struct when_any_homogeneous_state
     670              : {
     671              :     when_any_core core_;
     672              :     std::optional<T> result_;
     673              :     std::vector<coro> runner_handles_;
     674              : 
     675           13 :     explicit when_any_homogeneous_state(std::size_t count)
     676           13 :         : core_(count)
     677           26 :         , runner_handles_(count)
     678              :     {
     679           13 :     }
     680              : 
     681           13 :     ~when_any_homogeneous_state()
     682              :     {
     683           60 :         for(auto h : runner_handles_)
     684           47 :             if(h)
     685           47 :                 h.destroy();
     686           13 :     }
     687              : 
     688              :     /** @pre core_.try_win() returned true. */
     689           11 :     void set_winner_result(T value)
     690              :         noexcept(std::is_nothrow_move_constructible_v<T>)
     691              :     {
     692           11 :         result_.emplace(std::move(value));
     693           11 :     }
     694              : };
     695              : 
     696              : /** Specialization for void tasks (no result storage needed). */
     697              : template<>
     698              : struct when_any_homogeneous_state<void>
     699              : {
     700              :     when_any_core core_;
     701              :     std::vector<coro> runner_handles_;
     702              : 
     703            2 :     explicit when_any_homogeneous_state(std::size_t count)
     704            2 :         : core_(count)
     705            4 :         , runner_handles_(count)
     706              :     {
     707            2 :     }
     708              : 
     709            2 :     ~when_any_homogeneous_state()
     710              :     {
     711            7 :         for(auto h : runner_handles_)
     712            5 :             if(h)
     713            5 :                 h.destroy();
     714            2 :     }
     715              : 
     716              :     // No set_winner_result - void tasks have no result to store
     717              : };
     718              : 
     719              : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
     720              : template<IoAwaitableRange Range>
     721              : class when_any_homogeneous_launcher
     722              : {
     723              :     using Awaitable = std::ranges::range_value_t<Range>;
     724              :     using T = awaitable_result_t<Awaitable>;
     725              : 
     726              :     Range* range_;
     727              :     when_any_homogeneous_state<T>* state_;
     728              : 
     729              : public:
     730           15 :     when_any_homogeneous_launcher(
     731              :         Range* range,
     732              :         when_any_homogeneous_state<T>* state)
     733           15 :         : range_(range)
     734           15 :         , state_(state)
     735              :     {
     736           15 :     }
     737              : 
     738           15 :     bool await_ready() const noexcept
     739              :     {
     740           15 :         return std::ranges::empty(*range_);
     741              :     }
     742              : 
     743              :     /** CRITICAL: If the last task finishes synchronously, parent resumes and
     744              :         destroys this object before await_suspend returns. Must not reference
     745              :         `this` after the final launch_one call.
     746              :     */
     747              :     template<Executor Ex>
     748           15 :     coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
     749              :     {
     750           15 :         state_->core_.continuation_ = continuation;
     751           15 :         state_->core_.caller_ex_ = caller_ex;
     752              : 
     753           15 :         if(parent_token.stop_possible())
     754              :         {
     755           10 :             state_->core_.parent_stop_callback_.emplace(
     756              :                 parent_token,
     757            5 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     758              : 
     759            5 :             if(parent_token.stop_requested())
     760            2 :                 state_->core_.stop_source_.request_stop();
     761              :         }
     762              : 
     763           15 :         auto token = state_->core_.stop_source_.get_token();
     764           15 :         std::size_t index = 0;
     765           67 :         for(auto&& a : *range_)
     766              :         {
     767           52 :             launch_one(index, std::move(a), caller_ex, token);
     768           52 :             ++index;
     769              :         }
     770              : 
     771           30 :         return std::noop_coroutine();
     772           15 :     }
     773              : 
     774           15 :     void await_resume() const noexcept
     775              :     {
     776           15 :     }
     777              : 
     778              : private:
     779              :     /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
     780              :     template<Executor Ex>
     781           52 :     void launch_one(std::size_t index, Awaitable&& awaitable, Ex const& caller_ex, std::stop_token token)
     782              :     {
     783           52 :         auto runner = make_when_any_runner(
     784           52 :             std::move(awaitable), state_, index);
     785              : 
     786           52 :         auto h = runner.release();
     787           52 :         h.promise().state_ = state_;
     788           52 :         h.promise().index_ = index;
     789           52 :         h.promise().ex_ = caller_ex;
     790           52 :         h.promise().stop_token_ = token;
     791              : 
     792           52 :         coro ch{h};
     793           52 :         state_->runner_handles_[index] = ch;
     794           52 :         caller_ex.dispatch(ch);
     795           52 :     }
     796              : };
     797              : 
     798              : } // namespace detail
     799              : 
     800              : /** Wait for the first awaitable to complete (range overload).
     801              : 
     802              :     Races a range of awaitables with the same result type. Accepts any
     803              :     sized input range of IoAwaitable types, enabling use with arrays,
     804              :     spans, or custom containers.
     805              : 
     806              :     @par Suspends
     807              :     The calling coroutine suspends when co_await is invoked. All awaitables
     808              :     in the range are launched concurrently and execute in parallel. The
     809              :     coroutine resumes only after all awaitables have completed, even though
     810              :     the winner is determined by the first to finish.
     811              : 
     812              :     @par Completion Conditions
     813              :     @li Winner is determined when the first awaitable completes (success or exception)
     814              :     @li Only one task can claim winner status via atomic compare-exchange
     815              :     @li Once a winner exists, stop is requested for all remaining siblings
     816              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     817              :     @li The winner's index and result are returned; if the winner threw, the exception is rethrown
     818              : 
     819              :     @par Cancellation Semantics
     820              :     Cancellation is supported via stop_token propagated through the
     821              :     IoAwaitable protocol:
     822              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     823              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     824              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     825              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     826              :     @li Stop requests are cooperative; tasks must check and respond to them
     827              : 
     828              :     @par Concurrency/Overlap
     829              :     All awaitables are launched concurrently before any can complete.
     830              :     The launcher iterates through the range, starting each task on the
     831              :     caller's executor. Tasks may execute in parallel on multi-threaded
     832              :     executors or interleave on single-threaded executors. There is no
     833              :     guaranteed ordering of task completion.
     834              : 
     835              :     @par Notable Error Conditions
     836              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     837              :     @li Winner exception: if the winning task threw, that exception is rethrown
     838              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     839              :     @li Cancellation: tasks may complete via cancellation without throwing
     840              : 
     841              :     @par Example
     842              :     @code
     843              :     task<void> example() {
     844              :         std::array<task<Response>, 3> requests = {
     845              :             fetch_from_server(0),
     846              :             fetch_from_server(1),
     847              :             fetch_from_server(2)
     848              :         };
     849              : 
     850              :         auto [index, response] = co_await when_any(std::move(requests));
     851              :     }
     852              :     @endcode
     853              : 
     854              :     @par Example with Vector
     855              :     @code
     856              :     task<Response> fetch_fastest(std::vector<Server> const& servers) {
     857              :         std::vector<task<Response>> requests;
     858              :         for (auto const& server : servers)
     859              :             requests.push_back(fetch_from(server));
     860              : 
     861              :         auto [index, response] = co_await when_any(std::move(requests));
     862              :         co_return response;
     863              :     }
     864              :     @endcode
     865              : 
     866              :     @tparam R Range type satisfying IoAwaitableRange.
     867              :     @param awaitables Range of awaitables to race concurrently (must not be empty).
     868              :     @return A task yielding a pair of (winner_index, result).
     869              : 
     870              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     871              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     872              : 
     873              :     @par Remarks
     874              :     Elements are moved from the range; for lvalue ranges, the original
     875              :     container will have moved-from elements after this call. The range
     876              :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     877              :     the variadic overload, no variant wrapper is needed since all tasks
     878              :     share the same return type.
     879              : 
     880              :     @see when_any, IoAwaitableRange
     881              : */
     882              : template<IoAwaitableRange R>
     883              :     requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
     884           14 : [[nodiscard]] auto when_any(R&& awaitables)
     885              :     -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
     886              : {
     887              :     using Awaitable = std::ranges::range_value_t<R>;
     888              :     using T = detail::awaitable_result_t<Awaitable>;
     889              :     using result_type = std::pair<std::size_t, T>;
     890              :     using OwnedRange = std::remove_cvref_t<R>;
     891              : 
     892              :     auto count = std::ranges::size(awaitables);
     893              :     if(count == 0)
     894              :         throw std::invalid_argument("when_any requires at least one awaitable");
     895              : 
     896              :     // Move/copy range onto coroutine frame to ensure lifetime
     897              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     898              : 
     899              :     detail::when_any_homogeneous_state<T> state(count);
     900              : 
     901              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
     902              : 
     903              :     if(state.core_.winner_exception_)
     904              :         std::rethrow_exception(state.core_.winner_exception_);
     905              : 
     906              :     co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
     907           28 : }
     908              : 
     909              : /** Wait for the first awaitable to complete (void range overload).
     910              : 
     911              :     Races a range of void-returning awaitables. Since void awaitables have
     912              :     no result value, only the winner's index is returned.
     913              : 
     914              :     @par Suspends
     915              :     The calling coroutine suspends when co_await is invoked. All awaitables
     916              :     in the range are launched concurrently and execute in parallel. The
     917              :     coroutine resumes only after all awaitables have completed, even though
     918              :     the winner is determined by the first to finish.
     919              : 
     920              :     @par Completion Conditions
     921              :     @li Winner is determined when the first awaitable completes (success or exception)
     922              :     @li Only one task can claim winner status via atomic compare-exchange
     923              :     @li Once a winner exists, stop is requested for all remaining siblings
     924              :     @li Parent coroutine resumes only after all siblings acknowledge completion
     925              :     @li The winner's index is returned; if the winner threw, the exception is rethrown
     926              : 
     927              :     @par Cancellation Semantics
     928              :     Cancellation is supported via stop_token propagated through the
     929              :     IoAwaitable protocol:
     930              :     @li Each child awaitable receives a stop_token derived from a shared stop_source
     931              :     @li When the parent's stop token is activated, the stop is forwarded to all children
     932              :     @li When a winner is determined, stop_source_.request_stop() is called immediately
     933              :     @li Siblings must handle cancellation gracefully and complete before parent resumes
     934              :     @li Stop requests are cooperative; tasks must check and respond to them
     935              : 
     936              :     @par Concurrency/Overlap
     937              :     All awaitables are launched concurrently before any can complete.
     938              :     The launcher iterates through the range, starting each task on the
     939              :     caller's executor. Tasks may execute in parallel on multi-threaded
     940              :     executors or interleave on single-threaded executors. There is no
     941              :     guaranteed ordering of task completion.
     942              : 
     943              :     @par Notable Error Conditions
     944              :     @li Empty range: throws std::invalid_argument immediately (not via co_return)
     945              :     @li Winner exception: if the winning task threw, that exception is rethrown
     946              :     @li Non-winner exceptions: silently discarded (only winner's result matters)
     947              :     @li Cancellation: tasks may complete via cancellation without throwing
     948              : 
     949              :     @par Example
     950              :     @code
     951              :     task<void> example() {
     952              :         std::vector<task<void>> tasks;
     953              :         for (int i = 0; i < 5; ++i)
     954              :             tasks.push_back(background_work(i));
     955              : 
     956              :         std::size_t winner = co_await when_any(std::move(tasks));
     957              :         // winner is the index of the first task to complete
     958              :     }
     959              :     @endcode
     960              : 
     961              :     @par Example with Timeout
     962              :     @code
     963              :     task<void> with_timeout() {
     964              :         std::vector<task<void>> tasks;
     965              :         tasks.push_back(long_running_operation());
     966              :         tasks.push_back(delay(std::chrono::seconds(5)));
     967              : 
     968              :         std::size_t winner = co_await when_any(std::move(tasks));
     969              :         if (winner == 1) {
     970              :             // Timeout occurred
     971              :         }
     972              :     }
     973              :     @endcode
     974              : 
     975              :     @tparam R Range type satisfying IoAwaitableRange with void result.
     976              :     @param awaitables Range of void awaitables to race concurrently (must not be empty).
     977              :     @return A task yielding the winner's index (zero-based).
     978              : 
     979              :     @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
     980              :     @throws Rethrows the winner's exception if the winning task threw an exception.
     981              : 
     982              :     @par Remarks
     983              :     Elements are moved from the range; for lvalue ranges, the original
     984              :     container will have moved-from elements after this call. The range
     985              :     is moved onto the coroutine frame to ensure lifetime safety. Unlike
     986              :     the non-void overload, no result storage is needed since void tasks
     987              :     produce no value.
     988              : 
     989              :     @see when_any, IoAwaitableRange
     990              : */
     991              : template<IoAwaitableRange R>
     992              :     requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
     993            2 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
     994              : {
     995              :     using OwnedRange = std::remove_cvref_t<R>;
     996              : 
     997              :     auto count = std::ranges::size(awaitables);
     998              :     if(count == 0)
     999              :         throw std::invalid_argument("when_any requires at least one awaitable");
    1000              : 
    1001              :     // Move/copy range onto coroutine frame to ensure lifetime
    1002              :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
    1003              : 
    1004              :     detail::when_any_homogeneous_state<void> state(count);
    1005              : 
    1006              :     co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
    1007              : 
    1008              :     if(state.core_.winner_exception_)
    1009              :         std::rethrow_exception(state.core_.winner_exception_);
    1010              : 
    1011              :     co_return state.core_.winner_index_;
    1012            4 : }
    1013              : 
    1014              : } // namespace capy
    1015              : } // namespace boost
    1016              : 
    1017              : #endif
        

Generated by: LCOV version 2.3