TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_HPP
11 : #define BOOST_CAPY_READ_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/cond.hpp>
15 : #include <boost/capy/io_task.hpp>
16 : #include <boost/capy/buffers.hpp>
17 : #include <boost/capy/buffers/buffer_slice.hpp>
18 : #include <boost/capy/concept/dynamic_buffer.hpp>
19 : #include <boost/capy/concept/read_source.hpp>
20 : #include <boost/capy/concept/read_stream.hpp>
21 : #include <system_error>
22 :
23 : #include <cstddef>
24 :
25 : namespace boost {
26 : namespace capy {
27 :
28 : /** Read data from a stream until the buffer sequence is full.
29 :
30 : @par Await-effects
31 :
32 : Reads data from `stream` via awaiting `stream.read_some` repeatedly
33 : until:
34 :
35 : @li either the entire buffer sequence @c buffers is filled,
36 : @li or a contingency occurs.
37 :
38 : If `buffer_size(buffers) == 0` then no awaiting `stream.read_some`
39 : is performed. This is not a contingency.
40 :
41 : @par Await-returns
42 : An object of type `io_result<std::size_t>` destructuring as `[ec, n]`.
43 :
44 : Upon a contingency, `n` represents the number of bytes read so far,
45 : inclusive of the last partial read.
46 :
47 : Contingencies:
48 :
49 : @li The first contingency reported from awaiting @c stream.read_some
50 : while `buffers` is not yet filled. A contingency that accompanies
51 : the read which fills `buffers` is not reported: a completed
52 : transfer is a success.
53 :
54 : Notable conditions:
55 :
56 : @li @c cond::canceled — Operation was cancelled,
57 : @li @c cond::eof — Stream reached end before `buffers` was filled.
58 :
59 : @par Await-postcondition
60 : If `n == buffer_size(buffers)` the transfer completed and `ec` is
61 : success; otherwise `ec` is set.
62 :
63 : @param stream The stream to read from. If the lifetime of `stream` ends
64 : before the coroutine finishes, the behavior is undefined.
65 :
66 : @param buffers The buffer sequence to fill. If the lifetime of the buffer
67 : sequence represented by `buffers` ends before the coroutine finishes, the behavior is undefined.
68 :
69 :
70 : @par Remarks
71 : Supports _IoAwaitable cancellation_.
72 :
73 :
74 : @par Example
75 :
76 : @code
77 : capy::task<> process_message(capy::ReadStream auto& stream)
78 : {
79 : std::vector<char> header(16); // known header size for some protocol
80 : auto [ec, n] = co_await capy::read(stream, capy::mutable_buffer(header));
81 : if (ec == capy::cond::eof)
82 : co_return; // Connection closed
83 : if (ec)
84 : throw std::system_error(ec);
85 :
86 : // at this point `header` contains exactly 16 bytes
87 : }
88 : @endcode
89 :
90 : @see ReadStream, MutableBufferSequence
91 : */
92 : template <typename S, typename MB>
93 : requires ReadStream<S> && MutableBufferSequence<MB>
94 : auto
95 HIT 98 : read(S& stream, MB buffers) ->
96 : io_task<std::size_t>
97 : {
98 : auto consuming = buffer_slice(buffers);
99 : std::size_t const total_size = buffer_size(buffers);
100 : std::size_t total_read = 0;
101 :
102 : while(total_read < total_size)
103 : {
104 : auto [ec, n] = co_await stream.read_some(consuming.data());
105 : consuming.remove_prefix(n);
106 : total_read += n;
107 : // A contingency that still completed the transfer is a success:
108 : // report it only when the buffer was not filled.
109 : if(ec && total_read < total_size)
110 : co_return {ec, total_read};
111 : }
112 :
113 : co_return {{}, total_read};
114 196 : }
115 :
116 : /** Read all data from a stream into a dynamic buffer.
117 :
118 : @par Await-effects
119 :
120 : Reads data from `stream` via awaiting `stream.read_some` repeatedly
121 : and appending the results to `dynbuf`,
122 : until a contingency occurs.
123 :
124 : Data is appended using prepare/commit semantics.
125 : The buffer grows with 1.5x factor when filled.
126 :
127 : @par Await-returns
128 :
129 : An object of type `io_result<std::size_t>` destructuring as `[ec, n]`.
130 :
131 : `n` represents the total number of bytes read,
132 : inclusive of the last partial read.
133 :
134 : Contingencies:
135 :
136 : @li The first contingency, other than one matching to @c cond::eof, reported from awaiting @c stream.read_some .
137 :
138 : @par Await-throws
139 : `std::bad_alloc` when append to `dynbuf` fails.
140 :
141 : @param stream The stream to read from. If the lifetime of `stream` ends
142 : before the coroutine finishes, the behavior is undefined.
143 :
144 : @param dynbuf The dynamic buffer to append data to. If the lifetime of the buffer
145 : sequence represented by `dynbuf` ends before the coroutine finishes, the behavior is undefined.
146 :
147 : @param initial_amount Initial bytes to prepare (default 2048).
148 :
149 :
150 : @par Remarks
151 : Supports _IoAwaitable cancellation_.
152 :
153 : @par Example
154 :
155 : @code
156 : capy::task<std::string> read_body(capy::ReadStream auto& stream)
157 : {
158 : std::string body;
159 : auto [ec, n] = co_await capy::read(stream, capy::dynamic_buffer(body));
160 : if (ec)
161 : throw std::system_error(ec);
162 : return body;
163 : }
164 : @endcode
165 :
166 : @see read_some, ReadStream, DynamicBufferParam
167 : */
168 : template <typename S, typename DB>
169 : requires ReadStream<S> && DynamicBufferParam<DB>
170 : auto
171 80 : read(
172 : S& stream,
173 : DB&& dynbuf,
174 : std::size_t initial_amount = 2048) ->
175 : io_task<std::size_t>
176 : {
177 : std::size_t amount = initial_amount;
178 : std::size_t total_read = 0;
179 : for(;;)
180 : {
181 : auto mb = dynbuf.prepare(amount);
182 : auto const mb_size = buffer_size(mb);
183 : auto [ec, n] = co_await stream.read_some(mb);
184 : dynbuf.commit(n);
185 : total_read += n;
186 : if(ec == cond::eof)
187 : co_return {{}, total_read};
188 : if(ec)
189 : co_return {ec, total_read};
190 : if(n == mb_size)
191 : amount = amount / 2 + amount;
192 : }
193 160 : }
194 :
195 : /** Read all data from a source into a dynamic buffer.
196 :
197 : @par Await-effects
198 :
199 : Reads data from `stream` by calling `source.read` repeatedly
200 : and appending it to `dynbuf` until a contingency occurs.
201 : The last, potenitally partial, read is also appended.
202 :
203 : Data is appended using prepare/commit semantics.
204 : The buffer grows with 1.5x factor when filled.
205 :
206 : @par Await-returns
207 :
208 : An object of type `io_result<std::size_t>` destructuring as `[ec, n]`.
209 :
210 : `n` represents the total number of bytes read,
211 : inclusive of the last partial read.
212 :
213 :
214 : Contingencies:
215 :
216 : @li The first contingency, other than one matching to @c cond::eof, reported from awaiting @c stream.read_some .
217 :
218 : @par Await-throws
219 :
220 : `std::bad_alloc` when append to `dynbuf` fails.
221 :
222 : @param source The source to read from. If the lifetime of `source` ends
223 : before the coroutine finishes, the behavior is undefined.
224 :
225 : @param dynbuf The dynamic buffer to append data to. If the lifetime of the
226 : buffer sequence represented by `dynbuf` ends before the coroutine finishes,
227 : the behavior is undefined.
228 :
229 : @param initial_amount Initial bytes to prepare (default 2048).
230 :
231 : @par Remarks
232 : Supports _IoAwaitable cancellation_.
233 :
234 : @par Example
235 :
236 : @code
237 : capy::task<std::string> read_body(capy::ReadSource auto& source)
238 : {
239 : std::string body;
240 : auto [ec, n] = co_await capy::read(source, capy::dynamic_buffer(body));
241 : if (ec)
242 : throw std::system_error(ec);
243 : return body;
244 : }
245 : @endcode
246 :
247 : @see ReadSource, DynamicBufferParam
248 : */
249 : template <typename S, typename DB>
250 : requires ReadSource<S> && DynamicBufferParam<DB>
251 : auto
252 54 : read(
253 : S& source,
254 : DB&& dynbuf,
255 : std::size_t initial_amount = 2048) ->
256 : io_task<std::size_t>
257 : {
258 : std::size_t amount = initial_amount;
259 : std::size_t total_read = 0;
260 : for(;;)
261 : {
262 : auto mb = dynbuf.prepare(amount);
263 : auto const mb_size = buffer_size(mb);
264 : auto [ec, n] = co_await source.read(mb);
265 : dynbuf.commit(n);
266 : total_read += n;
267 : if(ec == cond::eof)
268 : co_return {{}, total_read};
269 : if(ec)
270 : co_return {ec, total_read};
271 : if(n == mb_size)
272 : amount = amount / 2 + amount; // 1.5x growth
273 : }
274 108 : }
275 :
276 : } // namespace capy
277 : } // namespace boost
278 :
279 : #endif
|