libs/capy/include/boost/capy/when_any.hpp

99.3% Lines (152/153) 96.4% Functions (296/307) 92.5% Branches (37/40)
libs/capy/include/boost/capy/when_any.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <ranges>
26 #include <stdexcept>
27 #include <stop_token>
28 #include <tuple>
29 #include <type_traits>
30 #include <utility>
31 #include <variant>
32 #include <vector>
33
34 /*
35 when_any - Race multiple tasks, return first completion
36 ========================================================
37
38 OVERVIEW:
39 ---------
40 when_any launches N tasks concurrently and completes when the FIRST task
41 finishes (success or failure). It then requests stop for all siblings and
42 waits for them to acknowledge before returning.
43
44 ARCHITECTURE:
45 -------------
46 The design mirrors when_all but with inverted completion semantics:
47
48 when_all: complete when remaining_count reaches 0 (all done)
49 when_any: complete when has_winner becomes true (first done)
50 BUT still wait for remaining_count to reach 0 for cleanup
51
52 Key components:
53 - when_any_state: Shared state tracking winner and completion
54 - when_any_runner: Wrapper coroutine for each child task
55 - when_any_launcher: Awaitable that starts all runners concurrently
56
57 CRITICAL INVARIANTS:
58 --------------------
59 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 2. All tasks must complete before parent resumes (cleanup safety)
61 3. Stop is requested immediately when winner is determined
62 4. Only the winner's result/exception is stored
63
64 TYPE DEDUPLICATION:
65 -------------------
66 std::variant requires unique alternative types. Since when_any can race
67 tasks with identical return types (e.g., three task<int>), we must
68 deduplicate types before constructing the variant.
69
70 Example: when_any(task<int>, task<string>, task<int>)
71 - Raw types after void->monostate: int, string, int
72 - Deduplicated variant: std::variant<int, string>
73 - Return: pair<size_t, variant<int, string>>
74
75 The winner_index tells you which task won (0, 1, or 2), while the variant
76 holds the result. Use the index to determine how to interpret the variant.
77
78 VOID HANDLING:
79 --------------
80 void tasks contribute std::monostate to the variant (then deduplicated).
81 All-void tasks result in: pair<size_t, variant<monostate>>
82
83 MEMORY MODEL:
84 -------------
85 Synchronization chain from winner's write to parent's read:
86
87 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
89 3. Last task thread (may be winner or non-winner) calls signal_completion()
90 → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
91 4. Last task calls caller_ex_.dispatch(continuation_), then returns std::noop_coroutine()
92 5. Parent coroutine resumes and reads result_/winner_exception_
93
94 Synchronization analysis:
95 - All fetch_sub operations on remaining_count_ form a release sequence
96 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 in the modification order of remaining_count_
98 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 modification order, establishing happens-before from winner's writes
100 - Executor dispatch() is expected to provide queue-based synchronization
101 (release-on-post, acquire-on-execute) completing the chain to parent
102 - Even inline executors work (same thread = sequenced-before)
103
104 Alternative considered: Adding winner_ready_ atomic (set with release after
105 storing winner data, acquired before reading) would make synchronization
106 self-contained and not rely on executor implementation details. Current
107 approach is correct but requires careful reasoning about release sequences
108 and executor behavior.
109
110 EXCEPTION SEMANTICS:
111 --------------------
112 Unlike when_all (which captures first exception, discards others), when_any
113 treats exceptions as valid completions. If the winning task threw, that
114 exception is rethrown. Exceptions from non-winners are silently discarded.
115 */
116
117 namespace boost {
118 namespace capy {
119
120 namespace detail {
121
/** Map `void` onto `std::monostate`, leaving every other type alone.

    A `std::variant` cannot hold `void`, so awaitables that produce no
    value are represented by `std::monostate` in the result variant.

    @tparam T Candidate type; (cv-qualified) `void` is replaced by
              `std::monostate`, anything else is returned unchanged.
*/
template<typename T>
using void_to_monostate_t =
    typename std::conditional<std::is_void<T>::value, std::monostate, T>::type;
132
// Building block of the type deduplication: std::variant needs distinct
// alternatives, so T is appended to the variant only when it differs from
// every alternative that is already there.
template<typename Variant, typename T>
struct variant_append_if_unique;

template<typename... Vs, typename T>
struct variant_append_if_unique<std::variant<Vs...>, T>
{
    // "T differs from all Vs" selects the extended variant; otherwise the
    // variant is passed through untouched.
    using type = std::conditional_t<
        (!std::is_same_v<T, Vs> && ...),
        std::variant<Vs..., T>,
        std::variant<Vs...>>;
};
146
147 template<typename Accumulated, typename... Remaining>
148 struct deduplicate_impl;
149
150 template<typename Accumulated>
151 struct deduplicate_impl<Accumulated>
152 {
153 using type = Accumulated;
154 };
155
156 template<typename Accumulated, typename T, typename... Rest>
157 struct deduplicate_impl<Accumulated, T, Rest...>
158 {
159 using next = typename variant_append_if_unique<Accumulated, T>::type;
160 using type = typename deduplicate_impl<next, Rest...>::type;
161 };
162
163 // Deduplicated variant; void types become monostate before deduplication
164 template<typename T0, typename... Ts>
165 using unique_variant_t = typename deduplicate_impl<
166 std::variant<void_to_monostate_t<T0>>,
167 void_to_monostate_t<Ts>...>::type;
168
169 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 // when multiple tasks share the same return type.
171 template<typename T0, typename... Ts>
172 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173
// Result type of an awaitable: whatever await_resume() yields when called
// on an lvalue of the decayed awaitable type.
template<typename A>
using awaitable_result_t = decltype(
    std::declval<std::add_lvalue_reference_t<std::decay_t<A>>>().await_resume());
177
178 /** Core shared state for when_any operations.
179
180 Contains all members and methods common to both heterogeneous (variadic)
181 and homogeneous (range) when_any implementations. State classes embed
182 this via composition to avoid CRTP destructor ordering issues.
183
184 @par Thread Safety
185 Atomic operations protect winner selection and completion count.
186 */
187 struct when_any_core
188 {
189 std::atomic<std::size_t> remaining_count_;
190 std::size_t winner_index_{0};
191 std::exception_ptr winner_exception_;
192 std::stop_source stop_source_;
193
194 // Bridges parent's stop token to our stop_source
195 struct stop_callback_fn
196 {
197 std::stop_source* source_;
198 6 void operator()() const noexcept { source_->request_stop(); }
199 };
200 using stop_callback_t = std::stop_callback<stop_callback_fn>;
201 std::optional<stop_callback_t> parent_stop_callback_;
202
203 coro continuation_;
204 executor_ref caller_ex_;
205
206 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
207 std::atomic<bool> has_winner_{false};
208
209 52 explicit when_any_core(std::size_t count) noexcept
210 52 : remaining_count_(count)
211 {
212 52 }
213
214 /** Atomically claim winner status; exactly one task succeeds. */
215 145 bool try_win(std::size_t index) noexcept
216 {
217 145 bool expected = false;
218
2/2
✓ Branch 1 taken 52 times.
✓ Branch 2 taken 93 times.
145 if(has_winner_.compare_exchange_strong(
219 expected, true, std::memory_order_acq_rel))
220 {
221 52 winner_index_ = index;
222 52 stop_source_.request_stop();
223 52 return true;
224 }
225 93 return false;
226 }
227
228 /** @pre try_win() returned true. */
229 8 void set_winner_exception(std::exception_ptr ep) noexcept
230 {
231 8 winner_exception_ = ep;
232 8 }
233
234 /** Last task to complete resumes the parent via symmetric transfer. */
235 145 coro signal_completion()
236 {
237 145 auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
238
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 93 times.
145 if(remaining == 1)
239 52 caller_ex_.dispatch(continuation_);
240 145 return std::noop_coroutine();
241 }
242 };
243
244 /** Shared state for heterogeneous when_any operation.
245
246 Coordinates winner selection, result storage, and completion tracking
247 for all child tasks in a when_any operation. Uses composition with
248 when_any_core for shared functionality.
249
250 @par Lifetime
251 Allocated on the parent coroutine's frame, outlives all runners.
252
253 @tparam T0 First task's result type.
254 @tparam Ts Remaining tasks' result types.
255 */
256 template<typename T0, typename... Ts>
257 struct when_any_state
258 {
259 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
260 using variant_type = unique_variant_t<T0, Ts...>;
261
262 when_any_core core_;
263 std::optional<variant_type> result_;
264 std::array<coro, task_count> runner_handles_{};
265
266 37 when_any_state()
267 37 : core_(task_count)
268 {
269 37 }
270
271 37 ~when_any_state()
272 {
273
2/2
✓ Branch 0 taken 93 times.
✓ Branch 1 taken 37 times.
130 for(auto h : runner_handles_)
274
1/2
✓ Branch 1 taken 93 times.
✗ Branch 2 not taken.
93 if(h)
275 93 h.destroy();
276 37 }
277
278 /** @pre core_.try_win() returned true.
279 @note Uses in_place_type (not index) because variant is deduplicated.
280 */
281 template<typename T>
282 30 void set_winner_result(T value)
283 noexcept(std::is_nothrow_move_constructible_v<T>)
284 {
285 30 result_.emplace(std::in_place_type<T>, std::move(value));
286 30 }
287
288 /** @pre core_.try_win() returned true. */
289 2 void set_winner_void() noexcept
290 {
291 2 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
292 2 }
293 };
294
295 /** Wrapper coroutine that runs a single child task for when_any.
296
297 Propagates executor/stop_token to the child, attempts to claim winner
298 status on completion, and signals completion for cleanup coordination.
299
300 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
301 */
302 template<typename StateType>
303 struct when_any_runner
304 {
305 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
306 {
307 StateType* state_ = nullptr;
308 std::size_t index_ = 0;
309 executor_ref ex_;
310 std::stop_token stop_token_;
311
312 145 when_any_runner get_return_object() noexcept
313 {
314 145 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
315 }
316
317 // Starts suspended; launcher sets up state/ex/token then resumes
318 145 std::suspend_always initial_suspend() noexcept
319 {
320 145 return {};
321 }
322
323 145 auto final_suspend() noexcept
324 {
325 struct awaiter
326 {
327 promise_type* p_;
328 bool await_ready() const noexcept { return false; }
329 coro await_suspend(coro) noexcept { return p_->state_->core_.signal_completion(); }
330 void await_resume() const noexcept {}
331 };
332 145 return awaiter{this};
333 }
334
335 133 void return_void() noexcept {}
336
337 // Exceptions are valid completions in when_any (unlike when_all)
338 12 void unhandled_exception()
339 {
340
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 4 times.
12 if(state_->core_.try_win(index_))
341 8 state_->core_.set_winner_exception(std::current_exception());
342 12 }
343
344 /** Injects executor and stop token into child awaitables. */
345 template<class Awaitable>
346 struct transform_awaiter
347 {
348 std::decay_t<Awaitable> a_;
349 promise_type* p_;
350
351 145 bool await_ready() { return a_.await_ready(); }
352 145 auto await_resume() { return a_.await_resume(); }
353
354 template<class Promise>
355 145 auto await_suspend(std::coroutine_handle<Promise> h)
356 {
357 145 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
358 }
359 };
360
361 template<class Awaitable>
362 145 auto await_transform(Awaitable&& a)
363 {
364 using A = std::decay_t<Awaitable>;
365 if constexpr (IoAwaitable<A>)
366 {
367 return transform_awaiter<Awaitable>{
368 290 std::forward<Awaitable>(a), this};
369 }
370 else
371 {
372 static_assert(sizeof(A) == 0, "requires IoAwaitable");
373 }
374 145 }
375 };
376
377 std::coroutine_handle<promise_type> h_;
378
379 145 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
380 145 : h_(h)
381 {
382 145 }
383
384 // Enable move for all clang versions - some versions need it
385 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
386
387 // Non-copyable
388 when_any_runner(when_any_runner const&) = delete;
389 when_any_runner& operator=(when_any_runner const&) = delete;
390 when_any_runner& operator=(when_any_runner&&) = delete;
391
392 145 auto release() noexcept
393 {
394 145 return std::exchange(h_, nullptr);
395 }
396 };
397
398 /** Wraps a child awaitable, attempts to claim winner on completion.
399
400 Uses requires-expressions to detect state capabilities:
401 - set_winner_void(): for heterogeneous void tasks (stores monostate)
402 - set_winner_result(): for non-void tasks
403 - Neither: for homogeneous void tasks (no result storage)
404 */
405 template<IoAwaitable Awaitable, typename StateType>
406 when_any_runner<StateType>
407
1/1
✓ Branch 1 taken 145 times.
145 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
408 {
409 using T = awaitable_result_t<Awaitable>;
410 if constexpr (std::is_void_v<T>)
411 {
412 co_await std::move(inner);
413 if(state->core_.try_win(index))
414 {
415 // Heterogeneous void tasks store monostate in the variant
416 if constexpr (requires { state->set_winner_void(); })
417 state->set_winner_void();
418 // Homogeneous void tasks have no result to store
419 }
420 }
421 else
422 {
423 auto result = co_await std::move(inner);
424 if(state->core_.try_win(index))
425 {
426 // Defensive: move should not throw (already moved once), but we
427 // catch just in case since an uncaught exception would be devastating.
428 try
429 {
430 state->set_winner_result(std::move(result));
431 }
432 catch(...)
433 {
434 state->core_.set_winner_exception(std::current_exception());
435 }
436 }
437 }
438 290 }
439
440 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
441 template<IoAwaitable... Awaitables>
442 class when_any_launcher
443 {
444 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
445
446 std::tuple<Awaitables...>* tasks_;
447 state_type* state_;
448
449 public:
450 37 when_any_launcher(
451 std::tuple<Awaitables...>* tasks,
452 state_type* state)
453 37 : tasks_(tasks)
454 37 , state_(state)
455 {
456 37 }
457
458 37 bool await_ready() const noexcept
459 {
460 37 return sizeof...(Awaitables) == 0;
461 }
462
463 /** CRITICAL: If the last task finishes synchronously, parent resumes and
464 destroys this object before await_suspend returns. Must not reference
465 `this` after the final launch_one call.
466 */
467 template<Executor Ex>
468 37 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
469 {
470 37 state_->core_.continuation_ = continuation;
471 37 state_->core_.caller_ex_ = caller_ex;
472
473
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 29 times.
37 if(parent_token.stop_possible())
474 {
475 16 state_->core_.parent_stop_callback_.emplace(
476 parent_token,
477 8 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
478
479
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 6 times.
8 if(parent_token.stop_requested())
480 2 state_->core_.stop_source_.request_stop();
481 }
482
483 37 auto token = state_->core_.stop_source_.get_token();
484 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
485 (..., launch_one<Is>(caller_ex, token));
486
1/1
✓ Branch 1 taken 37 times.
37 }(std::index_sequence_for<Awaitables...>{});
487
488 74 return std::noop_coroutine();
489 37 }
490
491 37 void await_resume() const noexcept
492 {
493 37 }
494
495 private:
496 /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
497 template<std::size_t I, Executor Ex>
498 93 void launch_one(Ex const& caller_ex, std::stop_token token)
499 {
500
1/1
✓ Branch 2 taken 93 times.
93 auto runner = make_when_any_runner(
501 93 std::move(std::get<I>(*tasks_)), state_, I);
502
503 93 auto h = runner.release();
504 93 h.promise().state_ = state_;
505 93 h.promise().index_ = I;
506 93 h.promise().ex_ = caller_ex;
507 93 h.promise().stop_token_ = token;
508
509 93 coro ch{h};
510 93 state_->runner_handles_[I] = ch;
511
1/1
✓ Branch 1 taken 93 times.
93 caller_ex.dispatch(ch);
512 93 }
513 };
514
515 } // namespace detail
516
517 /** Wait for the first awaitable to complete.
518
519 Races multiple heterogeneous awaitables concurrently and returns when the
520 first one completes. The result includes the winner's index and a
521 deduplicated variant containing the result value.
522
523 @par Suspends
524 The calling coroutine suspends when co_await is invoked. All awaitables
525 are launched concurrently and execute in parallel. The coroutine resumes
526 only after all awaitables have completed, even though the winner is
527 determined by the first to finish.
528
529 @par Completion Conditions
530 @li Winner is determined when the first awaitable completes (success or exception)
531 @li Only one task can claim winner status via atomic compare-exchange
532 @li Once a winner exists, stop is requested for all remaining siblings
533 @li Parent coroutine resumes only after all siblings acknowledge completion
534 @li The winner's result is returned; if the winner threw, the exception is rethrown
535
536 @par Cancellation Semantics
537 Cancellation is supported via stop_token propagated through the
538 IoAwaitable protocol:
539 @li Each child awaitable receives a stop_token derived from a shared stop_source
540 @li When the parent's stop token is activated, the stop is forwarded to all children
541 @li When a winner is determined, stop_source_.request_stop() is called immediately
542 @li Siblings must handle cancellation gracefully and complete before parent resumes
543 @li Stop requests are cooperative; tasks must check and respond to them
544
545 @par Concurrency/Overlap
546 All awaitables are launched concurrently before any can complete.
547 The launcher iterates through the arguments, starting each task on the
548 caller's executor. Tasks may execute in parallel on multi-threaded
549 executors or interleave on single-threaded executors. There is no
550 guaranteed ordering of task completion.
551
552 @par Notable Error Conditions
553 @li Winner exception: if the winning task threw, that exception is rethrown
554 @li Non-winner exceptions: silently discarded (only winner's result matters)
555 @li Cancellation: tasks may complete via cancellation without throwing
556
557 @par Example
558 @code
559 task<void> example() {
560 auto [index, result] = co_await when_any(
561 fetch_from_primary(), // task<Response>
562 fetch_from_backup() // task<Response>
563 );
564 // index is 0 or 1, result holds the winner's Response
565 auto response = std::get<Response>(result);
566 }
567 @endcode
568
569 @par Example with Heterogeneous Types
570 @code
571 task<void> mixed_types() {
572 auto [index, result] = co_await when_any(
573 fetch_int(), // task<int>
574 fetch_string() // task<std::string>
575 );
576 if (index == 0)
577 std::cout << "Got int: " << std::get<int>(result) << "\n";
578 else
579 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
580 }
581 @endcode
582
583 @tparam A0 First awaitable type (must satisfy IoAwaitable).
584 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
585 @param a0 The first awaitable to race.
586 @param as Additional awaitables to race concurrently.
587 @return A task yielding a pair of (winner_index, result_variant).
588
589 @throws Rethrows the winner's exception if the winning task threw an exception.
590
591 @par Remarks
592 Awaitables are moved into the coroutine frame; original objects become
593 empty after the call. When multiple awaitables share the same return type,
594 the variant is deduplicated to contain only unique types. Use the winner
595 index to determine which awaitable completed first. Void awaitables
596 contribute std::monostate to the variant.
597
598 @see when_all, IoAwaitable
599 */
600 template<IoAwaitable A0, IoAwaitable... As>
601
1/1
✓ Branch 1 taken 37 times.
37 [[nodiscard]] auto when_any(A0 a0, As... as)
602 -> task<detail::when_any_result_t<
603 detail::awaitable_result_t<A0>,
604 detail::awaitable_result_t<As>...>>
605 {
606 using result_type = detail::when_any_result_t<
607 detail::awaitable_result_t<A0>,
608 detail::awaitable_result_t<As>...>;
609
610 detail::when_any_state<
611 detail::awaitable_result_t<A0>,
612 detail::awaitable_result_t<As>...> state;
613 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
614
615 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
616
617 if(state.core_.winner_exception_)
618 std::rethrow_exception(state.core_.winner_exception_);
619
620 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
621 74 }
622
623 /** Concept for ranges of full I/O awaitables.
624
625 A range satisfies `IoAwaitableRange` if it is a sized input range
626 whose value type satisfies @ref IoAwaitable. This enables when_any
627 to accept any container or view of awaitables, not just std::vector.
628
629 @tparam R The range type.
630
631 @par Requirements
632 @li `R` must satisfy `std::ranges::input_range`
633 @li `R` must satisfy `std::ranges::sized_range`
634 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
635
636 @par Syntactic Requirements
637 Given `r` of type `R`:
638 @li `std::ranges::begin(r)` is valid
639 @li `std::ranges::end(r)` is valid
640 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
641 @li the value type of `*std::ranges::begin(r)` (`std::ranges::range_value_t<R>`) satisfies @ref IoAwaitable
642
643 @par Example
644 @code
645 template<IoAwaitableRange R>
646 task<void> race_all(R&& awaitables) {
647 auto winner = co_await when_any(std::forward<R>(awaitables));
648 // Process winner...
649 }
650 @endcode
651
652 @see when_any, IoAwaitable
653 */
654 template<typename R>
655 concept IoAwaitableRange =
656 std::ranges::input_range<R> &&
657 std::ranges::sized_range<R> &&
658 IoAwaitable<std::ranges::range_value_t<R>>;
659
660 namespace detail {
661
662 /** Shared state for homogeneous when_any (range overload).
663
664 Uses composition with when_any_core for shared functionality.
665 Simpler than heterogeneous: optional<T> instead of variant, vector
666 instead of array for runner handles.
667 */
668 template<typename T>
669 struct when_any_homogeneous_state
670 {
671 when_any_core core_;
672 std::optional<T> result_;
673 std::vector<coro> runner_handles_;
674
675 13 explicit when_any_homogeneous_state(std::size_t count)
676 13 : core_(count)
677
1/1
✓ Branch 2 taken 13 times.
26 , runner_handles_(count)
678 {
679 13 }
680
681 13 ~when_any_homogeneous_state()
682 {
683
2/2
✓ Branch 5 taken 47 times.
✓ Branch 6 taken 13 times.
60 for(auto h : runner_handles_)
684
1/2
✓ Branch 1 taken 47 times.
✗ Branch 2 not taken.
47 if(h)
685 47 h.destroy();
686 13 }
687
688 /** @pre core_.try_win() returned true. */
689 11 void set_winner_result(T value)
690 noexcept(std::is_nothrow_move_constructible_v<T>)
691 {
692 11 result_.emplace(std::move(value));
693 11 }
694 };
695
696 /** Specialization for void tasks (no result storage needed). */
697 template<>
698 struct when_any_homogeneous_state<void>
699 {
700 when_any_core core_;
701 std::vector<coro> runner_handles_;
702
703 2 explicit when_any_homogeneous_state(std::size_t count)
704 2 : core_(count)
705
1/1
✓ Branch 1 taken 2 times.
4 , runner_handles_(count)
706 {
707 2 }
708
709 2 ~when_any_homogeneous_state()
710 {
711
2/2
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 2 times.
7 for(auto h : runner_handles_)
712
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5 if(h)
713 5 h.destroy();
714 2 }
715
716 // No set_winner_result - void tasks have no result to store
717 };
718
719 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
720 template<IoAwaitableRange Range>
721 class when_any_homogeneous_launcher
722 {
723 using Awaitable = std::ranges::range_value_t<Range>;
724 using T = awaitable_result_t<Awaitable>;
725
726 Range* range_;
727 when_any_homogeneous_state<T>* state_;
728
729 public:
730 15 when_any_homogeneous_launcher(
731 Range* range,
732 when_any_homogeneous_state<T>* state)
733 15 : range_(range)
734 15 , state_(state)
735 {
736 15 }
737
738 15 bool await_ready() const noexcept
739 {
740 15 return std::ranges::empty(*range_);
741 }
742
743 /** CRITICAL: If the last task finishes synchronously, parent resumes and
744 destroys this object before await_suspend returns. Must not reference
745 `this` after the final launch_one call.
746 */
747 template<Executor Ex>
748 15 coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
749 {
750 15 state_->core_.continuation_ = continuation;
751 15 state_->core_.caller_ex_ = caller_ex;
752
753
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 10 times.
15 if(parent_token.stop_possible())
754 {
755 10 state_->core_.parent_stop_callback_.emplace(
756 parent_token,
757 5 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
758
759
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 3 times.
5 if(parent_token.stop_requested())
760 2 state_->core_.stop_source_.request_stop();
761 }
762
763 15 auto token = state_->core_.stop_source_.get_token();
764 15 std::size_t index = 0;
765
2/2
✓ Branch 4 taken 52 times.
✓ Branch 5 taken 15 times.
67 for(auto&& a : *range_)
766 {
767
1/1
✓ Branch 3 taken 52 times.
52 launch_one(index, std::move(a), caller_ex, token);
768 52 ++index;
769 }
770
771 30 return std::noop_coroutine();
772 15 }
773
774 15 void await_resume() const noexcept
775 {
776 15 }
777
778 private:
779 /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
780 template<Executor Ex>
781 52 void launch_one(std::size_t index, Awaitable&& awaitable, Ex const& caller_ex, std::stop_token token)
782 {
783
1/1
✓ Branch 2 taken 52 times.
52 auto runner = make_when_any_runner(
784 52 std::move(awaitable), state_, index);
785
786 52 auto h = runner.release();
787 52 h.promise().state_ = state_;
788 52 h.promise().index_ = index;
789 52 h.promise().ex_ = caller_ex;
790 52 h.promise().stop_token_ = token;
791
792 52 coro ch{h};
793 52 state_->runner_handles_[index] = ch;
794
1/1
✓ Branch 1 taken 52 times.
52 caller_ex.dispatch(ch);
795 52 }
796 };
797
798 } // namespace detail
799
800 /** Wait for the first awaitable to complete (range overload).
801
802 Races a range of awaitables with the same result type. Accepts any
803 sized input range of IoAwaitable types, enabling use with arrays,
804 spans, or custom containers.
805
806 @par Suspends
807 The calling coroutine suspends when co_await is invoked. All awaitables
808 in the range are launched concurrently and execute in parallel. The
809 coroutine resumes only after all awaitables have completed, even though
810 the winner is determined by the first to finish.
811
812 @par Completion Conditions
813 @li Winner is determined when the first awaitable completes (success or exception)
814 @li Only one task can claim winner status via atomic compare-exchange
815 @li Once a winner exists, stop is requested for all remaining siblings
816 @li Parent coroutine resumes only after all siblings acknowledge completion
817 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
818
819 @par Cancellation Semantics
820 Cancellation is supported via stop_token propagated through the
821 IoAwaitable protocol:
822 @li Each child awaitable receives a stop_token derived from a shared stop_source
823 @li When the parent's stop token is activated, the stop is forwarded to all children
824 @li When a winner is determined, stop_source_.request_stop() is called immediately
825 @li Siblings must handle cancellation gracefully and complete before parent resumes
826 @li Stop requests are cooperative; tasks must check and respond to them
827
828 @par Concurrency/Overlap
829 All awaitables are launched concurrently before any can complete.
830 The launcher iterates through the range, starting each task on the
831 caller's executor. Tasks may execute in parallel on multi-threaded
832 executors or interleave on single-threaded executors. There is no
833 guaranteed ordering of task completion.
834
835 @par Notable Error Conditions
836 @li Empty range: throws std::invalid_argument immediately (not via co_return)
837 @li Winner exception: if the winning task threw, that exception is rethrown
838 @li Non-winner exceptions: silently discarded (only winner's result matters)
839 @li Cancellation: tasks may complete via cancellation without throwing
840
841 @par Example
842 @code
843 task<void> example() {
844 std::array<task<Response>, 3> requests = {
845 fetch_from_server(0),
846 fetch_from_server(1),
847 fetch_from_server(2)
848 };
849
850 auto [index, response] = co_await when_any(std::move(requests));
851 }
852 @endcode
853
854 @par Example with Vector
855 @code
856 task<Response> fetch_fastest(std::vector<Server> const& servers) {
857 std::vector<task<Response>> requests;
858 for (auto const& server : servers)
859 requests.push_back(fetch_from(server));
860
861 auto [index, response] = co_await when_any(std::move(requests));
862 co_return response;
863 }
864 @endcode
865
866 @tparam R Range type satisfying IoAwaitableRange.
867 @param awaitables Range of awaitables to race concurrently (must not be empty).
868 @return A task yielding a pair of (winner_index, result).
869
870 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
871 @throws Rethrows the winner's exception if the winning task threw an exception.
872
873 @par Remarks
874 Elements are moved out of an owned copy of the range that is kept on the
875 coroutine frame. An rvalue range is moved into that copy; an lvalue range
876 is copy-constructed, so the range must be copy-constructible and the
877 original container is left untouched. Unlike the variadic overload, no
878 variant wrapper is needed since all tasks share the same return type.
879
880 @see when_any, IoAwaitableRange
881 */
882 template<IoAwaitableRange R>
883 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
884
1/1
✓ Branch 1 taken 14 times.
14 [[nodiscard]] auto when_any(R&& awaitables)
885 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
886 {
887 using Awaitable = std::ranges::range_value_t<R>;
888 using T = detail::awaitable_result_t<Awaitable>;
889 using result_type = std::pair<std::size_t, T>;
890 using OwnedRange = std::remove_cvref_t<R>;
891
892 auto count = std::ranges::size(awaitables);
893 if(count == 0)
894 throw std::invalid_argument("when_any requires at least one awaitable");
895
896 // Move/copy range onto coroutine frame to ensure lifetime
897 OwnedRange owned_awaitables = std::forward<R>(awaitables);
898
899 detail::when_any_homogeneous_state<T> state(count);
900
901 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
902
903 if(state.core_.winner_exception_)
904 std::rethrow_exception(state.core_.winner_exception_);
905
906 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
907 28 }
908
909 /** Wait for the first awaitable to complete (void range overload).
910
911 Races a range of void-returning awaitables. Since void awaitables have
912 no result value, only the winner's index is returned.
913
914 @par Suspends
915 The calling coroutine suspends when co_await is invoked. All awaitables
916 in the range are launched concurrently and execute in parallel. The
917 coroutine resumes only after all awaitables have completed, even though
918 the winner is determined by the first to finish.
919
920 @par Completion Conditions
921 @li Winner is determined when the first awaitable completes (success or exception)
922 @li Only one task can claim winner status via atomic compare-exchange
923 @li Once a winner exists, stop is requested for all remaining siblings
924 @li Parent coroutine resumes only after all siblings acknowledge completion
925 @li The winner's index is returned; if the winner threw, the exception is rethrown
926
927 @par Cancellation Semantics
928 Cancellation is supported via stop_token propagated through the
929 IoAwaitable protocol:
930 @li Each child awaitable receives a stop_token derived from a shared stop_source
931 @li When the parent's stop token is activated, the stop is forwarded to all children
932 @li When a winner is determined, stop_source_.request_stop() is called immediately
933 @li Siblings must handle cancellation gracefully and complete before parent resumes
934 @li Stop requests are cooperative; tasks must check and respond to them
935
936 @par Concurrency/Overlap
937 All awaitables are launched concurrently before any can complete.
938 The launcher iterates through the range, starting each task on the
939 caller's executor. Tasks may execute in parallel on multi-threaded
940 executors or interleave on single-threaded executors. There is no
941 guaranteed ordering of task completion.
942
943 @par Notable Error Conditions
944 @li Empty range: throws std::invalid_argument immediately (not via co_return)
945 @li Winner exception: if the winning task threw, that exception is rethrown
946 @li Non-winner exceptions: silently discarded (only winner's result matters)
947 @li Cancellation: tasks may complete via cancellation without throwing
948
949 @par Example
950 @code
951 task<void> example() {
952 std::vector<task<void>> tasks;
953 for (int i = 0; i < 5; ++i)
954 tasks.push_back(background_work(i));
955
956 std::size_t winner = co_await when_any(std::move(tasks));
957 // winner is the index of the first task to complete
958 }
959 @endcode
960
961 @par Example with Timeout
962 @code
963 task<void> with_timeout() {
964 std::vector<task<void>> tasks;
965 tasks.push_back(long_running_operation());
966 tasks.push_back(delay(std::chrono::seconds(5)));
967
968 std::size_t winner = co_await when_any(std::move(tasks));
969 if (winner == 1) {
970 // Timeout occurred
971 }
972 }
973 @endcode
974
975 @tparam R Range type satisfying IoAwaitableRange with void result.
976 @param awaitables Range of void awaitables to race concurrently (must not be empty).
977 @return A task yielding the winner's index (zero-based).
978
979 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
980 @throws Rethrows the winner's exception if the winning task threw an exception.
981
982 @par Remarks
983 The range is forwarded into storage owned by the coroutine frame to
984 ensure lifetime safety: an rvalue range is moved, while an lvalue range
985 is copied, leaving the caller's container unmodified. The awaitables are
986 launched from that owned copy. Unlike the non-void overload, no result
987 storage is needed since void tasks produce no value.
988
989 @see when_any, IoAwaitableRange
990 */
991 template<IoAwaitableRange R>
992 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
993
1/1
✓ Branch 1 taken 2 times.
2 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
994 {
995 using OwnedRange = std::remove_cvref_t<R>;
996
997 auto count = std::ranges::size(awaitables);
998 if(count == 0)
999 throw std::invalid_argument("when_any requires at least one awaitable");
1000
1001 // Move/copy range onto coroutine frame to ensure lifetime
1002 OwnedRange owned_awaitables = std::forward<R>(awaitables);
1003
1004 detail::when_any_homogeneous_state<void> state(count);
1005
1006 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1007
1008 if(state.core_.winner_exception_)
1009 std::rethrow_exception(state.core_.winner_exception_);
1010
1011 co_return state.core_.winner_index_;
1012 4 }
1013
1014 } // namespace capy
1015 } // namespace boost
1016
1017 #endif
1018