Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/concept/executor.hpp>
15 : #include <boost/capy/concept/io_awaitable.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/frame_allocator.hpp>
19 : #include <boost/capy/task.hpp>
20 :
21 : #include <array>
22 : #include <atomic>
23 : #include <exception>
24 : #include <optional>
25 : #include <ranges>
26 : #include <stdexcept>
27 : #include <stop_token>
28 : #include <tuple>
29 : #include <type_traits>
30 : #include <utility>
31 : #include <variant>
32 : #include <vector>
33 :
34 : /*
35 : when_any - Race multiple tasks, return first completion
36 : ========================================================
37 :
38 : OVERVIEW:
39 : ---------
40 : when_any launches N tasks concurrently and completes when the FIRST task
41 : finishes (success or failure). It then requests stop for all siblings and
42 : waits for them to acknowledge before returning.
43 :
44 : ARCHITECTURE:
45 : -------------
46 : The design mirrors when_all but with inverted completion semantics:
47 :
48 : when_all: complete when remaining_count reaches 0 (all done)
49 : when_any: complete when has_winner becomes true (first done)
50 : BUT still wait for remaining_count to reach 0 for cleanup
51 :
52 : Key components:
53 : - when_any_state: Shared state tracking winner and completion
54 : - when_any_runner: Wrapper coroutine for each child task
55 : - when_any_launcher: Awaitable that starts all runners concurrently
56 :
57 : CRITICAL INVARIANTS:
58 : --------------------
59 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
60 : 2. All tasks must complete before parent resumes (cleanup safety)
61 : 3. Stop is requested immediately when winner is determined
62 : 4. Only the winner's result/exception is stored
63 :
64 : TYPE DEDUPLICATION:
65 : -------------------
66 : std::variant requires unique alternative types. Since when_any can race
67 : tasks with identical return types (e.g., three task<int>), we must
68 : deduplicate types before constructing the variant.
69 :
70 : Example: when_any(task<int>, task<string>, task<int>)
71 : - Raw types after void->monostate: int, string, int
72 : - Deduplicated variant: std::variant<int, string>
73 : - Return: pair<size_t, variant<int, string>>
74 :
75 : The winner_index tells you which task won (0, 1, or 2), while the variant
76 : holds the result. Use the index to determine how to interpret the variant.
77 :
78 : VOID HANDLING:
79 : --------------
80 : void tasks contribute std::monostate to the variant (then deduplicated).
81 : All-void tasks result in: pair<size_t, variant<monostate>>
82 :
83 : MEMORY MODEL:
84 : -------------
85 : Synchronization chain from winner's write to parent's read:
86 :
87 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
88 : 2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
89 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
90 : → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
91 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
92 : 5. Parent coroutine resumes and reads result_/winner_exception_
93 :
94 : Synchronization analysis:
95 : - All fetch_sub operations on remaining_count_ form a release sequence
96 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
97 : in the modification order of remaining_count_
98 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
99 : modification order, establishing happens-before from winner's writes
100 : - Executor dispatch() is expected to provide queue-based synchronization
101 : (release-on-post, acquire-on-execute) completing the chain to parent
102 : - Even inline executors work (same thread = sequenced-before)
103 :
104 : Alternative considered: Adding winner_ready_ atomic (set with release after
105 : storing winner data, acquired before reading) would make synchronization
106 : self-contained and not rely on executor implementation details. Current
107 : approach is correct but requires careful reasoning about release sequences
108 : and executor behavior.
109 :
110 : EXCEPTION SEMANTICS:
111 : --------------------
112 : Unlike when_all (which captures first exception, discards others), when_any
113 : treats exceptions as valid completions. If the winning task threw, that
114 : exception is rethrown. Exceptions from non-winners are silently discarded.
115 : */
116 :
117 : namespace boost {
118 : namespace capy {
119 :
120 : namespace detail {
121 :
122 : /** Convert void to monostate for variant storage.
123 :
124 : std::variant<void, ...> is ill-formed, so void tasks contribute
125 : std::monostate to the result variant instead. Non-void types
126 : pass through unchanged.
127 :
128 : @tparam T The type to potentially convert (void becomes monostate).
129 : */
130 : template<typename T>
131 : using void_to_monostate_t = std::conditional_t<std::is_void_v<T>, std::monostate, T>;
132 :
133 : // Type deduplication: std::variant requires unique alternative types.
134 : // Fold left over the type list, appending each type only if not already present.
135 : template<typename Variant, typename T>
136 : struct variant_append_if_unique;
137 :
138 : template<typename... Vs, typename T>
139 : struct variant_append_if_unique<std::variant<Vs...>, T>
140 : {
141 : using type = std::conditional_t<
142 : (std::is_same_v<T, Vs> || ...),
143 : std::variant<Vs...>,
144 : std::variant<Vs..., T>>;
145 : };
146 :
147 : template<typename Accumulated, typename... Remaining>
148 : struct deduplicate_impl;
149 :
150 : template<typename Accumulated>
151 : struct deduplicate_impl<Accumulated>
152 : {
153 : using type = Accumulated;
154 : };
155 :
156 : template<typename Accumulated, typename T, typename... Rest>
157 : struct deduplicate_impl<Accumulated, T, Rest...>
158 : {
159 : using next = typename variant_append_if_unique<Accumulated, T>::type;
160 : using type = typename deduplicate_impl<next, Rest...>::type;
161 : };
162 :
163 : // Deduplicated variant; void types become monostate before deduplication
164 : template<typename T0, typename... Ts>
165 : using unique_variant_t = typename deduplicate_impl<
166 : std::variant<void_to_monostate_t<T0>>,
167 : void_to_monostate_t<Ts>...>::type;
168 :
169 : // Result: (winner_index, deduplicated_variant). Use index to disambiguate
170 : // when multiple tasks share the same return type.
171 : template<typename T0, typename... Ts>
172 : using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
173 :
174 : // Extract result type from any awaitable via await_resume()
175 : template<typename A>
176 : using awaitable_result_t = decltype(std::declval<std::decay_t<A>&>().await_resume());
177 :
178 : /** Core shared state for when_any operations.
179 :
180 : Contains all members and methods common to both heterogeneous (variadic)
181 : and homogeneous (range) when_any implementations. State classes embed
182 : this via composition to avoid CRTP destructor ordering issues.
183 :
184 : @par Thread Safety
185 : Atomic operations protect winner selection and completion count.
186 : */
187 : struct when_any_core
188 : {
189 : std::atomic<std::size_t> remaining_count_;
190 : std::size_t winner_index_{0};
191 : std::exception_ptr winner_exception_;
192 : std::stop_source stop_source_;
193 :
194 : // Bridges parent's stop token to our stop_source
195 : struct stop_callback_fn
196 : {
197 : std::stop_source* source_;
198 6 : void operator()() const noexcept { source_->request_stop(); }
199 : };
200 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
201 : std::optional<stop_callback_t> parent_stop_callback_;
202 :
203 : coro continuation_;
204 : executor_ref caller_ex_;
205 :
206 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
207 : std::atomic<bool> has_winner_{false};
208 :
209 52 : explicit when_any_core(std::size_t count) noexcept
210 52 : : remaining_count_(count)
211 : {
212 52 : }
213 :
214 : /** Atomically claim winner status; exactly one task succeeds. */
215 145 : bool try_win(std::size_t index) noexcept
216 : {
217 145 : bool expected = false;
218 145 : if(has_winner_.compare_exchange_strong(
219 : expected, true, std::memory_order_acq_rel))
220 : {
221 52 : winner_index_ = index;
222 52 : stop_source_.request_stop();
223 52 : return true;
224 : }
225 93 : return false;
226 : }
227 :
228 : /** @pre try_win() returned true. */
229 8 : void set_winner_exception(std::exception_ptr ep) noexcept
230 : {
231 8 : winner_exception_ = ep;
232 8 : }
233 :
234 : /** Last task to complete resumes the parent via symmetric transfer. */
235 145 : coro signal_completion()
236 : {
237 145 : auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
238 145 : if(remaining == 1)
239 52 : caller_ex_.dispatch(continuation_);
240 145 : return std::noop_coroutine();
241 : }
242 : };
243 :
244 : /** Shared state for heterogeneous when_any operation.
245 :
246 : Coordinates winner selection, result storage, and completion tracking
247 : for all child tasks in a when_any operation. Uses composition with
248 : when_any_core for shared functionality.
249 :
250 : @par Lifetime
251 : Allocated on the parent coroutine's frame, outlives all runners.
252 :
253 : @tparam T0 First task's result type.
254 : @tparam Ts Remaining tasks' result types.
255 : */
256 : template<typename T0, typename... Ts>
257 : struct when_any_state
258 : {
259 : static constexpr std::size_t task_count = 1 + sizeof...(Ts);
260 : using variant_type = unique_variant_t<T0, Ts...>;
261 :
262 : when_any_core core_;
263 : std::optional<variant_type> result_;
264 : std::array<coro, task_count> runner_handles_{};
265 :
266 37 : when_any_state()
267 37 : : core_(task_count)
268 : {
269 37 : }
270 :
271 37 : ~when_any_state()
272 : {
273 130 : for(auto h : runner_handles_)
274 93 : if(h)
275 93 : h.destroy();
276 37 : }
277 :
278 : /** @pre core_.try_win() returned true.
279 : @note Uses in_place_type (not index) because variant is deduplicated.
280 : */
281 : template<typename T>
282 30 : void set_winner_result(T value)
283 : noexcept(std::is_nothrow_move_constructible_v<T>)
284 : {
285 30 : result_.emplace(std::in_place_type<T>, std::move(value));
286 30 : }
287 :
288 : /** @pre core_.try_win() returned true. */
289 2 : void set_winner_void() noexcept
290 : {
291 2 : result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
292 2 : }
293 : };
294 :
295 : /** Wrapper coroutine that runs a single child task for when_any.
296 :
297 : Propagates executor/stop_token to the child, attempts to claim winner
298 : status on completion, and signals completion for cleanup coordination.
299 :
300 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
301 : */
302 : template<typename StateType>
303 : struct when_any_runner
304 : {
305 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
306 : {
307 : StateType* state_ = nullptr;
308 : std::size_t index_ = 0;
309 : executor_ref ex_;
310 : std::stop_token stop_token_;
311 :
312 145 : when_any_runner get_return_object() noexcept
313 : {
314 145 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
315 : }
316 :
317 : // Starts suspended; launcher sets up state/ex/token then resumes
318 145 : std::suspend_always initial_suspend() noexcept
319 : {
320 145 : return {};
321 : }
322 :
323 145 : auto final_suspend() noexcept
324 : {
325 : struct awaiter
326 : {
327 : promise_type* p_;
328 145 : bool await_ready() const noexcept { return false; }
329 145 : coro await_suspend(coro) noexcept { return p_->state_->core_.signal_completion(); }
330 0 : void await_resume() const noexcept {}
331 : };
332 145 : return awaiter{this};
333 : }
334 :
335 133 : void return_void() noexcept {}
336 :
337 : // Exceptions are valid completions in when_any (unlike when_all)
338 12 : void unhandled_exception()
339 : {
340 12 : if(state_->core_.try_win(index_))
341 8 : state_->core_.set_winner_exception(std::current_exception());
342 12 : }
343 :
344 : /** Injects executor and stop token into child awaitables. */
345 : template<class Awaitable>
346 : struct transform_awaiter
347 : {
348 : std::decay_t<Awaitable> a_;
349 : promise_type* p_;
350 :
351 145 : bool await_ready() { return a_.await_ready(); }
352 145 : auto await_resume() { return a_.await_resume(); }
353 :
354 : template<class Promise>
355 145 : auto await_suspend(std::coroutine_handle<Promise> h)
356 : {
357 145 : return a_.await_suspend(h, p_->ex_, p_->stop_token_);
358 : }
359 : };
360 :
361 : template<class Awaitable>
362 145 : auto await_transform(Awaitable&& a)
363 : {
364 : using A = std::decay_t<Awaitable>;
365 : if constexpr (IoAwaitable<A>)
366 : {
367 : return transform_awaiter<Awaitable>{
368 290 : std::forward<Awaitable>(a), this};
369 : }
370 : else
371 : {
372 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
373 : }
374 145 : }
375 : };
376 :
377 : std::coroutine_handle<promise_type> h_;
378 :
379 145 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
380 145 : : h_(h)
381 : {
382 145 : }
383 :
384 : // Enable move for all clang versions - some versions need it
385 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
386 :
387 : // Non-copyable
388 : when_any_runner(when_any_runner const&) = delete;
389 : when_any_runner& operator=(when_any_runner const&) = delete;
390 : when_any_runner& operator=(when_any_runner&&) = delete;
391 :
392 145 : auto release() noexcept
393 : {
394 145 : return std::exchange(h_, nullptr);
395 : }
396 : };
397 :
398 : /** Wraps a child awaitable, attempts to claim winner on completion.
399 :
400 : Uses requires-expressions to detect state capabilities:
401 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
402 : - set_winner_result(): for non-void tasks
403 : - Neither: for homogeneous void tasks (no result storage)
404 : */
405 : template<IoAwaitable Awaitable, typename StateType>
406 : when_any_runner<StateType>
407 145 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
408 : {
409 : using T = awaitable_result_t<Awaitable>;
410 : if constexpr (std::is_void_v<T>)
411 : {
412 : co_await std::move(inner);
413 : if(state->core_.try_win(index))
414 : {
415 : // Heterogeneous void tasks store monostate in the variant
416 : if constexpr (requires { state->set_winner_void(); })
417 : state->set_winner_void();
418 : // Homogeneous void tasks have no result to store
419 : }
420 : }
421 : else
422 : {
423 : auto result = co_await std::move(inner);
424 : if(state->core_.try_win(index))
425 : {
426 : // Defensive: move should not throw (already moved once), but we
427 : // catch just in case since an uncaught exception would be devastating.
428 : try
429 : {
430 : state->set_winner_result(std::move(result));
431 : }
432 : catch(...)
433 : {
434 : state->core_.set_winner_exception(std::current_exception());
435 : }
436 : }
437 : }
438 290 : }
439 :
440 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
441 : template<IoAwaitable... Awaitables>
442 : class when_any_launcher
443 : {
444 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
445 :
446 : std::tuple<Awaitables...>* tasks_;
447 : state_type* state_;
448 :
449 : public:
450 37 : when_any_launcher(
451 : std::tuple<Awaitables...>* tasks,
452 : state_type* state)
453 37 : : tasks_(tasks)
454 37 : , state_(state)
455 : {
456 37 : }
457 :
458 37 : bool await_ready() const noexcept
459 : {
460 37 : return sizeof...(Awaitables) == 0;
461 : }
462 :
463 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
464 : destroys this object before await_suspend returns. Must not reference
465 : `this` after the final launch_one call.
466 : */
467 : template<Executor Ex>
468 37 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
469 : {
470 37 : state_->core_.continuation_ = continuation;
471 37 : state_->core_.caller_ex_ = caller_ex;
472 :
473 37 : if(parent_token.stop_possible())
474 : {
475 16 : state_->core_.parent_stop_callback_.emplace(
476 : parent_token,
477 8 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
478 :
479 8 : if(parent_token.stop_requested())
480 2 : state_->core_.stop_source_.request_stop();
481 : }
482 :
483 37 : auto token = state_->core_.stop_source_.get_token();
484 74 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
485 37 : (..., launch_one<Is>(caller_ex, token));
486 37 : }(std::index_sequence_for<Awaitables...>{});
487 :
488 74 : return std::noop_coroutine();
489 37 : }
490 :
491 37 : void await_resume() const noexcept
492 : {
493 37 : }
494 :
495 : private:
496 : /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
497 : template<std::size_t I, Executor Ex>
498 93 : void launch_one(Ex const& caller_ex, std::stop_token token)
499 : {
500 93 : auto runner = make_when_any_runner(
501 93 : std::move(std::get<I>(*tasks_)), state_, I);
502 :
503 93 : auto h = runner.release();
504 93 : h.promise().state_ = state_;
505 93 : h.promise().index_ = I;
506 93 : h.promise().ex_ = caller_ex;
507 93 : h.promise().stop_token_ = token;
508 :
509 93 : coro ch{h};
510 93 : state_->runner_handles_[I] = ch;
511 93 : caller_ex.dispatch(ch);
512 93 : }
513 : };
514 :
515 : } // namespace detail
516 :
517 : /** Wait for the first awaitable to complete.
518 :
519 : Races multiple heterogeneous awaitables concurrently and returns when the
520 : first one completes. The result includes the winner's index and a
521 : deduplicated variant containing the result value.
522 :
523 : @par Suspends
524 : The calling coroutine suspends when co_await is invoked. All awaitables
525 : are launched concurrently and execute in parallel. The coroutine resumes
526 : only after all awaitables have completed, even though the winner is
527 : determined by the first to finish.
528 :
529 : @par Completion Conditions
530 : @li Winner is determined when the first awaitable completes (success or exception)
531 : @li Only one task can claim winner status via atomic compare-exchange
532 : @li Once a winner exists, stop is requested for all remaining siblings
533 : @li Parent coroutine resumes only after all siblings acknowledge completion
534 : @li The winner's result is returned; if the winner threw, the exception is rethrown
535 :
536 : @par Cancellation Semantics
537 : Cancellation is supported via stop_token propagated through the
538 : IoAwaitable protocol:
539 : @li Each child awaitable receives a stop_token derived from a shared stop_source
540 : @li When the parent's stop token is activated, the stop is forwarded to all children
541 : @li When a winner is determined, stop_source_.request_stop() is called immediately
542 : @li Siblings must handle cancellation gracefully and complete before parent resumes
543 : @li Stop requests are cooperative; tasks must check and respond to them
544 :
545 : @par Concurrency/Overlap
546 : All awaitables are launched concurrently before any can complete.
547 : The launcher iterates through the arguments, starting each task on the
548 : caller's executor. Tasks may execute in parallel on multi-threaded
549 : executors or interleave on single-threaded executors. There is no
550 : guaranteed ordering of task completion.
551 :
552 : @par Notable Error Conditions
553 : @li Winner exception: if the winning task threw, that exception is rethrown
554 : @li Non-winner exceptions: silently discarded (only winner's result matters)
555 : @li Cancellation: tasks may complete via cancellation without throwing
556 :
557 : @par Example
558 : @code
559 : task<void> example() {
560 : auto [index, result] = co_await when_any(
561 : fetch_from_primary(), // task<Response>
562 : fetch_from_backup() // task<Response>
563 : );
564 : // index is 0 or 1, result holds the winner's Response
565 : auto response = std::get<Response>(result);
566 : }
567 : @endcode
568 :
569 : @par Example with Heterogeneous Types
570 : @code
571 : task<void> mixed_types() {
572 : auto [index, result] = co_await when_any(
573 : fetch_int(), // task<int>
574 : fetch_string() // task<std::string>
575 : );
576 : if (index == 0)
577 : std::cout << "Got int: " << std::get<int>(result) << "\n";
578 : else
579 : std::cout << "Got string: " << std::get<std::string>(result) << "\n";
580 : }
581 : @endcode
582 :
583 : @tparam A0 First awaitable type (must satisfy IoAwaitable).
584 : @tparam As Remaining awaitable types (must satisfy IoAwaitable).
585 : @param a0 The first awaitable to race.
586 : @param as Additional awaitables to race concurrently.
587 : @return A task yielding a pair of (winner_index, result_variant).
588 :
589 : @throws Rethrows the winner's exception if the winning task threw an exception.
590 :
591 : @par Remarks
592 : Awaitables are moved into the coroutine frame; original objects become
593 : empty after the call. When multiple awaitables share the same return type,
594 : the variant is deduplicated to contain only unique types. Use the winner
595 : index to determine which awaitable completed first. Void awaitables
596 : contribute std::monostate to the variant.
597 :
598 : @see when_all, IoAwaitable
599 : */
600 : template<IoAwaitable A0, IoAwaitable... As>
601 37 : [[nodiscard]] auto when_any(A0 a0, As... as)
602 : -> task<detail::when_any_result_t<
603 : detail::awaitable_result_t<A0>,
604 : detail::awaitable_result_t<As>...>>
605 : {
606 : using result_type = detail::when_any_result_t<
607 : detail::awaitable_result_t<A0>,
608 : detail::awaitable_result_t<As>...>;
609 :
610 : detail::when_any_state<
611 : detail::awaitable_result_t<A0>,
612 : detail::awaitable_result_t<As>...> state;
613 : std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
614 :
615 : co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
616 :
617 : if(state.core_.winner_exception_)
618 : std::rethrow_exception(state.core_.winner_exception_);
619 :
620 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
621 74 : }
622 :
623 : /** Concept for ranges of full I/O awaitables.
624 :
625 : A range satisfies `IoAwaitableRange` if it is a sized input range
626 : whose value type satisfies @ref IoAwaitable. This enables when_any
627 : to accept any container or view of awaitables, not just std::vector.
628 :
629 : @tparam R The range type.
630 :
631 : @par Requirements
632 : @li `R` must satisfy `std::ranges::input_range`
633 : @li `R` must satisfy `std::ranges::sized_range`
634 : @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
635 :
636 : @par Syntactic Requirements
637 : Given `r` of type `R`:
638 : @li `std::ranges::begin(r)` is valid
639 : @li `std::ranges::end(r)` is valid
640 : @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
641 : @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
642 :
643 : @par Example
644 : @code
645 : template<IoAwaitableRange R>
646 : task<void> race_all(R&& awaitables) {
647 : auto winner = co_await when_any(std::forward<R>(awaitables));
648 : // Process winner...
649 : }
650 : @endcode
651 :
652 : @see when_any, IoAwaitable
653 : */
654 : template<typename R>
655 : concept IoAwaitableRange =
656 : std::ranges::input_range<R> &&
657 : std::ranges::sized_range<R> &&
658 : IoAwaitable<std::ranges::range_value_t<R>>;
659 :
660 : namespace detail {
661 :
662 : /** Shared state for homogeneous when_any (range overload).
663 :
664 : Uses composition with when_any_core for shared functionality.
665 : Simpler than heterogeneous: optional<T> instead of variant, vector
666 : instead of array for runner handles.
667 : */
668 : template<typename T>
669 : struct when_any_homogeneous_state
670 : {
671 : when_any_core core_;
672 : std::optional<T> result_;
673 : std::vector<coro> runner_handles_;
674 :
675 13 : explicit when_any_homogeneous_state(std::size_t count)
676 13 : : core_(count)
677 26 : , runner_handles_(count)
678 : {
679 13 : }
680 :
681 13 : ~when_any_homogeneous_state()
682 : {
683 60 : for(auto h : runner_handles_)
684 47 : if(h)
685 47 : h.destroy();
686 13 : }
687 :
688 : /** @pre core_.try_win() returned true. */
689 11 : void set_winner_result(T value)
690 : noexcept(std::is_nothrow_move_constructible_v<T>)
691 : {
692 11 : result_.emplace(std::move(value));
693 11 : }
694 : };
695 :
696 : /** Specialization for void tasks (no result storage needed). */
697 : template<>
698 : struct when_any_homogeneous_state<void>
699 : {
700 : when_any_core core_;
701 : std::vector<coro> runner_handles_;
702 :
703 2 : explicit when_any_homogeneous_state(std::size_t count)
704 2 : : core_(count)
705 4 : , runner_handles_(count)
706 : {
707 2 : }
708 :
709 2 : ~when_any_homogeneous_state()
710 : {
711 7 : for(auto h : runner_handles_)
712 5 : if(h)
713 5 : h.destroy();
714 2 : }
715 :
716 : // No set_winner_result - void tasks have no result to store
717 : };
718 :
719 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
720 : template<IoAwaitableRange Range>
721 : class when_any_homogeneous_launcher
722 : {
723 : using Awaitable = std::ranges::range_value_t<Range>;
724 : using T = awaitable_result_t<Awaitable>;
725 :
726 : Range* range_;
727 : when_any_homogeneous_state<T>* state_;
728 :
729 : public:
730 15 : when_any_homogeneous_launcher(
731 : Range* range,
732 : when_any_homogeneous_state<T>* state)
733 15 : : range_(range)
734 15 : , state_(state)
735 : {
736 15 : }
737 :
738 15 : bool await_ready() const noexcept
739 : {
740 15 : return std::ranges::empty(*range_);
741 : }
742 :
743 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
744 : destroys this object before await_suspend returns. Must not reference
745 : `this` after the final launch_one call.
746 : */
747 : template<Executor Ex>
748 15 : coro await_suspend(coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
749 : {
750 15 : state_->core_.continuation_ = continuation;
751 15 : state_->core_.caller_ex_ = caller_ex;
752 :
753 15 : if(parent_token.stop_possible())
754 : {
755 10 : state_->core_.parent_stop_callback_.emplace(
756 : parent_token,
757 5 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
758 :
759 5 : if(parent_token.stop_requested())
760 2 : state_->core_.stop_source_.request_stop();
761 : }
762 :
763 15 : auto token = state_->core_.stop_source_.get_token();
764 15 : std::size_t index = 0;
765 67 : for(auto&& a : *range_)
766 : {
767 52 : launch_one(index, std::move(a), caller_ex, token);
768 52 : ++index;
769 : }
770 :
771 30 : return std::noop_coroutine();
772 15 : }
773 :
774 15 : void await_resume() const noexcept
775 : {
776 15 : }
777 :
778 : private:
779 : /** @pre Ex::dispatch() and coro::resume() must not throw (handle may leak). */
780 : template<Executor Ex>
781 52 : void launch_one(std::size_t index, Awaitable&& awaitable, Ex const& caller_ex, std::stop_token token)
782 : {
783 52 : auto runner = make_when_any_runner(
784 52 : std::move(awaitable), state_, index);
785 :
786 52 : auto h = runner.release();
787 52 : h.promise().state_ = state_;
788 52 : h.promise().index_ = index;
789 52 : h.promise().ex_ = caller_ex;
790 52 : h.promise().stop_token_ = token;
791 :
792 52 : coro ch{h};
793 52 : state_->runner_handles_[index] = ch;
794 52 : caller_ex.dispatch(ch);
795 52 : }
796 : };
797 :
798 : } // namespace detail
799 :
800 : /** Wait for the first awaitable to complete (range overload).
801 :
802 : Races a range of awaitables with the same result type. Accepts any
803 : sized input range of IoAwaitable types, enabling use with arrays,
804 : spans, or custom containers.
805 :
806 : @par Suspends
807 : The calling coroutine suspends when co_await is invoked. All awaitables
808 : in the range are launched concurrently and execute in parallel. The
809 : coroutine resumes only after all awaitables have completed, even though
810 : the winner is determined by the first to finish.
811 :
812 : @par Completion Conditions
813 : @li Winner is determined when the first awaitable completes (success or exception)
814 : @li Only one task can claim winner status via atomic compare-exchange
815 : @li Once a winner exists, stop is requested for all remaining siblings
816 : @li Parent coroutine resumes only after all siblings acknowledge completion
817 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
818 :
819 : @par Cancellation Semantics
820 : Cancellation is supported via stop_token propagated through the
821 : IoAwaitable protocol:
822 : @li Each child awaitable receives a stop_token derived from a shared stop_source
823 : @li When the parent's stop token is activated, the stop is forwarded to all children
824 : @li When a winner is determined, stop_source_.request_stop() is called immediately
825 : @li Siblings must handle cancellation gracefully and complete before parent resumes
826 : @li Stop requests are cooperative; tasks must check and respond to them
827 :
828 : @par Concurrency/Overlap
829 : All awaitables are launched concurrently before any can complete.
830 : The launcher iterates through the range, starting each task on the
831 : caller's executor. Tasks may execute in parallel on multi-threaded
832 : executors or interleave on single-threaded executors. There is no
833 : guaranteed ordering of task completion.
834 :
835 : @par Notable Error Conditions
836 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
837 : @li Winner exception: if the winning task threw, that exception is rethrown
838 : @li Non-winner exceptions: silently discarded (only winner's result matters)
839 : @li Cancellation: tasks may complete via cancellation without throwing
840 :
841 : @par Example
842 : @code
843 : task<void> example() {
844 : std::array<task<Response>, 3> requests = {
845 : fetch_from_server(0),
846 : fetch_from_server(1),
847 : fetch_from_server(2)
848 : };
849 :
850 : auto [index, response] = co_await when_any(std::move(requests));
851 : }
852 : @endcode
853 :
854 : @par Example with Vector
855 : @code
856 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
857 : std::vector<task<Response>> requests;
858 : for (auto const& server : servers)
859 : requests.push_back(fetch_from(server));
860 :
861 : auto [index, response] = co_await when_any(std::move(requests));
862 : co_return response;
863 : }
864 : @endcode
865 :
866 : @tparam R Range type satisfying IoAwaitableRange.
867 : @param awaitables Range of awaitables to race concurrently (must not be empty).
868 : @return A task yielding a pair of (winner_index, result).
869 :
870 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
871 : @throws Rethrows the winner's exception if the winning task threw an exception.
872 :
873 : @par Remarks
874 : Elements are moved from the range; for lvalue ranges, the original
875 : container will have moved-from elements after this call. The range
876 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
877 : the variadic overload, no variant wrapper is needed since all tasks
878 : share the same return type.
879 :
880 : @see when_any, IoAwaitableRange
881 : */
882 : template<IoAwaitableRange R>
883 : requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>)
884 14 : [[nodiscard]] auto when_any(R&& awaitables)
885 : -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>>
886 : {
887 : using Awaitable = std::ranges::range_value_t<R>;
888 : using T = detail::awaitable_result_t<Awaitable>;
889 : using result_type = std::pair<std::size_t, T>;
890 : using OwnedRange = std::remove_cvref_t<R>;
891 :
892 : auto count = std::ranges::size(awaitables);
893 : if(count == 0)
894 : throw std::invalid_argument("when_any requires at least one awaitable");
895 :
896 : // Move/copy range onto coroutine frame to ensure lifetime
897 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
898 :
899 : detail::when_any_homogeneous_state<T> state(count);
900 :
901 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
902 :
903 : if(state.core_.winner_exception_)
904 : std::rethrow_exception(state.core_.winner_exception_);
905 :
906 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
907 28 : }
908 :
909 : /** Wait for the first awaitable to complete (void range overload).
910 :
911 : Races a range of void-returning awaitables. Since void awaitables have
912 : no result value, only the winner's index is returned.
913 :
914 : @par Suspends
915 : The calling coroutine suspends when co_await is invoked. All awaitables
916 : in the range are launched concurrently and execute in parallel. The
917 : coroutine resumes only after all awaitables have completed, even though
918 : the winner is determined by the first to finish.
919 :
920 : @par Completion Conditions
921 : @li Winner is determined when the first awaitable completes (success or exception)
922 : @li Only one task can claim winner status via atomic compare-exchange
923 : @li Once a winner exists, stop is requested for all remaining siblings
924 : @li Parent coroutine resumes only after all siblings acknowledge completion
925 : @li The winner's index is returned; if the winner threw, the exception is rethrown
926 :
927 : @par Cancellation Semantics
928 : Cancellation is supported via stop_token propagated through the
929 : IoAwaitable protocol:
930 : @li Each child awaitable receives a stop_token derived from a shared stop_source
931 : @li When the parent's stop token is activated, the stop is forwarded to all children
932 : @li When a winner is determined, stop_source_.request_stop() is called immediately
933 : @li Siblings must handle cancellation gracefully and complete before parent resumes
934 : @li Stop requests are cooperative; tasks must check and respond to them
935 :
936 : @par Concurrency/Overlap
937 : All awaitables are launched concurrently before any can complete.
938 : The launcher iterates through the range, starting each task on the
939 : caller's executor. Tasks may execute in parallel on multi-threaded
940 : executors or interleave on single-threaded executors. There is no
941 : guaranteed ordering of task completion.
942 :
943 : @par Notable Error Conditions
944 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
945 : @li Winner exception: if the winning task threw, that exception is rethrown
946 : @li Non-winner exceptions: silently discarded (only winner's result matters)
947 : @li Cancellation: tasks may complete via cancellation without throwing
948 :
949 : @par Example
950 : @code
951 : task<void> example() {
952 : std::vector<task<void>> tasks;
953 : for (int i = 0; i < 5; ++i)
954 : tasks.push_back(background_work(i));
955 :
956 : std::size_t winner = co_await when_any(std::move(tasks));
957 : // winner is the index of the first task to complete
958 : }
959 : @endcode
960 :
961 : @par Example with Timeout
962 : @code
963 : task<void> with_timeout() {
964 : std::vector<task<void>> tasks;
965 : tasks.push_back(long_running_operation());
966 : tasks.push_back(delay(std::chrono::seconds(5)));
967 :
968 : std::size_t winner = co_await when_any(std::move(tasks));
969 : if (winner == 1) {
970 : // Timeout occurred
971 : }
972 : }
973 : @endcode
974 :
975 : @tparam R Range type satisfying IoAwaitableRange with void result.
976 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
977 : @return A task yielding the winner's index (zero-based).
978 :
979 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
980 : @throws Rethrows the winner's exception if the winning task threw an exception.
981 :
982 : @par Remarks
983 : Elements are moved from the range; for lvalue ranges, the original
984 : container will have moved-from elements after this call. The range
985 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
986 : the non-void overload, no result storage is needed since void tasks
987 : produce no value.
988 :
989 : @see when_any, IoAwaitableRange
990 : */
991 : template<IoAwaitableRange R>
992 : requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>
993 2 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
994 : {
995 : using OwnedRange = std::remove_cvref_t<R>;
996 :
997 : auto count = std::ranges::size(awaitables);
998 : if(count == 0)
999 : throw std::invalid_argument("when_any requires at least one awaitable");
1000 :
1001 : // Move/copy range onto coroutine frame to ensure lifetime
1002 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
1003 :
1004 : detail::when_any_homogeneous_state<void> state(count);
1005 :
1006 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
1007 :
1008 : if(state.core_.winner_exception_)
1009 : std::rethrow_exception(state.core_.winner_exception_);
1010 :
1011 : co_return state.core_.winner_index_;
1012 4 : }
1013 :
1014 : } // namespace capy
1015 : } // namespace boost
1016 :
1017 : #endif
|