1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_TEST_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_TEST_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/make_buffer.hpp>
15  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/coro.hpp>
16  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
17  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/io_result.hpp>
18  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/test/fuse.hpp>
19  
#include <boost/capy/test/fuse.hpp>
20  

20  

21  
#include <algorithm>
21  
#include <algorithm>
22  
#include <span>
22  
#include <span>
23  
#include <stop_token>
23  
#include <stop_token>
24  
#include <string>
24  
#include <string>
25  
#include <string_view>
25  
#include <string_view>
26  

26  

27  
namespace boost {
27  
namespace boost {
28  
namespace capy {
28  
namespace capy {
29  
namespace test {
29  
namespace test {
30  

30  

31  
/** A mock buffer sink for testing callee-owns-buffers write operations.
31  
/** A mock buffer sink for testing callee-owns-buffers write operations.
32  

32  

33  
    Use this to verify code that writes data using the callee-owns-buffers
33  
    Use this to verify code that writes data using the callee-owns-buffers
34  
    pattern without needing real I/O. Call @ref prepare to get writable
34  
    pattern without needing real I/O. Call @ref prepare to get writable
35  
    buffers, write into them, then call @ref commit to finalize. The
35  
    buffers, write into them, then call @ref commit to finalize. The
36  
    associated @ref fuse enables error injection at controlled points.
36  
    associated @ref fuse enables error injection at controlled points.
37  

37  

38  
    This class satisfies the @ref BufferSink concept by providing
38  
    This class satisfies the @ref BufferSink concept by providing
39  
    internal storage that callers write into directly.
39  
    internal storage that callers write into directly.
40  

40  

41  
    @par Thread Safety
41  
    @par Thread Safety
42  
    Not thread-safe.
42  
    Not thread-safe.
43  

43  

44  
    @par Example
44  
    @par Example
45  
    @code
45  
    @code
46  
    fuse f;
46  
    fuse f;
47  
    buffer_sink bs( f );
47  
    buffer_sink bs( f );
48  

48  

49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
49  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
50  
        mutable_buffer arr[16];
50  
        mutable_buffer arr[16];
51  
        std::size_t count = bs.prepare( arr, 16 );
51  
        std::size_t count = bs.prepare( arr, 16 );
52  
        if( count == 0 )
52  
        if( count == 0 )
53  
            co_return;
53  
            co_return;
54  

54  

55  
        // Write data into arr[0]
55  
        // Write data into arr[0]
56  
        std::memcpy( arr[0].data(), "Hello", 5 );
56  
        std::memcpy( arr[0].data(), "Hello", 5 );
57  

57  

58  
        auto [ec] = co_await bs.commit( 5 );
58  
        auto [ec] = co_await bs.commit( 5 );
59  
        if( ec )
59  
        if( ec )
60  
            co_return;
60  
            co_return;
61  

61  

62  
        auto [ec2] = co_await bs.commit_eof();
62  
        auto [ec2] = co_await bs.commit_eof();
63  
        // bs.data() returns "Hello"
63  
        // bs.data() returns "Hello"
64  
    } );
64  
    } );
65  
    @endcode
65  
    @endcode
66  

66  

67  
    @see fuse, BufferSink
67  
    @see fuse, BufferSink
68  
*/
68  
*/
69  
class buffer_sink
69  
class buffer_sink
70  
{
70  
{
71  
    fuse* f_;
71  
    fuse* f_;
72  
    std::string data_;
72  
    std::string data_;
73  
    std::string prepare_buf_;
73  
    std::string prepare_buf_;
74  
    std::size_t prepare_size_ = 0;
74  
    std::size_t prepare_size_ = 0;
75  
    std::size_t max_prepare_size_;
75  
    std::size_t max_prepare_size_;
76  
    bool eof_called_ = false;
76  
    bool eof_called_ = false;
77  

77  

78  
public:
78  
public:
79  
    /** Construct a buffer sink.
79  
    /** Construct a buffer sink.
80  

80  

81  
        @param f The fuse used to inject errors during commits.
81  
        @param f The fuse used to inject errors during commits.
82  

82  

83  
        @param max_prepare_size Maximum bytes available per prepare.
83  
        @param max_prepare_size Maximum bytes available per prepare.
84  
        Use to simulate limited buffer space.
84  
        Use to simulate limited buffer space.
85  
    */
85  
    */
86  
    explicit buffer_sink(
86  
    explicit buffer_sink(
87  
        fuse& f,
87  
        fuse& f,
88  
        std::size_t max_prepare_size = 4096) noexcept
88  
        std::size_t max_prepare_size = 4096) noexcept
89  
        : f_(&f)
89  
        : f_(&f)
90  
        , max_prepare_size_(max_prepare_size)
90  
        , max_prepare_size_(max_prepare_size)
91  
    {
91  
    {
92  
        prepare_buf_.resize(max_prepare_size_);
92  
        prepare_buf_.resize(max_prepare_size_);
93  
    }
93  
    }
94  

94  

95  
    /// Return the written data as a string view.
95  
    /// Return the written data as a string view.
96  
    std::string_view
96  
    std::string_view
97  
    data() const noexcept
97  
    data() const noexcept
98  
    {
98  
    {
99  
        return data_;
99  
        return data_;
100  
    }
100  
    }
101  

101  

102  
    /// Return the number of bytes written.
102  
    /// Return the number of bytes written.
103  
    std::size_t
103  
    std::size_t
104  
    size() const noexcept
104  
    size() const noexcept
105  
    {
105  
    {
106  
        return data_.size();
106  
        return data_.size();
107  
    }
107  
    }
108  

108  

109  
    /// Return whether commit_eof has been called.
109  
    /// Return whether commit_eof has been called.
110  
    bool
110  
    bool
111  
    eof_called() const noexcept
111  
    eof_called() const noexcept
112  
    {
112  
    {
113  
        return eof_called_;
113  
        return eof_called_;
114  
    }
114  
    }
115  

115  

116  
    /// Clear all data and reset state.
116  
    /// Clear all data and reset state.
117  
    void
117  
    void
118  
    clear() noexcept
118  
    clear() noexcept
119  
    {
119  
    {
120  
        data_.clear();
120  
        data_.clear();
121  
        prepare_size_ = 0;
121  
        prepare_size_ = 0;
122  
        eof_called_ = false;
122  
        eof_called_ = false;
123  
    }
123  
    }
124  

124  

125  
    /** Prepare writable buffers.
125  
    /** Prepare writable buffers.
126  

126  

127  
        Fills the provided span with mutable buffer descriptors pointing
127  
        Fills the provided span with mutable buffer descriptors pointing
128  
        to internal storage. The caller writes data into these buffers,
128  
        to internal storage. The caller writes data into these buffers,
129  
        then calls @ref commit to finalize.
129  
        then calls @ref commit to finalize.
130  

130  

131  
        @param dest Span of mutable_buffer to fill.
131  
        @param dest Span of mutable_buffer to fill.
132  

132  

133  
        @return A span of filled buffers (empty or 1 buffer in this implementation).
133  
        @return A span of filled buffers (empty or 1 buffer in this implementation).
134  
    */
134  
    */
135  
    std::span<mutable_buffer>
135  
    std::span<mutable_buffer>
136  
    prepare(std::span<mutable_buffer> dest)
136  
    prepare(std::span<mutable_buffer> dest)
137  
    {
137  
    {
138  
        if(dest.empty())
138  
        if(dest.empty())
139  
            return {};
139  
            return {};
140  

140  

141  
        prepare_size_ = max_prepare_size_;
141  
        prepare_size_ = max_prepare_size_;
142  
        dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
142  
        dest[0] = make_buffer(prepare_buf_.data(), prepare_size_);
143  
        return dest.first(1);
143  
        return dest.first(1);
144  
    }
144  
    }
145  

145  

146  
    /** Commit bytes written to the prepared buffers.
146  
    /** Commit bytes written to the prepared buffers.
147  

147  

148  
        Transfers `n` bytes from the prepared buffer to the internal
148  
        Transfers `n` bytes from the prepared buffer to the internal
149  
        data buffer. Before committing, the attached @ref fuse is
149  
        data buffer. Before committing, the attached @ref fuse is
150  
        consulted to possibly inject an error for testing fault scenarios.
150  
        consulted to possibly inject an error for testing fault scenarios.
151  

151  

152  
        @param n The number of bytes to commit.
152  
        @param n The number of bytes to commit.
153  

153  

154  
        @return An awaitable yielding `(error_code)`.
154  
        @return An awaitable yielding `(error_code)`.
155  

155  

156  
        @see fuse
156  
        @see fuse
157  
    */
157  
    */
158  
    auto
158  
    auto
159  
    commit(std::size_t n)
159  
    commit(std::size_t n)
160  
    {
160  
    {
161  
        struct awaitable
161  
        struct awaitable
162  
        {
162  
        {
163  
            buffer_sink* self_;
163  
            buffer_sink* self_;
164  
            std::size_t n_;
164  
            std::size_t n_;
165  

165  

166  
            bool await_ready() const noexcept { return true; }
166  
            bool await_ready() const noexcept { return true; }
167  

167  

168  
            void await_suspend(
168  
            void await_suspend(
169  
                coro,
169  
                coro,
170  
                executor_ref,
170  
                executor_ref,
171  
                std::stop_token) const noexcept
171  
                std::stop_token) const noexcept
172  
            {
172  
            {
173  
            }
173  
            }
174  

174  

175  
            io_result<>
175  
            io_result<>
176  
            await_resume()
176  
            await_resume()
177  
            {
177  
            {
178  
                auto ec = self_->f_->maybe_fail();
178  
                auto ec = self_->f_->maybe_fail();
179  
                if(ec)
179  
                if(ec)
180  
                    return {ec};
180  
                    return {ec};
181  

181  

182  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
182  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
183  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
183  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
184  
                self_->prepare_size_ = 0;
184  
                self_->prepare_size_ = 0;
185  

185  

186  
                return {};
186  
                return {};
187  
            }
187  
            }
188  
        };
188  
        };
189  
        return awaitable{this, n};
189  
        return awaitable{this, n};
190  
    }
190  
    }
191  

191  

192  
    /** Commit bytes written with optional end-of-stream.
192  
    /** Commit bytes written with optional end-of-stream.
193  

193  

194  
        Transfers `n` bytes from the prepared buffer to the internal
194  
        Transfers `n` bytes from the prepared buffer to the internal
195  
        data buffer. If `eof` is true, marks the sink as finalized.
195  
        data buffer. If `eof` is true, marks the sink as finalized.
196  

196  

197  
        @param n The number of bytes to commit.
197  
        @param n The number of bytes to commit.
198  
        @param eof If true, signals end-of-stream after committing.
198  
        @param eof If true, signals end-of-stream after committing.
199  

199  

200  
        @return An awaitable yielding `(error_code)`.
200  
        @return An awaitable yielding `(error_code)`.
201  

201  

202  
        @see fuse
202  
        @see fuse
203  
    */
203  
    */
204  
    auto
204  
    auto
205  
    commit(std::size_t n, bool eof)
205  
    commit(std::size_t n, bool eof)
206  
    {
206  
    {
207  
        struct awaitable
207  
        struct awaitable
208  
        {
208  
        {
209  
            buffer_sink* self_;
209  
            buffer_sink* self_;
210  
            std::size_t n_;
210  
            std::size_t n_;
211  
            bool eof_;
211  
            bool eof_;
212  

212  

213  
            bool await_ready() const noexcept { return true; }
213  
            bool await_ready() const noexcept { return true; }
214  

214  

215  
            void await_suspend(
215  
            void await_suspend(
216  
                coro,
216  
                coro,
217  
                executor_ref,
217  
                executor_ref,
218  
                std::stop_token) const noexcept
218  
                std::stop_token) const noexcept
219  
            {
219  
            {
220  
            }
220  
            }
221  

221  

222  
            io_result<>
222  
            io_result<>
223  
            await_resume()
223  
            await_resume()
224  
            {
224  
            {
225  
                auto ec = self_->f_->maybe_fail();
225  
                auto ec = self_->f_->maybe_fail();
226  
                if(ec)
226  
                if(ec)
227  
                    return {ec};
227  
                    return {ec};
228  

228  

229  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
229  
                std::size_t to_commit = (std::min)(n_, self_->prepare_size_);
230  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
230  
                self_->data_.append(self_->prepare_buf_.data(), to_commit);
231  
                self_->prepare_size_ = 0;
231  
                self_->prepare_size_ = 0;
232  

232  

233  
                if(eof_)
233  
                if(eof_)
234  
                    self_->eof_called_ = true;
234  
                    self_->eof_called_ = true;
235  

235  

236  
                return {};
236  
                return {};
237  
            }
237  
            }
238  
        };
238  
        };
239  
        return awaitable{this, n, eof};
239  
        return awaitable{this, n, eof};
240  
    }
240  
    }
241  

241  

242  
    /** Signal end-of-stream.
242  
    /** Signal end-of-stream.
243  

243  

244  
        Marks the sink as finalized, indicating no more data will be
244  
        Marks the sink as finalized, indicating no more data will be
245  
        written. Before signaling, the attached @ref fuse is consulted
245  
        written. Before signaling, the attached @ref fuse is consulted
246  
        to possibly inject an error for testing fault scenarios.
246  
        to possibly inject an error for testing fault scenarios.
247  

247  

248  
        @return An awaitable yielding `(error_code)`.
248  
        @return An awaitable yielding `(error_code)`.
249  

249  

250  
        @see fuse
250  
        @see fuse
251  
    */
251  
    */
252  
    auto
252  
    auto
253  
    commit_eof()
253  
    commit_eof()
254  
    {
254  
    {
255  
        struct awaitable
255  
        struct awaitable
256  
        {
256  
        {
257  
            buffer_sink* self_;
257  
            buffer_sink* self_;
258  

258  

259  
            bool await_ready() const noexcept { return true; }
259  
            bool await_ready() const noexcept { return true; }
260  

260  

261  
            void await_suspend(
261  
            void await_suspend(
262  
                coro,
262  
                coro,
263  
                executor_ref,
263  
                executor_ref,
264  
                std::stop_token) const noexcept
264  
                std::stop_token) const noexcept
265  
            {
265  
            {
266  
            }
266  
            }
267  

267  

268  
            io_result<>
268  
            io_result<>
269  
            await_resume()
269  
            await_resume()
270  
            {
270  
            {
271  
                auto ec = self_->f_->maybe_fail();
271  
                auto ec = self_->f_->maybe_fail();
272  
                if(ec)
272  
                if(ec)
273  
                    return {ec};
273  
                    return {ec};
274  

274  

275  
                self_->eof_called_ = true;
275  
                self_->eof_called_ = true;
276  
                return {};
276  
                return {};
277  
            }
277  
            }
278  
        };
278  
        };
279  
        return awaitable{this};
279  
        return awaitable{this};
280  
    }
280  
    }
281  
};
281  
};
282  

282  

283  
} // test
283  
} // test
284  
} // capy
284  
} // capy
285  
} // boost
285  
} // boost
286  

286  

287  
#endif
287  
#endif