include/boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp

95.2% Lines (120/126) 100.0% List of functions (18/18)
io_uring_file_ops.hpp
f(x) Functions (18)
Function Calls Lines Blocks
boost::corosio::detail::uring_file_read_op_base::uring_file_read_op_base(void (*)(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)) :52 173x 100.0% 100.0% boost::corosio::detail::uring_file_read_op_base::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, long, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::buffer_param, std::stop_token const&) :65 145x 100.0% 100.0% boost::corosio::detail::uring_file_read_op_base::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :95 141x 100.0% 100.0% boost::corosio::detail::uring_file_read_op_base::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :103 141x 100.0% 100.0% boost::corosio::detail::uring_file_read_op_base::finish(boost::corosio::detail::uring_file_read_op_base*) :116 136x 100.0% 100.0% boost::corosio::detail::uring_file_read_op::uring_file_read_op() :132 37x 100.0% 100.0% boost::corosio::detail::uring_file_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :135 9x 90.9% 90.0% boost::corosio::detail::uring_random_access_read_op::uring_random_access_read_op() :159 136x 100.0% 100.0% boost::corosio::detail::uring_random_access_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :162 136x 77.8% 73.0% boost::corosio::detail::uring_file_write_op_base::uring_file_write_op_base(void (*)(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)) :198 50x 100.0% 100.0% boost::corosio::detail::uring_file_write_op_base::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, long, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::buffer_param, std::stop_token const&) :206 22x 100.0% 100.0% boost::corosio::detail::uring_file_write_op_base::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :236 20x 100.0% 100.0% boost::corosio::detail::uring_file_write_op_base::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :244 20x 100.0% 100.0% boost::corosio::detail::uring_file_write_op_base::finish(boost::corosio::detail::uring_file_write_op_base*) :255 13x 100.0% 100.0% boost::corosio::detail::uring_file_write_op::uring_file_write_op() :269 37x 100.0% 100.0% boost::corosio::detail::uring_file_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :272 9x 90.9% 90.0% boost::corosio::detail::uring_random_access_write_op::uring_random_access_write_op() :294 13x 100.0% 100.0% boost::corosio::detail::uring_random_access_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :297 13x 77.8% 73.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_FILE_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_FILE_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
18 #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp>
19 #include <boost/corosio/native/detail/coro_op_complete.hpp>
20 #include <boost/corosio/detail/dispatch_coro.hpp>
21
22 #include <cstdint>
23 #include <sys/uio.h>
24
25 namespace boost::corosio::detail {
26
27 /** Scatter-gather file read via `IORING_OP_READV`.
28
29 Stream files pass `offset == -1` so the kernel uses (and updates)
30 the fd's `f_pos`, matching POSIX `read(2)` semantics. Random-
31 access files pass an explicit caller-supplied offset.
32
33 @par Handler dispatch
34 `do_cqe` captures `res`/`cqe_flags` and queues self into `local`;
35 `do_handler` runs from the scheduler queue and resumes the
36 coroutine.
37 */
38 /// Shared state and submission logic for file read ops. Concrete
39 /// subclasses pick a `do_handler` that matches their storage model:
40 /// `uring_file_read_op` for embedded slots (stream_file), and
41 /// `uring_random_access_read_op` for heap-allocated per-call ops
42 /// (random_access_file, where concurrent reads at different offsets
43 /// are legitimate).
44 struct uring_file_read_op_base : io_uring_op
45 {
46 iovec iovecs[io_uring_max_iov];
47 int iovec_count = 0;
48 int fd = -1;
49 std::int64_t offset = -1; // -1 means kernel f_pos
50
51 protected:
52 173x explicit uring_file_read_op_base(func_type handler) noexcept
53 173x : io_uring_op(handler, &do_cqe, &do_prep)
54 {
55 173x is_read = true;
56 173x }
57
58 public:
59 /** Reset and initialize for a new submission.
60
61 @param file_offset -1 selects the kernel's `f_pos` (POSIX
62 `read(2)` semantics for stream files); otherwise the explicit
63 offset for random-access files.
64 */
65 145x void prepare(
66 std::coroutine_handle<> handle,
67 capy::executor_ref executor,
68 std::error_code* ec,
69 std::size_t* bytes,
70 int file_descriptor,
71 std::int64_t file_offset,
72 io_uring_scheduler* scheduler,
73 std::shared_ptr<void> impl,
74 buffer_param buffers,
75 std::stop_token const& token) noexcept
76 {
77 145x h = handle;
78 145x ex = executor;
79 145x ec_out = ec;
80 145x bytes_out = bytes;
81 145x fd = file_descriptor;
82 145x offset = file_offset;
83 145x sched_ = scheduler;
84 145x impl_ptr = std::move(impl);
85 145x res = 0;
86 145x cqe_flags = 0;
87 145x iovec_count = static_cast<int>(
88 145x buffers.copy_to(
89 145x reinterpret_cast<capy::mutable_buffer*>(iovecs),
90 io_uring_max_iov));
91 145x empty_buffer = (iovec_count == 0);
92 145x start(token);
93 145x }
94
95 141x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
96 {
97 141x auto* self = static_cast<uring_file_read_op_base*>(base);
98 141x ::io_uring_prep_readv(
99 141x sqe, self->fd, self->iovecs, self->iovec_count,
100 141x static_cast<__u64>(self->offset));
101 141x }
102
103 141x static void do_cqe(
104 io_uring_op* base, int res, unsigned flags,
105 op_queue& local) noexcept
106 {
107 141x auto* self = static_cast<uring_file_read_op_base*>(base);
108 141x self->res = res;
109 141x self->cqe_flags = flags;
110 141x local.push(self);
111 141x }
112
113 /// Common post-completion work used by both handlers: fill ec_out
114 /// and bytes_out, then return the coroutine to resume.
115 static std::coroutine_handle<>
116 136x finish(uring_file_read_op_base* self) noexcept
117 {
118 136x uring_set_result(self, /*is_read=*/true, self->empty_buffer);
119 136x if (self->bytes_out)
120 136x *self->bytes_out =
121 136x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
122 136x self->cont_op.cont.h = self->h;
123 136x return dispatch_coro(self->ex, self->cont_op.cont);
124 }
125 };
126
127 /// Scatter-gather file read embedded as a member of stream_file
128 /// (single-pending per fd). Handler uses the suicide-move pattern;
129 /// the impl owns this slot.
130 struct uring_file_read_op : uring_file_read_op_base
131 {
132 37x uring_file_read_op() noexcept
133 37x : uring_file_read_op_base(&do_handler) {}
134
135 9x static void do_handler(
136 void* owner, scheduler_op* base,
137 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
138 {
139 9x auto* self = static_cast<uring_file_read_op*>(base);
140 9x if (coro_drain_if_shutdown(owner, self))
141 return;
142
143 9x if (self->sched_)
144 9x self->sched_->reset_inline_budget();
145
146 9x uring_set_result(self, /*is_read=*/true, self->empty_buffer);
147 9x if (self->bytes_out)
148 9x *self->bytes_out =
149 9x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
150 9x coro_resume(self);
151 }
152 };
153
154 /// Heap-allocated scatter-gather file read for random_access_file —
155 /// each `read_some_at` call allocates a fresh op so multiple reads
156 /// at different offsets on the same fd can be in flight concurrently.
157 struct uring_random_access_read_op : uring_file_read_op_base
158 {
159 136x uring_random_access_read_op() noexcept
160 136x : uring_file_read_op_base(&do_handler) {}
161
162 136x static void do_handler(
163 void* owner, scheduler_op* base,
164 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
165 {
166 136x auto* self = static_cast<uring_random_access_read_op*>(base);
167 136x self->stop_cb.reset();
168
169 136x if (owner == nullptr)
170 {
171 delete self;
172 return;
173 }
174
175 136x auto next = finish(self);
176 136x delete self;
177 136x next.resume();
178 }
179 };
180
181 /** Scatter-gather file write via `IORING_OP_WRITEV`.
182
183 Stream files pass `offset == -1` (kernel f_pos); random-access
184 files pass an explicit caller-supplied offset. Unlike socket
185 writes, no `MSG_NOSIGNAL` is needed — files don't generate
186 SIGPIPE on closed peers.
187 */
188 /// Shared state and submission logic for file write ops. Concrete
189 /// subclasses pick a `do_handler` matching their storage model.
190 struct uring_file_write_op_base : io_uring_op
191 {
192 iovec iovecs[io_uring_max_iov];
193 int iovec_count = 0;
194 int fd = -1;
195 std::int64_t offset = -1;
196
197 protected:
198 50x explicit uring_file_write_op_base(func_type handler) noexcept
199 50x : io_uring_op(handler, &do_cqe, &do_prep) {}
200
201 public:
202 /** Reset and initialize for a new submission.
203
204 See uring_file_read_op_base::prepare for the offset convention.
205 */
206 22x void prepare(
207 std::coroutine_handle<> handle,
208 capy::executor_ref executor,
209 std::error_code* ec,
210 std::size_t* bytes,
211 int file_descriptor,
212 std::int64_t file_offset,
213 io_uring_scheduler* scheduler,
214 std::shared_ptr<void> impl,
215 buffer_param buffers,
216 std::stop_token const& token) noexcept
217 {
218 22x h = handle;
219 22x ex = executor;
220 22x ec_out = ec;
221 22x bytes_out = bytes;
222 22x fd = file_descriptor;
223 22x offset = file_offset;
224 22x sched_ = scheduler;
225 22x impl_ptr = std::move(impl);
226 22x res = 0;
227 22x cqe_flags = 0;
228 22x iovec_count = static_cast<int>(
229 22x buffers.copy_to(
230 22x reinterpret_cast<capy::mutable_buffer*>(iovecs),
231 io_uring_max_iov));
232 22x empty_buffer = (iovec_count == 0);
233 22x start(token);
234 22x }
235
236 20x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
237 {
238 20x auto* self = static_cast<uring_file_write_op_base*>(base);
239 20x ::io_uring_prep_writev(
240 20x sqe, self->fd, self->iovecs, self->iovec_count,
241 20x static_cast<__u64>(self->offset));
242 20x }
243
244 20x static void do_cqe(
245 io_uring_op* base, int res, unsigned flags,
246 op_queue& local) noexcept
247 {
248 20x auto* self = static_cast<uring_file_write_op_base*>(base);
249 20x self->res = res;
250 20x self->cqe_flags = flags;
251 20x local.push(self);
252 20x }
253
254 static std::coroutine_handle<>
255 13x finish(uring_file_write_op_base* self) noexcept
256 {
257 13x uring_set_result(self, /*is_read=*/false, self->empty_buffer);
258 13x if (self->bytes_out)
259 13x *self->bytes_out =
260 13x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
261 13x self->cont_op.cont.h = self->h;
262 13x return dispatch_coro(self->ex, self->cont_op.cont);
263 }
264 };
265
266 /// Embedded file write op for stream_file.
267 struct uring_file_write_op : uring_file_write_op_base
268 {
269 37x uring_file_write_op() noexcept
270 37x : uring_file_write_op_base(&do_handler) {}
271
272 9x static void do_handler(
273 void* owner, scheduler_op* base,
274 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
275 {
276 9x auto* self = static_cast<uring_file_write_op*>(base);
277 9x if (coro_drain_if_shutdown(owner, self))
278 return;
279
280 9x if (self->sched_)
281 9x self->sched_->reset_inline_budget();
282
283 9x uring_set_result(self, /*is_read=*/false, self->empty_buffer);
284 9x if (self->bytes_out)
285 9x *self->bytes_out =
286 9x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
287 9x coro_resume(self);
288 }
289 };
290
291 /// Heap-allocated file write op for random_access_file.
292 struct uring_random_access_write_op : uring_file_write_op_base
293 {
294 13x uring_random_access_write_op() noexcept
295 13x : uring_file_write_op_base(&do_handler) {}
296
297 13x static void do_handler(
298 void* owner, scheduler_op* base,
299 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
300 {
301 13x auto* self = static_cast<uring_random_access_write_op*>(base);
302 13x self->stop_cb.reset();
303
304 13x if (owner == nullptr)
305 {
306 delete self;
307 return;
308 }
309
310 13x auto next = finish(self);
311 13x delete self;
312 13x next.resume();
313 }
314 };
315
316 } // namespace boost::corosio::detail
317
318 #endif // BOOST_COROSIO_HAS_IO_URING
319
320 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_FILE_OPS_HPP
321