1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
18  
#include <boost/capy/concept/write_stream.hpp>
19  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  

22  

23  
#include <concepts>
23  
#include <concepts>
24  
#include <coroutine>
24  
#include <coroutine>
25  
#include <cstddef>
25  
#include <cstddef>
26  
#include <new>
26  
#include <new>
27  
#include <span>
27  
#include <span>
28  
#include <stop_token>
28  
#include <stop_token>
29  
#include <system_error>
29  
#include <system_error>
30  
#include <utility>
30  
#include <utility>
31  

31  

32  
namespace boost {
32  
namespace boost {
33  
namespace capy {
33  
namespace capy {
34  

34  

35  
/** Type-erased wrapper for any WriteStream.
35  
/** Type-erased wrapper for any WriteStream.
36  

36  

37  
    This class provides type erasure for any type satisfying the
37  
    This class provides type erasure for any type satisfying the
38  
    @ref WriteStream concept, enabling runtime polymorphism for
38  
    @ref WriteStream concept, enabling runtime polymorphism for
39  
    write operations. It uses cached awaitable storage to achieve
39  
    write operations. It uses cached awaitable storage to achieve
40  
    zero steady-state allocation after construction.
40  
    zero steady-state allocation after construction.
41  

41  

42  
    The wrapper supports two construction modes:
42  
    The wrapper supports two construction modes:
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
      allocates storage and owns the stream.
44  
      allocates storage and owns the stream.
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
      pointed-to stream must outlive this wrapper.
46  
      pointed-to stream must outlive this wrapper.
47  

47  

48  
    @par Awaitable Preallocation
48  
    @par Awaitable Preallocation
49  
    The constructor preallocates storage for the type-erased awaitable.
49  
    The constructor preallocates storage for the type-erased awaitable.
50  
    This reserves all virtual address space at server startup
50  
    This reserves all virtual address space at server startup
51  
    so memory usage can be measured up front, rather than
51  
    so memory usage can be measured up front, rather than
52  
    allocating piecemeal as traffic arrives.
52  
    allocating piecemeal as traffic arrives.
53  

53  

54  
    @par Thread Safety
54  
    @par Thread Safety
55  
    Not thread-safe. Concurrent operations on the same wrapper
55  
    Not thread-safe. Concurrent operations on the same wrapper
56  
    are undefined behavior.
56  
    are undefined behavior.
57  

57  

58  
    @par Example
58  
    @par Example
59  
    @code
59  
    @code
60  
    // Owning - takes ownership of the stream
60  
    // Owning - takes ownership of the stream
61  
    any_write_stream stream(socket{ioc});
61  
    any_write_stream stream(socket{ioc});
62  

62  

63  
    // Reference - wraps without ownership
63  
    // Reference - wraps without ownership
64  
    socket sock(ioc);
64  
    socket sock(ioc);
65  
    any_write_stream stream(&sock);
65  
    any_write_stream stream(&sock);
66  

66  

67  
    const_buffer buf(data, size);
67  
    const_buffer buf(data, size);
68  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
68  
    auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69  
    @endcode
69  
    @endcode
70  

70  

71  
    @see any_read_stream, any_stream, WriteStream
71  
    @see any_read_stream, any_stream, WriteStream
72  
*/
72  
*/
73  
class any_write_stream
73  
class any_write_stream
74  
{
74  
{
75  
    struct vtable;
75  
    struct vtable;
76  
    struct awaitable_ops;
76  
    struct awaitable_ops;
77  

77  

78  
    template<WriteStream S>
78  
    template<WriteStream S>
79  
    struct vtable_for_impl;
79  
    struct vtable_for_impl;
80  

80  

81  
    void* stream_ = nullptr;
81  
    void* stream_ = nullptr;
82  
    vtable const* vt_ = nullptr;
82  
    vtable const* vt_ = nullptr;
83  
    void* cached_awaitable_ = nullptr;
83  
    void* cached_awaitable_ = nullptr;
84  
    void* storage_ = nullptr;
84  
    void* storage_ = nullptr;
85  
    awaitable_ops const* active_ops_ = nullptr;
85  
    awaitable_ops const* active_ops_ = nullptr;
86  

86  

87  
public:
87  
public:
88  
    /** Destructor.
88  
    /** Destructor.
89  

89  

90  
        Destroys the owned stream (if any) and releases the cached
90  
        Destroys the owned stream (if any) and releases the cached
91  
        awaitable storage.
91  
        awaitable storage.
92  
    */
92  
    */
93  
    ~any_write_stream();
93  
    ~any_write_stream();
94  

94  

95  
    /** Default constructor.
95  
    /** Default constructor.
96  

96  

97  
        Constructs an empty wrapper. Operations on a default-constructed
97  
        Constructs an empty wrapper. Operations on a default-constructed
98  
        wrapper result in undefined behavior.
98  
        wrapper result in undefined behavior.
99  
    */
99  
    */
100  
    any_write_stream() = default;
100  
    any_write_stream() = default;
101  

101  

102  
    /** Non-copyable.
102  
    /** Non-copyable.
103  

103  

104  
        The awaitable cache is per-instance and cannot be shared.
104  
        The awaitable cache is per-instance and cannot be shared.
105  
    */
105  
    */
106  
    any_write_stream(any_write_stream const&) = delete;
106  
    any_write_stream(any_write_stream const&) = delete;
107  
    any_write_stream& operator=(any_write_stream const&) = delete;
107  
    any_write_stream& operator=(any_write_stream const&) = delete;
108  

108  

109  
    /** Move constructor.
109  
    /** Move constructor.
110  

110  

111  
        Transfers ownership of the wrapped stream (if owned) and
111  
        Transfers ownership of the wrapped stream (if owned) and
112  
        cached awaitable storage from `other`. After the move, `other` is
112  
        cached awaitable storage from `other`. After the move, `other` is
113  
        in a default-constructed state.
113  
        in a default-constructed state.
114  

114  

115  
        @param other The wrapper to move from.
115  
        @param other The wrapper to move from.
116  
    */
116  
    */
117  
    any_write_stream(any_write_stream&& other) noexcept
117  
    any_write_stream(any_write_stream&& other) noexcept
118  
        : stream_(std::exchange(other.stream_, nullptr))
118  
        : stream_(std::exchange(other.stream_, nullptr))
119  
        , vt_(std::exchange(other.vt_, nullptr))
119  
        , vt_(std::exchange(other.vt_, nullptr))
120  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
120  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121  
        , storage_(std::exchange(other.storage_, nullptr))
121  
        , storage_(std::exchange(other.storage_, nullptr))
122  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
122  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
123  
    {
123  
    {
124  
    }
124  
    }
125  

125  

126  
    /** Move assignment operator.
126  
    /** Move assignment operator.
127  

127  

128  
        Destroys any owned stream and releases existing resources,
128  
        Destroys any owned stream and releases existing resources,
129  
        then transfers ownership from `other`.
129  
        then transfers ownership from `other`.
130  

130  

131  
        @param other The wrapper to move from.
131  
        @param other The wrapper to move from.
132  
        @return Reference to this wrapper.
132  
        @return Reference to this wrapper.
133  
    */
133  
    */
134  
    any_write_stream&
134  
    any_write_stream&
135  
    operator=(any_write_stream&& other) noexcept;
135  
    operator=(any_write_stream&& other) noexcept;
136  

136  

137  
    /** Construct by taking ownership of a WriteStream.
137  
    /** Construct by taking ownership of a WriteStream.
138  

138  

139  
        Allocates storage and moves the stream into this wrapper.
139  
        Allocates storage and moves the stream into this wrapper.
140  
        The wrapper owns the stream and will destroy it.
140  
        The wrapper owns the stream and will destroy it.
141  

141  

142  
        @param s The stream to take ownership of.
142  
        @param s The stream to take ownership of.
143  
    */
143  
    */
144  
    template<WriteStream S>
144  
    template<WriteStream S>
145  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
145  
        requires (!std::same_as<std::decay_t<S>, any_write_stream>)
146  
    any_write_stream(S s);
146  
    any_write_stream(S s);
147  

147  

148  
    /** Construct by wrapping a WriteStream without ownership.
148  
    /** Construct by wrapping a WriteStream without ownership.
149  

149  

150  
        Wraps the given stream by pointer. The stream must remain
150  
        Wraps the given stream by pointer. The stream must remain
151  
        valid for the lifetime of this wrapper.
151  
        valid for the lifetime of this wrapper.
152  

152  

153  
        @param s Pointer to the stream to wrap.
153  
        @param s Pointer to the stream to wrap.
154  
    */
154  
    */
155  
    template<WriteStream S>
155  
    template<WriteStream S>
156  
    any_write_stream(S* s);
156  
    any_write_stream(S* s);
157  

157  

158  
    /** Check if the wrapper contains a valid stream.
158  
    /** Check if the wrapper contains a valid stream.
159  

159  

160  
        @return `true` if wrapping a stream, `false` if default-constructed
160  
        @return `true` if wrapping a stream, `false` if default-constructed
161  
            or moved-from.
161  
            or moved-from.
162  
    */
162  
    */
163  
    bool
163  
    bool
164  
    has_value() const noexcept
164  
    has_value() const noexcept
165  
    {
165  
    {
166  
        return stream_ != nullptr;
166  
        return stream_ != nullptr;
167  
    }
167  
    }
168  

168  

169  
    /** Check if the wrapper contains a valid stream.
169  
    /** Check if the wrapper contains a valid stream.
170  

170  

171  
        @return `true` if wrapping a stream, `false` if default-constructed
171  
        @return `true` if wrapping a stream, `false` if default-constructed
172  
            or moved-from.
172  
            or moved-from.
173  
    */
173  
    */
174  
    explicit
174  
    explicit
175  
    operator bool() const noexcept
175  
    operator bool() const noexcept
176  
    {
176  
    {
177  
        return has_value();
177  
        return has_value();
178  
    }
178  
    }
179  

179  

180  
    /** Initiate an asynchronous write operation.
180  
    /** Initiate an asynchronous write operation.
181  

181  

182  
        Writes data from the provided buffer sequence. The operation
182  
        Writes data from the provided buffer sequence. The operation
183  
        completes when at least one byte has been written, or an error
183  
        completes when at least one byte has been written, or an error
184  
        occurs.
184  
        occurs.
185  

185  

186  
        @param buffers The buffer sequence containing data to write.
186  
        @param buffers The buffer sequence containing data to write.
187  
            Passed by value to ensure the sequence lives in the
187  
            Passed by value to ensure the sequence lives in the
188  
            coroutine frame across suspension points.
188  
            coroutine frame across suspension points.
189  

189  

190  
        @return An awaitable yielding `(error_code,std::size_t)`.
190  
        @return An awaitable yielding `(error_code,std::size_t)`.
191  

191  

192  
        @par Preconditions
192  
        @par Preconditions
193  
        The wrapper must contain a valid stream (`has_value() == true`).
193  
        The wrapper must contain a valid stream (`has_value() == true`).
194  
    */
194  
    */
195  
    template<ConstBufferSequence CB>
195  
    template<ConstBufferSequence CB>
196  
    auto
196  
    auto
197  
    write_some(CB buffers);
197  
    write_some(CB buffers);
198  

198  

199  
protected:
199  
protected:
200  
    /** Rebind to a new stream after move.
200  
    /** Rebind to a new stream after move.
201  

201  

202  
        Updates the internal pointer to reference a new stream object.
202  
        Updates the internal pointer to reference a new stream object.
203  
        Used by owning wrappers after move assignment when the owned
203  
        Used by owning wrappers after move assignment when the owned
204  
        object has moved to a new location.
204  
        object has moved to a new location.
205  

205  

206  
        @param new_stream The new stream to bind to. Must be the same
206  
        @param new_stream The new stream to bind to. Must be the same
207  
            type as the original stream.
207  
            type as the original stream.
208  

208  

209  
        @note Terminates if called with a stream of different type
209  
        @note Terminates if called with a stream of different type
210  
            than the original.
210  
            than the original.
211  
    */
211  
    */
212  
    template<WriteStream S>
212  
    template<WriteStream S>
213  
    void
213  
    void
214  
    rebind(S& new_stream) noexcept
214  
    rebind(S& new_stream) noexcept
215  
    {
215  
    {
216  
        if(vt_ != &vtable_for_impl<S>::value)
216  
        if(vt_ != &vtable_for_impl<S>::value)
217  
            std::terminate();
217  
            std::terminate();
218  
        stream_ = &new_stream;
218  
        stream_ = &new_stream;
219  
    }
219  
    }
220  
};
220  
};
221  

221  

222  
//----------------------------------------------------------
222  
//----------------------------------------------------------
223  

223  

224  
struct any_write_stream::awaitable_ops
224  
struct any_write_stream::awaitable_ops
225  
{
225  
{
226  
    bool (*await_ready)(void*);
226  
    bool (*await_ready)(void*);
227  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
227  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228  
    io_result<std::size_t> (*await_resume)(void*);
228  
    io_result<std::size_t> (*await_resume)(void*);
229  
    void (*destroy)(void*) noexcept;
229  
    void (*destroy)(void*) noexcept;
230  
};
230  
};
231  

231  

232  
struct any_write_stream::vtable
232  
struct any_write_stream::vtable
233  
{
233  
{
234  
    void (*destroy)(void*) noexcept;
234  
    void (*destroy)(void*) noexcept;
235  
    std::size_t awaitable_size;
235  
    std::size_t awaitable_size;
236  
    std::size_t awaitable_align;
236  
    std::size_t awaitable_align;
237  
    awaitable_ops const* (*construct_awaitable)(
237  
    awaitable_ops const* (*construct_awaitable)(
238  
        void* stream,
238  
        void* stream,
239  
        void* storage,
239  
        void* storage,
240  
        std::span<const_buffer const> buffers);
240  
        std::span<const_buffer const> buffers);
241  
};
241  
};
242  

242  

243  
template<WriteStream S>
243  
template<WriteStream S>
244  
struct any_write_stream::vtable_for_impl
244  
struct any_write_stream::vtable_for_impl
245  
{
245  
{
246  
    using Awaitable = decltype(std::declval<S&>().write_some(
246  
    using Awaitable = decltype(std::declval<S&>().write_some(
247  
        std::span<const_buffer const>{}));
247  
        std::span<const_buffer const>{}));
248  

248  

249  
    static void
249  
    static void
250  
    do_destroy_impl(void* stream) noexcept
250  
    do_destroy_impl(void* stream) noexcept
251  
    {
251  
    {
252  
        static_cast<S*>(stream)->~S();
252  
        static_cast<S*>(stream)->~S();
253  
    }
253  
    }
254  

254  

255  
    static awaitable_ops const*
255  
    static awaitable_ops const*
256  
    construct_awaitable_impl(
256  
    construct_awaitable_impl(
257  
        void* stream,
257  
        void* stream,
258  
        void* storage,
258  
        void* storage,
259  
        std::span<const_buffer const> buffers)
259  
        std::span<const_buffer const> buffers)
260  
    {
260  
    {
261  
        auto& s = *static_cast<S*>(stream);
261  
        auto& s = *static_cast<S*>(stream);
262  
        ::new(storage) Awaitable(s.write_some(buffers));
262  
        ::new(storage) Awaitable(s.write_some(buffers));
263  

263  

264  
        static constexpr awaitable_ops ops = {
264  
        static constexpr awaitable_ops ops = {
265  
            +[](void* p) {
265  
            +[](void* p) {
266  
                return static_cast<Awaitable*>(p)->await_ready();
266  
                return static_cast<Awaitable*>(p)->await_ready();
267  
            },
267  
            },
268  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
268  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269  
                return detail::call_await_suspend(
269  
                return detail::call_await_suspend(
270  
                    static_cast<Awaitable*>(p), h, ex, token);
270  
                    static_cast<Awaitable*>(p), h, ex, token);
271  
            },
271  
            },
272  
            +[](void* p) {
272  
            +[](void* p) {
273  
                return static_cast<Awaitable*>(p)->await_resume();
273  
                return static_cast<Awaitable*>(p)->await_resume();
274  
            },
274  
            },
275  
            +[](void* p) noexcept {
275  
            +[](void* p) noexcept {
276  
                static_cast<Awaitable*>(p)->~Awaitable();
276  
                static_cast<Awaitable*>(p)->~Awaitable();
277  
            }
277  
            }
278  
        };
278  
        };
279  
        return &ops;
279  
        return &ops;
280  
    }
280  
    }
281  

281  

282  
    static constexpr vtable value = {
282  
    static constexpr vtable value = {
283  
        &do_destroy_impl,
283  
        &do_destroy_impl,
284  
        sizeof(Awaitable),
284  
        sizeof(Awaitable),
285  
        alignof(Awaitable),
285  
        alignof(Awaitable),
286  
        &construct_awaitable_impl
286  
        &construct_awaitable_impl
287  
    };
287  
    };
288  
};
288  
};
289  

289  

290  
//----------------------------------------------------------
290  
//----------------------------------------------------------
291  

291  

292  
inline
292  
inline
293  
any_write_stream::~any_write_stream()
293  
any_write_stream::~any_write_stream()
294  
{
294  
{
295  
    if(storage_)
295  
    if(storage_)
296  
    {
296  
    {
297  
        vt_->destroy(stream_);
297  
        vt_->destroy(stream_);
298  
        ::operator delete(storage_);
298  
        ::operator delete(storage_);
299  
    }
299  
    }
300  
    if(cached_awaitable_)
300  
    if(cached_awaitable_)
301  
        ::operator delete(cached_awaitable_);
301  
        ::operator delete(cached_awaitable_);
302  
}
302  
}
303  

303  

304  
inline any_write_stream&
304  
inline any_write_stream&
305  
any_write_stream::operator=(any_write_stream&& other) noexcept
305  
any_write_stream::operator=(any_write_stream&& other) noexcept
306  
{
306  
{
307  
    if(this != &other)
307  
    if(this != &other)
308  
    {
308  
    {
309  
        if(storage_)
309  
        if(storage_)
310  
        {
310  
        {
311  
            vt_->destroy(stream_);
311  
            vt_->destroy(stream_);
312  
            ::operator delete(storage_);
312  
            ::operator delete(storage_);
313  
        }
313  
        }
314  
        if(cached_awaitable_)
314  
        if(cached_awaitable_)
315  
            ::operator delete(cached_awaitable_);
315  
            ::operator delete(cached_awaitable_);
316  
        stream_ = std::exchange(other.stream_, nullptr);
316  
        stream_ = std::exchange(other.stream_, nullptr);
317  
        vt_ = std::exchange(other.vt_, nullptr);
317  
        vt_ = std::exchange(other.vt_, nullptr);
318  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
318  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
319  
        storage_ = std::exchange(other.storage_, nullptr);
319  
        storage_ = std::exchange(other.storage_, nullptr);
320  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
320  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
321  
    }
321  
    }
322  
    return *this;
322  
    return *this;
323  
}
323  
}
324  

324  

325  
template<WriteStream S>
325  
template<WriteStream S>
326  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
326  
    requires (!std::same_as<std::decay_t<S>, any_write_stream>)
327  
any_write_stream::any_write_stream(S s)
327  
any_write_stream::any_write_stream(S s)
328  
    : vt_(&vtable_for_impl<S>::value)
328  
    : vt_(&vtable_for_impl<S>::value)
329  
{
329  
{
330  
    struct guard {
330  
    struct guard {
331  
        any_write_stream* self;
331  
        any_write_stream* self;
332  
        bool committed = false;
332  
        bool committed = false;
333  
        ~guard() {
333  
        ~guard() {
334  
            if(!committed && self->storage_) {
334  
            if(!committed && self->storage_) {
335  
                self->vt_->destroy(self->stream_);
335  
                self->vt_->destroy(self->stream_);
336  
                ::operator delete(self->storage_);
336  
                ::operator delete(self->storage_);
337  
                self->storage_ = nullptr;
337  
                self->storage_ = nullptr;
338  
                self->stream_ = nullptr;
338  
                self->stream_ = nullptr;
339  
            }
339  
            }
340  
        }
340  
        }
341  
    } g{this};
341  
    } g{this};
342  

342  

343  
    storage_ = ::operator new(sizeof(S));
343  
    storage_ = ::operator new(sizeof(S));
344  
    stream_ = ::new(storage_) S(std::move(s));
344  
    stream_ = ::new(storage_) S(std::move(s));
345  

345  

346  
    // Preallocate the awaitable storage
346  
    // Preallocate the awaitable storage
347  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
347  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
348  

348  

349  
    g.committed = true;
349  
    g.committed = true;
350  
}
350  
}
351  

351  

352  
template<WriteStream S>
352  
template<WriteStream S>
353  
any_write_stream::any_write_stream(S* s)
353  
any_write_stream::any_write_stream(S* s)
354  
    : stream_(s)
354  
    : stream_(s)
355  
    , vt_(&vtable_for_impl<S>::value)
355  
    , vt_(&vtable_for_impl<S>::value)
356  
{
356  
{
357  
    // Preallocate the awaitable storage
357  
    // Preallocate the awaitable storage
358  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
358  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
359  
}
359  
}
360  

360  

361  
//----------------------------------------------------------
361  
//----------------------------------------------------------
362  

362  

363  
template<ConstBufferSequence CB>
363  
template<ConstBufferSequence CB>
364  
auto
364  
auto
365  
any_write_stream::write_some(CB buffers)
365  
any_write_stream::write_some(CB buffers)
366  
{
366  
{
367  
    struct awaitable
367  
    struct awaitable
368  
    {
368  
    {
369  
        any_write_stream* self_;
369  
        any_write_stream* self_;
370  
        buffer_param<CB> bp_;
370  
        buffer_param<CB> bp_;
371  

371  

372  
        bool
372  
        bool
373  
        await_ready() const noexcept
373  
        await_ready() const noexcept
374  
        {
374  
        {
375  
            return false;
375  
            return false;
376  
        }
376  
        }
377  

377  

378  
        coro
378  
        coro
379  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
379  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
380  
        {
380  
        {
381  
            // Construct the underlying awaitable into cached storage
381  
            // Construct the underlying awaitable into cached storage
382  
            self_->active_ops_ = self_->vt_->construct_awaitable(
382  
            self_->active_ops_ = self_->vt_->construct_awaitable(
383  
                self_->stream_,
383  
                self_->stream_,
384  
                self_->cached_awaitable_,
384  
                self_->cached_awaitable_,
385  
                bp_.data());
385  
                bp_.data());
386  

386  

387  
            // Check if underlying is immediately ready
387  
            // Check if underlying is immediately ready
388  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
388  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
389  
                return h;
389  
                return h;
390  

390  

391  
            // Forward to underlying awaitable
391  
            // Forward to underlying awaitable
392  
            return self_->active_ops_->await_suspend(
392  
            return self_->active_ops_->await_suspend(
393  
                self_->cached_awaitable_, h, ex, token);
393  
                self_->cached_awaitable_, h, ex, token);
394  
        }
394  
        }
395  

395  

396  
        io_result<std::size_t>
396  
        io_result<std::size_t>
397  
        await_resume()
397  
        await_resume()
398  
        {
398  
        {
399  
            struct guard {
399  
            struct guard {
400  
                any_write_stream* self;
400  
                any_write_stream* self;
401  
                ~guard() {
401  
                ~guard() {
402  
                    self->active_ops_->destroy(self->cached_awaitable_);
402  
                    self->active_ops_->destroy(self->cached_awaitable_);
403  
                    self->active_ops_ = nullptr;
403  
                    self->active_ops_ = nullptr;
404  
                }
404  
                }
405  
            } g{self_};
405  
            } g{self_};
406  
            return self_->active_ops_->await_resume(
406  
            return self_->active_ops_->await_resume(
407  
                self_->cached_awaitable_);
407  
                self_->cached_awaitable_);
408  
        }
408  
        }
409  
    };
409  
    };
410  
    return awaitable{this, buffer_param<CB>(buffers)};
410  
    return awaitable{this, buffer_param<CB>(buffers)};
411  
}
411  
}
412  

412  

413  
} // namespace capy
413  
} // namespace capy
414  
} // namespace boost
414  
} // namespace boost
415  

415  

416  
#endif
416  
#endif