1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
10  
#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
11  
#define BOOST_CAPY_IO_PULL_FROM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/cond.hpp>
15  
#include <boost/capy/cond.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
16  
#include <boost/capy/concept/buffer_sink.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
17  
#include <boost/capy/concept/read_source.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/task.hpp>
20  
#include <boost/capy/task.hpp>
21  

21  

22  
#include <cstddef>
22  
#include <cstddef>
23  
#include <span>
23  
#include <span>
24  

24  

25  
namespace boost {
25  
namespace boost {
26  
namespace capy {
26  
namespace capy {
27  

27  

28  
/** Transfer data from a ReadSource to a BufferSink.
28  
/** Transfer data from a ReadSource to a BufferSink.
29  

29  

30  
    This function reads data from the source directly into the sink's
30  
    This function reads data from the source directly into the sink's
31  
    internal buffers using the callee-owns-buffers model. The sink
31  
    internal buffers using the callee-owns-buffers model. The sink
32  
    provides writable buffers via `prepare()`, the source reads into
32  
    provides writable buffers via `prepare()`, the source reads into
33  
    them, and the sink commits the data. When the source signals EOF,
33  
    them, and the sink commits the data. When the source signals EOF,
34  
    `commit_eof()` is called on the sink to finalize the transfer.
34  
    `commit_eof()` is called on the sink to finalize the transfer.
35  

35  

36  
    @tparam Src The source type, must satisfy @ref ReadSource.
36  
    @tparam Src The source type, must satisfy @ref ReadSource.
37  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
37  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
38  

38  

39  
    @param source The source to read data from.
39  
    @param source The source to read data from.
40  
    @param sink The sink to write data to.
40  
    @param sink The sink to write data to.
41  

41  

42  
    @return A task that yields `(std::error_code, std::size_t)`.
42  
    @return A task that yields `(std::error_code, std::size_t)`.
43  
        On success, `ec` is default-constructed (no error) and `n` is
43  
        On success, `ec` is default-constructed (no error) and `n` is
44  
        the total number of bytes transferred. On error, `ec` contains
44  
        the total number of bytes transferred. On error, `ec` contains
45  
        the error code and `n` is the total number of bytes transferred
45  
        the error code and `n` is the total number of bytes transferred
46  
        before the error.
46  
        before the error.
47  

47  

48  
    @par Example
48  
    @par Example
49  
    @code
49  
    @code
50  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
50  
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
51  
    {
51  
    {
52  
        auto [ec, n] = co_await pull_from(source, sink);
52  
        auto [ec, n] = co_await pull_from(source, sink);
53  
        if (ec)
53  
        if (ec)
54  
        {
54  
        {
55  
            // Handle error
55  
            // Handle error
56  
        }
56  
        }
57  
        // n bytes were transferred
57  
        // n bytes were transferred
58  
    }
58  
    }
59  
    @endcode
59  
    @endcode
60  

60  

61  
    @see ReadSource, BufferSink, push_to
61  
    @see ReadSource, BufferSink, push_to
62  
*/
62  
*/
63  
template<ReadSource Src, BufferSink Sink>
63  
template<ReadSource Src, BufferSink Sink>
64  
task<io_result<std::size_t>>
64  
task<io_result<std::size_t>>
65  
pull_from(Src& source, Sink& sink)
65  
pull_from(Src& source, Sink& sink)
66  
{
66  
{
67  
    mutable_buffer dst_arr[detail::max_iovec_];
67  
    mutable_buffer dst_arr[detail::max_iovec_];
68  
    std::size_t total = 0;
68  
    std::size_t total = 0;
69  

69  

70  
    for(;;)
70  
    for(;;)
71  
    {
71  
    {
72  
        auto dst_bufs = sink.prepare(dst_arr);
72  
        auto dst_bufs = sink.prepare(dst_arr);
73  
        if(dst_bufs.empty())
73  
        if(dst_bufs.empty())
74  
        {
74  
        {
75  
            // No buffer space available; commit nothing to flush
75  
            // No buffer space available; commit nothing to flush
76  
            auto [flush_ec] = co_await sink.commit(0);
76  
            auto [flush_ec] = co_await sink.commit(0);
77  
            if(flush_ec)
77  
            if(flush_ec)
78  
                co_return {flush_ec, total};
78  
                co_return {flush_ec, total};
79  
            continue;
79  
            continue;
80  
        }
80  
        }
81  

81  

82  
        auto [ec, n] = co_await source.read(
82  
        auto [ec, n] = co_await source.read(
83  
            std::span<mutable_buffer const>(dst_bufs));
83  
            std::span<mutable_buffer const>(dst_bufs));
84  

84  

85  
        if(n > 0)
85  
        if(n > 0)
86  
        {
86  
        {
87  
            auto [commit_ec] = co_await sink.commit(n);
87  
            auto [commit_ec] = co_await sink.commit(n);
88  
            if(commit_ec)
88  
            if(commit_ec)
89  
                co_return {commit_ec, total};
89  
                co_return {commit_ec, total};
90  
            total += n;
90  
            total += n;
91  
        }
91  
        }
92  

92  

93  
        if(ec == cond::eof)
93  
        if(ec == cond::eof)
94  
        {
94  
        {
95  
            auto [eof_ec] = co_await sink.commit_eof();
95  
            auto [eof_ec] = co_await sink.commit_eof();
96  
            co_return {eof_ec, total};
96  
            co_return {eof_ec, total};
97  
        }
97  
        }
98  

98  

99  
        if(ec)
99  
        if(ec)
100  
            co_return {ec, total};
100  
            co_return {ec, total};
101  
    }
101  
    }
102  
}
102  
}
103  

103  

104  
/** Transfer data from a ReadStream to a BufferSink.
104  
/** Transfer data from a ReadStream to a BufferSink.
105  

105  

106  
    This function reads data from the stream directly into the sink's
106  
    This function reads data from the stream directly into the sink's
107  
    internal buffers using the callee-owns-buffers model. The sink
107  
    internal buffers using the callee-owns-buffers model. The sink
108  
    provides writable buffers via `prepare()`, the stream reads into
108  
    provides writable buffers via `prepare()`, the stream reads into
109  
    them using `read_some()`, and the sink commits the data. When the
109  
    them using `read_some()`, and the sink commits the data. When the
110  
    stream signals EOF, `commit_eof()` is called on the sink to
110  
    stream signals EOF, `commit_eof()` is called on the sink to
111  
    finalize the transfer.
111  
    finalize the transfer.
112  

112  

113  
    This overload handles partial reads from the stream, committing
113  
    This overload handles partial reads from the stream, committing
114  
    data incrementally as it arrives. It loops until EOF is encountered
114  
    data incrementally as it arrives. It loops until EOF is encountered
115  
    or an error occurs.
115  
    or an error occurs.
116  

116  

117  
    @tparam Src The source type, must satisfy @ref ReadStream.
117  
    @tparam Src The source type, must satisfy @ref ReadStream.
118  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
118  
    @tparam Sink The sink type, must satisfy @ref BufferSink.
119  

119  

120  
    @param source The stream to read data from.
120  
    @param source The stream to read data from.
121  
    @param sink The sink to write data to.
121  
    @param sink The sink to write data to.
122  

122  

123  
    @return A task that yields `(std::error_code, std::size_t)`.
123  
    @return A task that yields `(std::error_code, std::size_t)`.
124  
        On success, `ec` is default-constructed (no error) and `n` is
124  
        On success, `ec` is default-constructed (no error) and `n` is
125  
        the total number of bytes transferred. On error, `ec` contains
125  
        the total number of bytes transferred. On error, `ec` contains
126  
        the error code and `n` is the total number of bytes transferred
126  
        the error code and `n` is the total number of bytes transferred
127  
        before the error.
127  
        before the error.
128  

128  

129  
    @par Example
129  
    @par Example
130  
    @code
130  
    @code
131  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
131  
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
132  
    {
132  
    {
133  
        auto [ec, n] = co_await pull_from(stream, sink);
133  
        auto [ec, n] = co_await pull_from(stream, sink);
134  
        if (ec)
134  
        if (ec)
135  
        {
135  
        {
136  
            // Handle error
136  
            // Handle error
137  
        }
137  
        }
138  
        // n bytes were transferred
138  
        // n bytes were transferred
139  
    }
139  
    }
140  
    @endcode
140  
    @endcode
141  

141  

142  
    @see ReadStream, BufferSink, push_to
142  
    @see ReadStream, BufferSink, push_to
143  
*/
143  
*/
144  
template<ReadStream Src, BufferSink Sink>
144  
template<ReadStream Src, BufferSink Sink>
145  
task<io_result<std::size_t>>
145  
task<io_result<std::size_t>>
146  
pull_from(Src& source, Sink& sink)
146  
pull_from(Src& source, Sink& sink)
147  
{
147  
{
148  
    mutable_buffer dst_arr[detail::max_iovec_];
148  
    mutable_buffer dst_arr[detail::max_iovec_];
149  
    std::size_t total = 0;
149  
    std::size_t total = 0;
150  

150  

151  
    for(;;)
151  
    for(;;)
152  
    {
152  
    {
153  
        // Prepare destination buffers from the sink
153  
        // Prepare destination buffers from the sink
154  
        auto dst_bufs = sink.prepare(dst_arr);
154  
        auto dst_bufs = sink.prepare(dst_arr);
155  
        if(dst_bufs.empty())
155  
        if(dst_bufs.empty())
156  
        {
156  
        {
157  
            // No buffer space available; commit nothing to flush
157  
            // No buffer space available; commit nothing to flush
158  
            auto [flush_ec] = co_await sink.commit(0);
158  
            auto [flush_ec] = co_await sink.commit(0);
159  
            if(flush_ec)
159  
            if(flush_ec)
160  
                co_return {flush_ec, total};
160  
                co_return {flush_ec, total};
161  
            continue;
161  
            continue;
162  
        }
162  
        }
163  

163  

164  
        // Read data from the stream into the sink's buffers
164  
        // Read data from the stream into the sink's buffers
165  
        auto [ec, n] = co_await source.read_some(
165  
        auto [ec, n] = co_await source.read_some(
166  
            std::span<mutable_buffer const>(dst_bufs));
166  
            std::span<mutable_buffer const>(dst_bufs));
167  

167  

168  
        // Commit any data that was read
168  
        // Commit any data that was read
169  
        if(n > 0)
169  
        if(n > 0)
170  
        {
170  
        {
171  
            auto [commit_ec] = co_await sink.commit(n);
171  
            auto [commit_ec] = co_await sink.commit(n);
172  
            if(commit_ec)
172  
            if(commit_ec)
173  
                co_return {commit_ec, total};
173  
                co_return {commit_ec, total};
174  
            total += n;
174  
            total += n;
175  
        }
175  
        }
176  

176  

177  
        // Check for EOF condition
177  
        // Check for EOF condition
178  
        if(ec == cond::eof)
178  
        if(ec == cond::eof)
179  
        {
179  
        {
180  
            auto [eof_ec] = co_await sink.commit_eof();
180  
            auto [eof_ec] = co_await sink.commit_eof();
181  
            co_return {eof_ec, total};
181  
            co_return {eof_ec, total};
182  
        }
182  
        }
183  

183  

184  
        // Check for other errors
184  
        // Check for other errors
185  
        if(ec)
185  
        if(ec)
186  
            co_return {ec, total};
186  
            co_return {ec, total};
187  
    }
187  
    }
188  
}
188  
}
189  

189  

190  
} // namespace capy
190  
} // namespace capy
191  
} // namespace boost
191  
} // namespace boost
192  

192  

193  
#endif
193  
#endif