1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  

14  

15  
#include <coroutine>
15  
#include <coroutine>
16  
#include <cstddef>
16  
#include <cstddef>
17  
#include <exception>
17  
#include <exception>
18  

18  

19  
namespace boost {
19  
namespace boost {
20  
namespace capy {
20  
namespace capy {
21  
namespace detail {
21  
namespace detail {
22  

22  

23  
class strand_queue;

//----------------------------------------------------------

/** Bookkeeping header stored immediately before a wrapper
    coroutine frame.

    The promise's operator new allocates room for this header
    plus the frame and hands out the address just past the
    header (`prefix + 1`); the matching deallocation paths step
    back by one header (`- 1`) to recover the allocation.

    The header is aligned, and therefore padded, to
    `__STDCPP_DEFAULT_NEW_ALIGNMENT__`. Without this, the three
    pointer-sized members would leave the frame that follows
    at a weaker alignment than plain `::operator new` provides,
    which is the alignment a coroutine frame may rely on.
*/
struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix
{
    // Next recycled block in the owning queue's free list
    frame_prefix* next;

    // Queue whose free list receives this block on deallocation
    strand_queue* queue;

    // Total bytes requested from ::operator new, header included
    std::size_t alloc_size;
};
34  

34  

35  
//----------------------------------------------------------
35  
//----------------------------------------------------------
36  

36  

37  
/** Wrapper coroutine for strand queue dispatch operations.
37  
/** Wrapper coroutine for strand queue dispatch operations.
38  

38  

39  
    This coroutine wraps a target coroutine handle and resumes
39  
    This coroutine wraps a target coroutine handle and resumes
40  
    it when dispatched. The wrapper ensures control returns to
40  
    it when dispatched. The wrapper ensures control returns to
41  
    the dispatch loop after the target suspends or completes.
41  
    the dispatch loop after the target suspends or completes.
42  

42  

43  
    The promise contains an intrusive list node for queue
43  
    The promise contains an intrusive list node for queue
44  
    storage and supports a custom allocator that recycles
44  
    storage and supports a custom allocator that recycles
45  
    coroutine frames via a free list.
45  
    coroutine frames via a free list.
46  
*/
46  
*/
47  
struct strand_op
47  
struct strand_op
48  
{
48  
{
49  
    struct promise_type
49  
    struct promise_type
50  
    {
50  
    {
51  
        promise_type* next = nullptr;
51  
        promise_type* next = nullptr;
52  

52  

53  
        void*
53  
        void*
54  
        operator new(
54  
        operator new(
55  
            std::size_t size,
55  
            std::size_t size,
56  
            strand_queue& q,
56  
            strand_queue& q,
57  
            std::coroutine_handle<void>);
57  
            std::coroutine_handle<void>);
58  

58  

59  
        void
59  
        void
60  
        operator delete(void* p, std::size_t);
60  
        operator delete(void* p, std::size_t);
61  

61  

62  
        strand_op
62  
        strand_op
63  
        get_return_object() noexcept
63  
        get_return_object() noexcept
64  
        {
64  
        {
65  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
65  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
66  
        }
66  
        }
67  

67  

68  
        std::suspend_always
68  
        std::suspend_always
69  
        initial_suspend() noexcept
69  
        initial_suspend() noexcept
70  
        {
70  
        {
71  
            return {};
71  
            return {};
72  
        }
72  
        }
73  

73  

74  
        std::suspend_always
74  
        std::suspend_always
75  
        final_suspend() noexcept
75  
        final_suspend() noexcept
76  
        {
76  
        {
77  
            return {};
77  
            return {};
78  
        }
78  
        }
79  

79  

80  
        void
80  
        void
81  
        return_void() noexcept
81  
        return_void() noexcept
82  
        {
82  
        {
83  
        }
83  
        }
84  

84  

85  
        void
85  
        void
86  
        unhandled_exception()
86  
        unhandled_exception()
87  
        {
87  
        {
88  
            std::terminate();
88  
            std::terminate();
89  
        }
89  
        }
90  
    };
90  
    };
91  

91  

92  
    std::coroutine_handle<promise_type> h_;
92  
    std::coroutine_handle<promise_type> h_;
93  
};
93  
};
94  

94  

95  
//----------------------------------------------------------
95  
//----------------------------------------------------------
96  

96  

97  
/** Single-threaded dispatch queue for coroutine handles.
97  
/** Single-threaded dispatch queue for coroutine handles.
98  

98  

99  
    This queue stores coroutine handles and resumes them
99  
    This queue stores coroutine handles and resumes them
100  
    sequentially when dispatch() is called. Each pushed
100  
    sequentially when dispatch() is called. Each pushed
101  
    handle is wrapped in a strand_op coroutine that ensures
101  
    handle is wrapped in a strand_op coroutine that ensures
102  
    control returns to the dispatch loop after the target
102  
    control returns to the dispatch loop after the target
103  
    suspends or completes.
103  
    suspends or completes.
104  

104  

105  
    The queue uses an intrusive singly-linked list through
105  
    The queue uses an intrusive singly-linked list through
106  
    the promise type to avoid separate node allocations.
106  
    the promise type to avoid separate node allocations.
107  
    A free list recycles wrapper coroutine frames to reduce
107  
    A free list recycles wrapper coroutine frames to reduce
108  
    allocation overhead during repeated push/dispatch cycles.
108  
    allocation overhead during repeated push/dispatch cycles.
109  

109  

110  
    @par Thread Safety
110  
    @par Thread Safety
111  
    This class is not thread-safe. All operations must be
111  
    This class is not thread-safe. All operations must be
112  
    called from a single thread.
112  
    called from a single thread.
113  
*/
113  
*/
114  
class strand_queue
114  
class strand_queue
115  
{
115  
{
116  
    using promise_type = strand_op::promise_type;
116  
    using promise_type = strand_op::promise_type;
117  

117  

118  
    promise_type* head_ = nullptr;
118  
    promise_type* head_ = nullptr;
119  
    promise_type* tail_ = nullptr;
119  
    promise_type* tail_ = nullptr;
120  
    frame_prefix* free_list_ = nullptr;
120  
    frame_prefix* free_list_ = nullptr;
121  

121  

122  
    friend struct strand_op::promise_type;
122  
    friend struct strand_op::promise_type;
123  

123  

124  
    static
124  
    static
125  
    strand_op
125  
    strand_op
126  
    make_strand_op(
126  
    make_strand_op(
127  
        strand_queue& q,
127  
        strand_queue& q,
128  
        std::coroutine_handle<void> target)
128  
        std::coroutine_handle<void> target)
129  
    {
129  
    {
130  
        (void)q;
130  
        (void)q;
131  
        target.resume();
131  
        target.resume();
132  
        co_return;
132  
        co_return;
133  
    }
133  
    }
134  

134  

135  
public:
135  
public:
136  
    strand_queue() = default;
136  
    strand_queue() = default;
137  

137  

138  
    strand_queue(strand_queue const&) = delete;
138  
    strand_queue(strand_queue const&) = delete;
139  
    strand_queue& operator=(strand_queue const&) = delete;
139  
    strand_queue& operator=(strand_queue const&) = delete;
140  

140  

141  
    /** Destructor.
141  
    /** Destructor.
142  

142  

143  
        Destroys any pending wrappers without resuming them,
143  
        Destroys any pending wrappers without resuming them,
144  
        then frees all memory in the free list.
144  
        then frees all memory in the free list.
145  
    */
145  
    */
146  
    ~strand_queue()
146  
    ~strand_queue()
147  
    {
147  
    {
148  
        // Destroy pending wrappers
148  
        // Destroy pending wrappers
149  
        while(head_)
149  
        while(head_)
150  
        {
150  
        {
151  
            promise_type* p = head_;
151  
            promise_type* p = head_;
152  
            head_ = p->next;
152  
            head_ = p->next;
153  

153  

154  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
154  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155  
            h.destroy();
155  
            h.destroy();
156  
        }
156  
        }
157  

157  

158  
        // Free the free list memory
158  
        // Free the free list memory
159  
        while(free_list_)
159  
        while(free_list_)
160  
        {
160  
        {
161  
            frame_prefix* prefix = free_list_;
161  
            frame_prefix* prefix = free_list_;
162  
            free_list_ = prefix->next;
162  
            free_list_ = prefix->next;
163  
            ::operator delete(prefix);
163  
            ::operator delete(prefix);
164  
        }
164  
        }
165  
    }
165  
    }
166  

166  

167  
    /** Returns true if there are no pending operations.
167  
    /** Returns true if there are no pending operations.
168  
    */
168  
    */
169  
    bool
169  
    bool
170  
    empty() const noexcept
170  
    empty() const noexcept
171  
    {
171  
    {
172  
        return head_ == nullptr;
172  
        return head_ == nullptr;
173  
    }
173  
    }
174  

174  

175  
    /** Push a coroutine handle to the queue.
175  
    /** Push a coroutine handle to the queue.
176  

176  

177  
        Creates a wrapper coroutine and appends it to the
177  
        Creates a wrapper coroutine and appends it to the
178  
        queue. The wrapper will resume the target handle
178  
        queue. The wrapper will resume the target handle
179  
        when dispatch() processes it.
179  
        when dispatch() processes it.
180  

180  

181  
        @param h The coroutine handle to dispatch.
181  
        @param h The coroutine handle to dispatch.
182  
    */
182  
    */
183  
    void
183  
    void
184  
    push(std::coroutine_handle<void> h)
184  
    push(std::coroutine_handle<void> h)
185  
    {
185  
    {
186  
        strand_op op = make_strand_op(*this, h);
186  
        strand_op op = make_strand_op(*this, h);
187  

187  

188  
        promise_type* p = &op.h_.promise();
188  
        promise_type* p = &op.h_.promise();
189  
        p->next = nullptr;
189  
        p->next = nullptr;
190  

190  

191  
        if(tail_)
191  
        if(tail_)
192  
            tail_->next = p;
192  
            tail_->next = p;
193  
        else
193  
        else
194  
            head_ = p;
194  
            head_ = p;
195  
        tail_ = p;
195  
        tail_ = p;
196  
    }
196  
    }
197  

197  

198  
    /** Resume all queued coroutines in sequence.
198  
    /** Resume all queued coroutines in sequence.
199  

199  

200  
        Processes each wrapper in FIFO order, resuming its
200  
        Processes each wrapper in FIFO order, resuming its
201  
        target coroutine. After each target suspends or
201  
        target coroutine. After each target suspends or
202  
        completes, the wrapper is destroyed and its frame
202  
        completes, the wrapper is destroyed and its frame
203  
        is added to the free list for reuse.
203  
        is added to the free list for reuse.
204  

204  

205  
        Coroutines resumed during dispatch may push new
205  
        Coroutines resumed during dispatch may push new
206  
        handles, which will also be processed in the same
206  
        handles, which will also be processed in the same
207  
        dispatch call.
207  
        dispatch call.
208  

208  

209  
        @warning Not thread-safe. Do not call while another
209  
        @warning Not thread-safe. Do not call while another
210  
            thread may be calling push().
210  
            thread may be calling push().
211  
    */
211  
    */
212  
    void
212  
    void
213  
    dispatch()
213  
    dispatch()
214  
    {
214  
    {
215  
        while(head_)
215  
        while(head_)
216  
        {
216  
        {
217  
            promise_type* p = head_;
217  
            promise_type* p = head_;
218  
            head_ = p->next;
218  
            head_ = p->next;
219  
            if(!head_)
219  
            if(!head_)
220  
                tail_ = nullptr;
220  
                tail_ = nullptr;
221  

221  

222  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
222  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223  
            h.resume();
223  
            h.resume();
224  
            h.destroy();
224  
            h.destroy();
225  
        }
225  
        }
226  
    }
226  
    }
227  

227  

228  
    /** Batch of taken items for thread-safe dispatch. */
228  
    /** Batch of taken items for thread-safe dispatch. */
229  
    struct taken_batch
229  
    struct taken_batch
230  
    {
230  
    {
231  
        promise_type* head = nullptr;
231  
        promise_type* head = nullptr;
232  
        promise_type* tail = nullptr;
232  
        promise_type* tail = nullptr;
233  
    };
233  
    };
234  

234  

235  
    /** Take all pending items atomically.
235  
    /** Take all pending items atomically.
236  

236  

237  
        Removes all items from the queue and returns them
237  
        Removes all items from the queue and returns them
238  
        as a batch. The queue is left empty.
238  
        as a batch. The queue is left empty.
239  

239  

240  
        @return The batch of taken items.
240  
        @return The batch of taken items.
241  
    */
241  
    */
242  
    taken_batch
242  
    taken_batch
243  
    take_all() noexcept
243  
    take_all() noexcept
244  
    {
244  
    {
245  
        taken_batch batch{head_, tail_};
245  
        taken_batch batch{head_, tail_};
246  
        head_ = tail_ = nullptr;
246  
        head_ = tail_ = nullptr;
247  
        return batch;
247  
        return batch;
248  
    }
248  
    }
249  

249  

250  
    /** Dispatch a batch of taken items.
250  
    /** Dispatch a batch of taken items.
251  

251  

252  
        @param batch The batch to dispatch.
252  
        @param batch The batch to dispatch.
253  

253  

254  
        @note This is thread-safe w.r.t. push() because it doesn't
254  
        @note This is thread-safe w.r.t. push() because it doesn't
255  
            access the queue's free_list_. Frames are deleted directly
255  
            access the queue's free_list_. Frames are deleted directly
256  
            rather than recycled.
256  
            rather than recycled.
257  
    */
257  
    */
258  
    static
258  
    static
259  
    void
259  
    void
260  
    dispatch_batch(taken_batch& batch)
260  
    dispatch_batch(taken_batch& batch)
261  
    {
261  
    {
262  
        while(batch.head)
262  
        while(batch.head)
263  
        {
263  
        {
264  
            promise_type* p = batch.head;
264  
            promise_type* p = batch.head;
265  
            batch.head = p->next;
265  
            batch.head = p->next;
266  

266  

267  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
267  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268  
            h.resume();
268  
            h.resume();
269  
            // Don't use h.destroy() - it would call operator delete which
269  
            // Don't use h.destroy() - it would call operator delete which
270  
            // accesses the queue's free_list_ (race with push).
270  
            // accesses the queue's free_list_ (race with push).
271  
            // Instead, manually free the frame without recycling.
271  
            // Instead, manually free the frame without recycling.
272  
            // h.address() returns the frame base (what operator new returned).
272  
            // h.address() returns the frame base (what operator new returned).
273  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
273  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
274  
            ::operator delete(prefix);
274  
            ::operator delete(prefix);
275  
        }
275  
        }
276  
        batch.tail = nullptr;
276  
        batch.tail = nullptr;
277  
    }
277  
    }
278  
};
278  
};
279  

279  

280  
//----------------------------------------------------------
280  
//----------------------------------------------------------
281  

281  

282  
inline
282  
inline
283  
void*
283  
void*
284  
strand_op::promise_type::operator new(
284  
strand_op::promise_type::operator new(
285  
    std::size_t size,
285  
    std::size_t size,
286  
    strand_queue& q,
286  
    strand_queue& q,
287  
    std::coroutine_handle<void>)
287  
    std::coroutine_handle<void>)
288  
{
288  
{
289  
    // Total size includes prefix
289  
    // Total size includes prefix
290  
    std::size_t alloc_size = size + sizeof(frame_prefix);
290  
    std::size_t alloc_size = size + sizeof(frame_prefix);
291  
    void* raw;
291  
    void* raw;
292  
    
292  
    
293  
    // Try to reuse from free list
293  
    // Try to reuse from free list
294  
    if(q.free_list_)
294  
    if(q.free_list_)
295  
    {
295  
    {
296  
        frame_prefix* prefix = q.free_list_;
296  
        frame_prefix* prefix = q.free_list_;
297  
        q.free_list_ = prefix->next;
297  
        q.free_list_ = prefix->next;
298  
        raw = prefix;
298  
        raw = prefix;
299  
    }
299  
    }
300  
    else
300  
    else
301  
    {
301  
    {
302  
        raw = ::operator new(alloc_size);
302  
        raw = ::operator new(alloc_size);
303  
    }
303  
    }
304  

304  

305  
    // Initialize prefix
305  
    // Initialize prefix
306  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
306  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307  
    prefix->next = nullptr;
307  
    prefix->next = nullptr;
308  
    prefix->queue = &q;
308  
    prefix->queue = &q;
309  
    prefix->alloc_size = alloc_size;
309  
    prefix->alloc_size = alloc_size;
310  

310  

311  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
311  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
312  
    return prefix + 1;
312  
    return prefix + 1;
313  
}
313  
}
314  

314  

315  
inline
315  
inline
316  
void
316  
void
317  
strand_op::promise_type::operator delete(void* p, std::size_t)
317  
strand_op::promise_type::operator delete(void* p, std::size_t)
318  
{
318  
{
319  
    // Calculate back to get the prefix
319  
    // Calculate back to get the prefix
320  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
320  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321  

321  

322  
    // Add to free list
322  
    // Add to free list
323  
    prefix->next = prefix->queue->free_list_;
323  
    prefix->next = prefix->queue->free_list_;
324  
    prefix->queue->free_list_ = prefix;
324  
    prefix->queue->free_list_ = prefix;
325  
}
325  
}
326  

326  

327  
} // namespace detail
327  
} // namespace detail
328  
} // namespace capy
328  
} // namespace capy
329  
} // namespace boost
329  
} // namespace boost
330  

330  

331  
#endif
331  
#endif