//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_copy.hpp>
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/test/fuse.hpp>

#include <algorithm>
#include <cstddef>
#include <stop_token>
#include <string>
#include <string_view>
#include <system_error>
#include <utility>
27  

27  

28  
namespace boost {
28  
namespace boost {
29  
namespace capy {
29  
namespace capy {
30  
namespace test {
30  
namespace test {
31  

31  

32  
/** A mock sink for testing write operations.
32  
/** A mock sink for testing write operations.
33  

33  

34  
    Use this to verify code that performs complete writes without needing
34  
    Use this to verify code that performs complete writes without needing
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
36  
    what was written. The associated @ref fuse enables error injection
36  
    what was written. The associated @ref fuse enables error injection
37  
    at controlled points.
37  
    at controlled points.
38  

38  

39  
    Unlike @ref write_stream which provides partial writes via `write_some`,
39  
    Unlike @ref write_stream which provides partial writes via `write_some`,
40  
    this class satisfies the @ref WriteSink concept by providing complete
40  
    this class satisfies the @ref WriteSink concept by providing complete
41  
    writes and EOF signaling.
41  
    writes and EOF signaling.
42  

42  

43  
    @par Thread Safety
43  
    @par Thread Safety
44  
    Not thread-safe.
44  
    Not thread-safe.
45  

45  

46  
    @par Example
46  
    @par Example
47  
    @code
47  
    @code
48  
    fuse f;
48  
    fuse f;
49  
    write_sink ws( f );
49  
    write_sink ws( f );
50  

50  

51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
52  
        auto [ec, n] = co_await ws.write(
52  
        auto [ec, n] = co_await ws.write(
53  
            const_buffer( "Hello", 5 ) );
53  
            const_buffer( "Hello", 5 ) );
54  
        if( ec )
54  
        if( ec )
55  
            co_return;
55  
            co_return;
56  
        auto [ec2] = co_await ws.write_eof();
56  
        auto [ec2] = co_await ws.write_eof();
57  
        if( ec2 )
57  
        if( ec2 )
58  
            co_return;
58  
            co_return;
59  
        // ws.data() returns "Hello"
59  
        // ws.data() returns "Hello"
60  
    } );
60  
    } );
61  
    @endcode
61  
    @endcode
62  

62  

63  
    @see fuse, WriteSink
63  
    @see fuse, WriteSink
64  
*/
64  
*/
65  
class write_sink
65  
class write_sink
66  
{
66  
{
67  
    fuse* f_;
67  
    fuse* f_;
68  
    std::string data_;
68  
    std::string data_;
69  
    std::string expect_;
69  
    std::string expect_;
70  
    std::size_t max_write_size_;
70  
    std::size_t max_write_size_;
71  
    bool eof_called_ = false;
71  
    bool eof_called_ = false;
72  

72  

73  
    std::error_code
73  
    std::error_code
74  
    consume_match_() noexcept
74  
    consume_match_() noexcept
75  
    {
75  
    {
76  
        if(data_.empty() || expect_.empty())
76  
        if(data_.empty() || expect_.empty())
77  
            return {};
77  
            return {};
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
79  
        if(std::string_view(data_.data(), n) !=
79  
        if(std::string_view(data_.data(), n) !=
80  
            std::string_view(expect_.data(), n))
80  
            std::string_view(expect_.data(), n))
81  
            return error::test_failure;
81  
            return error::test_failure;
82  
        data_.erase(0, n);
82  
        data_.erase(0, n);
83  
        expect_.erase(0, n);
83  
        expect_.erase(0, n);
84  
        return {};
84  
        return {};
85  
    }
85  
    }
86  

86  

87  
public:
87  
public:
88  
    /** Construct a write sink.
88  
    /** Construct a write sink.
89  

89  

90  
        @param f The fuse used to inject errors during writes.
90  
        @param f The fuse used to inject errors during writes.
91  

91  

92  
        @param max_write_size Maximum bytes transferred per write.
92  
        @param max_write_size Maximum bytes transferred per write.
93  
        Use to simulate chunked delivery.
93  
        Use to simulate chunked delivery.
94  
    */
94  
    */
95  
    explicit write_sink(
95  
    explicit write_sink(
96  
        fuse& f,
96  
        fuse& f,
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
98  
        : f_(&f)
98  
        : f_(&f)
99  
        , max_write_size_(max_write_size)
99  
        , max_write_size_(max_write_size)
100  
    {
100  
    {
101  
    }
101  
    }
102  

102  

103  
    /// Return the written data as a string view.
103  
    /// Return the written data as a string view.
104  
    std::string_view
104  
    std::string_view
105  
    data() const noexcept
105  
    data() const noexcept
106  
    {
106  
    {
107  
        return data_;
107  
        return data_;
108  
    }
108  
    }
109  

109  

110  
    /** Set the expected data for subsequent writes.
110  
    /** Set the expected data for subsequent writes.
111  

111  

112  
        Stores the expected data and immediately tries to match
112  
        Stores the expected data and immediately tries to match
113  
        against any data already written. Matched data is consumed
113  
        against any data already written. Matched data is consumed
114  
        from both buffers.
114  
        from both buffers.
115  

115  

116  
        @param sv The expected data.
116  
        @param sv The expected data.
117  

117  

118  
        @return An error if existing data does not match.
118  
        @return An error if existing data does not match.
119  
    */
119  
    */
120  
    std::error_code
120  
    std::error_code
121  
    expect(std::string_view sv)
121  
    expect(std::string_view sv)
122  
    {
122  
    {
123  
        expect_.assign(sv);
123  
        expect_.assign(sv);
124  
        return consume_match_();
124  
        return consume_match_();
125  
    }
125  
    }
126  

126  

127  
    /// Return the number of bytes written.
127  
    /// Return the number of bytes written.
128  
    std::size_t
128  
    std::size_t
129  
    size() const noexcept
129  
    size() const noexcept
130  
    {
130  
    {
131  
        return data_.size();
131  
        return data_.size();
132  
    }
132  
    }
133  

133  

134  
    /// Return whether write_eof has been called.
134  
    /// Return whether write_eof has been called.
135  
    bool
135  
    bool
136  
    eof_called() const noexcept
136  
    eof_called() const noexcept
137  
    {
137  
    {
138  
        return eof_called_;
138  
        return eof_called_;
139  
    }
139  
    }
140  

140  

141  
    /// Clear all data and reset state.
141  
    /// Clear all data and reset state.
142  
    void
142  
    void
143  
    clear() noexcept
143  
    clear() noexcept
144  
    {
144  
    {
145  
        data_.clear();
145  
        data_.clear();
146  
        expect_.clear();
146  
        expect_.clear();
147  
        eof_called_ = false;
147  
        eof_called_ = false;
148  
    }
148  
    }
149  

149  

150  
    /** Asynchronously write data to the sink.
150  
    /** Asynchronously write data to the sink.
151  

151  

152  
        Transfers all bytes from the provided const buffer sequence to
152  
        Transfers all bytes from the provided const buffer sequence to
153  
        the internal buffer. Before every write, the attached @ref fuse
153  
        the internal buffer. Before every write, the attached @ref fuse
154  
        is consulted to possibly inject an error for testing fault
154  
        is consulted to possibly inject an error for testing fault
155  
        scenarios. The returned `std::size_t` is the number of bytes
155  
        scenarios. The returned `std::size_t` is the number of bytes
156  
        transferred.
156  
        transferred.
157  

157  

158  
        @par Effects
158  
        @par Effects
159  
        On success, appends the written bytes to the internal buffer.
159  
        On success, appends the written bytes to the internal buffer.
160  
        If an error is injected by the fuse, the internal buffer remains
160  
        If an error is injected by the fuse, the internal buffer remains
161  
        unchanged.
161  
        unchanged.
162  

162  

163  
        @par Exception Safety
163  
        @par Exception Safety
164  
        No-throw guarantee.
164  
        No-throw guarantee.
165  

165  

166  
        @param buffers The const buffer sequence containing data to write.
166  
        @param buffers The const buffer sequence containing data to write.
167  

167  

168  
        @return An awaitable yielding `(error_code,std::size_t)`.
168  
        @return An awaitable yielding `(error_code,std::size_t)`.
169  

169  

170  
        @see fuse
170  
        @see fuse
171  
    */
171  
    */
172  
    template<ConstBufferSequence CB>
172  
    template<ConstBufferSequence CB>
173  
    auto
173  
    auto
174  
    write(CB buffers)
174  
    write(CB buffers)
175  
    {
175  
    {
176  
        struct awaitable
176  
        struct awaitable
177  
        {
177  
        {
178  
            write_sink* self_;
178  
            write_sink* self_;
179  
            CB buffers_;
179  
            CB buffers_;
180  

180  

181  
            bool await_ready() const noexcept { return true; }
181  
            bool await_ready() const noexcept { return true; }
182  

182  

183  
            void await_suspend(
183  
            void await_suspend(
184  
                coro,
184  
                coro,
185  
                executor_ref,
185  
                executor_ref,
186  
                std::stop_token) const noexcept
186  
                std::stop_token) const noexcept
187  
            {
187  
            {
188  
            }
188  
            }
189  

189  

190  
            io_result<std::size_t>
190  
            io_result<std::size_t>
191  
            await_resume()
191  
            await_resume()
192  
            {
192  
            {
193  
                auto ec = self_->f_->maybe_fail();
193  
                auto ec = self_->f_->maybe_fail();
194  
                if(ec)
194  
                if(ec)
195  
                    return {ec, 0};
195  
                    return {ec, 0};
196  

196  

197  
                std::size_t n = buffer_size(buffers_);
197  
                std::size_t n = buffer_size(buffers_);
198  
                n = (std::min)(n, self_->max_write_size_);
198  
                n = (std::min)(n, self_->max_write_size_);
199  
                if(n == 0)
199  
                if(n == 0)
200  
                    return {{}, 0};
200  
                    return {{}, 0};
201  

201  

202  
                std::size_t const old_size = self_->data_.size();
202  
                std::size_t const old_size = self_->data_.size();
203  
                self_->data_.resize(old_size + n);
203  
                self_->data_.resize(old_size + n);
204  
                buffer_copy(make_buffer(
204  
                buffer_copy(make_buffer(
205  
                    self_->data_.data() + old_size, n), buffers_, n);
205  
                    self_->data_.data() + old_size, n), buffers_, n);
206  

206  

207  
                ec = self_->consume_match_();
207  
                ec = self_->consume_match_();
208  
                if(ec)
208  
                if(ec)
209  
                    return {ec, n};
209  
                    return {ec, n};
210  

210  

211  
                return {{}, n};
211  
                return {{}, n};
212  
            }
212  
            }
213  
        };
213  
        };
214  
        return awaitable{this, buffers};
214  
        return awaitable{this, buffers};
215  
    }
215  
    }
216  

216  

217  
    /** Asynchronously write data to the sink with optional EOF.
217  
    /** Asynchronously write data to the sink with optional EOF.
218  

218  

219  
        Transfers all bytes from the provided const buffer sequence to
219  
        Transfers all bytes from the provided const buffer sequence to
220  
        the internal buffer, optionally signaling end-of-stream. Before
220  
        the internal buffer, optionally signaling end-of-stream. Before
221  
        every write, the attached @ref fuse is consulted to possibly
221  
        every write, the attached @ref fuse is consulted to possibly
222  
        inject an error for testing fault scenarios. The returned
222  
        inject an error for testing fault scenarios. The returned
223  
        `std::size_t` is the number of bytes transferred.
223  
        `std::size_t` is the number of bytes transferred.
224  

224  

225  
        @par Effects
225  
        @par Effects
226  
        On success, appends the written bytes to the internal buffer.
226  
        On success, appends the written bytes to the internal buffer.
227  
        If `eof` is `true`, marks the sink as finalized.
227  
        If `eof` is `true`, marks the sink as finalized.
228  
        If an error is injected by the fuse, the internal buffer remains
228  
        If an error is injected by the fuse, the internal buffer remains
229  
        unchanged.
229  
        unchanged.
230  

230  

231  
        @par Exception Safety
231  
        @par Exception Safety
232  
        No-throw guarantee.
232  
        No-throw guarantee.
233  

233  

234  
        @param buffers The const buffer sequence containing data to write.
234  
        @param buffers The const buffer sequence containing data to write.
235  
        @param eof If true, signals end-of-stream after writing.
235  
        @param eof If true, signals end-of-stream after writing.
236  

236  

237  
        @return An awaitable yielding `(error_code,std::size_t)`.
237  
        @return An awaitable yielding `(error_code,std::size_t)`.
238  

238  

239  
        @see fuse
239  
        @see fuse
240  
    */
240  
    */
241  
    template<ConstBufferSequence CB>
241  
    template<ConstBufferSequence CB>
242  
    auto
242  
    auto
243  
    write(CB buffers, bool eof)
243  
    write(CB buffers, bool eof)
244  
    {
244  
    {
245  
        struct awaitable
245  
        struct awaitable
246  
        {
246  
        {
247  
            write_sink* self_;
247  
            write_sink* self_;
248  
            CB buffers_;
248  
            CB buffers_;
249  
            bool eof_;
249  
            bool eof_;
250  

250  

251  
            bool await_ready() const noexcept { return true; }
251  
            bool await_ready() const noexcept { return true; }
252  

252  

253  
            void await_suspend(
253  
            void await_suspend(
254  
                coro,
254  
                coro,
255  
                executor_ref,
255  
                executor_ref,
256  
                std::stop_token) const noexcept
256  
                std::stop_token) const noexcept
257  
            {
257  
            {
258  
            }
258  
            }
259  

259  

260  
            io_result<std::size_t>
260  
            io_result<std::size_t>
261  
            await_resume()
261  
            await_resume()
262  
            {
262  
            {
263  
                auto ec = self_->f_->maybe_fail();
263  
                auto ec = self_->f_->maybe_fail();
264  
                if(ec)
264  
                if(ec)
265  
                    return {ec, 0};
265  
                    return {ec, 0};
266  

266  

267  
                std::size_t n = buffer_size(buffers_);
267  
                std::size_t n = buffer_size(buffers_);
268  
                n = (std::min)(n, self_->max_write_size_);
268  
                n = (std::min)(n, self_->max_write_size_);
269  
                if(n > 0)
269  
                if(n > 0)
270  
                {
270  
                {
271  
                    std::size_t const old_size = self_->data_.size();
271  
                    std::size_t const old_size = self_->data_.size();
272  
                    self_->data_.resize(old_size + n);
272  
                    self_->data_.resize(old_size + n);
273  
                    buffer_copy(make_buffer(
273  
                    buffer_copy(make_buffer(
274  
                        self_->data_.data() + old_size, n), buffers_, n);
274  
                        self_->data_.data() + old_size, n), buffers_, n);
275  

275  

276  
                    ec = self_->consume_match_();
276  
                    ec = self_->consume_match_();
277  
                    if(ec)
277  
                    if(ec)
278  
                        return {ec, n};
278  
                        return {ec, n};
279  
                }
279  
                }
280  

280  

281  
                if(eof_)
281  
                if(eof_)
282  
                    self_->eof_called_ = true;
282  
                    self_->eof_called_ = true;
283  

283  

284  
                return {{}, n};
284  
                return {{}, n};
285  
            }
285  
            }
286  
        };
286  
        };
287  
        return awaitable{this, buffers, eof};
287  
        return awaitable{this, buffers, eof};
288  
    }
288  
    }
289  

289  

290  
    /** Signal end-of-stream.
290  
    /** Signal end-of-stream.
291  

291  

292  
        Marks the sink as finalized, indicating no more data will be
292  
        Marks the sink as finalized, indicating no more data will be
293  
        written. Before signaling, the attached @ref fuse is consulted
293  
        written. Before signaling, the attached @ref fuse is consulted
294  
        to possibly inject an error for testing fault scenarios.
294  
        to possibly inject an error for testing fault scenarios.
295  

295  

296  
        @par Effects
296  
        @par Effects
297  
        On success, marks the sink as finalized.
297  
        On success, marks the sink as finalized.
298  
        If an error is injected by the fuse, the state remains unchanged.
298  
        If an error is injected by the fuse, the state remains unchanged.
299  

299  

300  
        @par Exception Safety
300  
        @par Exception Safety
301  
        No-throw guarantee.
301  
        No-throw guarantee.
302  

302  

303  
        @return An awaitable yielding `(error_code)`.
303  
        @return An awaitable yielding `(error_code)`.
304  

304  

305  
        @see fuse
305  
        @see fuse
306  
    */
306  
    */
307  
    auto
307  
    auto
308  
    write_eof()
308  
    write_eof()
309  
    {
309  
    {
310  
        struct awaitable
310  
        struct awaitable
311  
        {
311  
        {
312  
            write_sink* self_;
312  
            write_sink* self_;
313  

313  

314  
            bool await_ready() const noexcept { return true; }
314  
            bool await_ready() const noexcept { return true; }
315  

315  

316  
            void await_suspend(
316  
            void await_suspend(
317  
                coro,
317  
                coro,
318  
                executor_ref,
318  
                executor_ref,
319  
                std::stop_token) const noexcept
319  
                std::stop_token) const noexcept
320  
            {
320  
            {
321  
            }
321  
            }
322  

322  

323  
            io_result<>
323  
            io_result<>
324  
            await_resume()
324  
            await_resume()
325  
            {
325  
            {
326  
                auto ec = self_->f_->maybe_fail();
326  
                auto ec = self_->f_->maybe_fail();
327  
                if(ec)
327  
                if(ec)
328  
                    return {ec};
328  
                    return {ec};
329  

329  

330  
                self_->eof_called_ = true;
330  
                self_->eof_called_ = true;
331  
                return {};
331  
                return {};
332  
            }
332  
            }
333  
        };
333  
        };
334  
        return awaitable{this};
334  
        return awaitable{this};
335  
    }
335  
    }
336  
};
336  
};
337  

337  

338  
} // test
338  
} // test
339  
} // capy
339  
} // capy
340  
} // boost
340  
} // boost
341  

341  

342  
#endif
342  
#endif