include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp

53.6% Lines (74/138) 60.0% List of functions (6/10)
io_uring_dgram_ops.hpp
f(x) Functions (10)
Function Calls Lines Blocks
boost::corosio::detail::uring_dgram_send_op::uring_dgram_send_op() :55 130x 100.0% 100.0% boost::corosio::detail::uring_dgram_send_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, unsigned int, sockaddr_storage const&, int, std::stop_token const&) :64 0 0.0% 0.0% boost::corosio::detail::uring_dgram_send_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :113 0 0.0% 0.0% boost::corosio::detail::uring_dgram_send_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :121 0 0.0% 0.0% boost::corosio::detail::uring_dgram_send_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :130 0 0.0% 0.0% boost::corosio::detail::uring_dgram_recv_op::uring_dgram_recv_op() :198 130x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, void*, void (*)(void*, sockaddr_storage const&, unsigned int) noexcept, int, std::stop_token const&) :214 8x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :279 8x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :286 8x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :297 8x 88.5% 87.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/corosio/detail/dispatch_coro.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
21 #include <boost/corosio/native/detail/speculative_state.hpp>
22 #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp>
23 #include <boost/corosio/native/detail/make_err.hpp>
24 #include <boost/capy/error.hpp>
25
26 #include <cstddef>
27 #include <cstdint>
28
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <sys/uio.h>
32
33 namespace boost::corosio::detail {
34
35 /** Datagram send op — connected and unconnected.
36
37 Always uses `IORING_OP_SENDMSG`. In connected mode, `dest_len == 0`
38 and `msg.msg_name == nullptr`. In unconnected mode, `dest_storage`
39 holds the destination and `msg.msg_name` points at it. `MSG_NOSIGNAL` is always OR-ed into the caller's flags when the SQE is prepared.
40
41 `iovec[io_uring_max_iov]` for scatter/gather: a single datagram
42 can be assembled from N user buffers via `msg.msg_iov`. `prepare` passes `io_uring_max_iov` to `copy_to` as the buffer limit.
43 */
44 struct uring_dgram_send_op : io_uring_op
45 {
46 iovec iovecs[io_uring_max_iov]; // scatter/gather array that msg.msg_iov points at
47 int iovec_count = 0; // value returned by buffers.copy_to() in prepare()
48 msghdr msg{}; // message header whose address is handed to io_uring_prep_sendmsg
49 sockaddr_storage dest_storage{}; // private copy of the destination; only written when dest_addr_len > 0
50 socklen_t dest_len = 0; // 0 when no explicit destination was supplied
51 int fd = -1; // descriptor the SQE targets
52 int msg_flags = 0; // caller-supplied flags; MSG_NOSIGNAL is added in do_prep
53 detail::speculative_state* spec_state = nullptr; // may be null; notified from do_handler when res > 0
54
55 130x uring_dgram_send_op() noexcept // hands the three static callbacks below to the io_uring_op base
56 130x : io_uring_op(&do_handler, &do_cqe, &do_prep) {}
57
58 /** Reset and initialize for a new submission.
59
60 Pass `dest_addr_len == 0` for connected-mode datagram sockets
61 (the kernel uses the connected peer); otherwise fill
62 `dest_addr_storage` with the destination address. The address is copied into `dest_storage`, so `msg.msg_name` never points at the caller's object. Ends by calling `start(token)`.
63 */
64 void prepare(
65 std::coroutine_handle<> handle,
66 capy::executor_ref executor,
67 std::error_code* ec,
68 std::size_t* bytes,
69 int file_descriptor,
70 io_uring_scheduler* scheduler,
71 std::shared_ptr<void> impl,
72 detail::speculative_state* spec,
73 buffer_param buffers,
74 socklen_t dest_addr_len,
75 sockaddr_storage const& dest_addr_storage,
76 int flags,
77 std::stop_token const& token) noexcept
78 {
79 h = handle; // h, ex, ec_out, bytes_out, sched_, impl_ptr, res, cqe_flags are not declared in this struct (presumably io_uring_op members — confirm)
80 ex = executor;
81 ec_out = ec;
82 bytes_out = bytes;
83 fd = file_descriptor;
84 sched_ = scheduler;
85 impl_ptr = std::move(impl);
86 spec_state = spec;
87 res = 0; // clear the result and flags left by any previous completion
88 cqe_flags = 0;
89 msg_flags = flags;
90
91 iovec_count = static_cast<int>(
92 buffers.copy_to(
93 reinterpret_cast<capy::mutable_buffer*>(iovecs), // NOTE(review): assumes capy::mutable_buffer is layout-compatible with iovec — confirm this is asserted elsewhere
94 io_uring_max_iov));
95
96 msg = {}; // zero the header: msg_name stays null and msg_namelen 0 unless a destination is set below
97 msg.msg_iov = iovecs;
98 msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);
99 if (dest_addr_len > 0) // unconnected mode: an explicit destination was supplied
100 {
101 dest_storage = dest_addr_storage;
102 dest_len = dest_addr_len;
103 msg.msg_name = &dest_storage;
104 msg.msg_namelen = dest_addr_len;
105 }
106 else
107 {
108 dest_len = 0; // dest_storage is left as-is; it is not referenced by msg in this mode
109 }
110 start(token); // start() is not defined in this struct — presumably provided by io_uring_op (confirm what it arms/submits)
111 }
112
113 static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
114 { // Fills `sqe` with a sendmsg request for this op's fd and msg.
115 auto* self = static_cast<uring_dgram_send_op*>(base);
116 ::io_uring_prep_sendmsg(
117 sqe, self->fd, &self->msg,
118 self->msg_flags | MSG_NOSIGNAL); // MSG_NOSIGNAL is added unconditionally on top of the caller's flags
119 }
120
121 static void do_cqe(
122 io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept
123 { // Records the raw completion result and flags, then queues the op on `local`; do_handler interprets them.
124 auto* self = static_cast<uring_dgram_send_op*>(base);
125 self->res = res;
126 self->cqe_flags = flags;
127 local.push(self);
128 }
129
130 static void do_handler(
131 void* owner, scheduler_op* base,
132 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
133 { // Publishes ec/bytes to the caller's outputs and resumes the waiting coroutine; the two uint32 arguments are unused.
134 auto* self = static_cast<uring_dgram_send_op*>(base);
135 self->stop_cb.reset(); // stop_cb is not declared in this struct (presumably the stop_token callback held by the base — confirm)
136
137 if (owner == nullptr) // no owner: only release the impl reference; outputs are not written and the coroutine is not resumed
138 {
139 auto suicide = std::move(self->impl_ptr);
140 return;
141 }
142
143 if (self->ec_out)
144 {
145 if (self->cancelled.load(std::memory_order_acquire)) // the cancelled flag takes precedence over whatever res holds
146 *self->ec_out = capy::error::canceled;
147 else if (self->res < 0) // negative res is negated and passed to make_err
148 *self->ec_out = make_err(-self->res);
149 else
150 *self->ec_out = {};
151 }
152 if (self->bytes_out) // NOTE(review): the byte count is taken from res even when ec was set to canceled above
153 *self->bytes_out = (self->res >= 0)
154 ? static_cast<std::size_t>(self->res) : 0;
155
156 if (self->res > 0 && self->spec_state)
157 {
158 // Kernel signalled readiness — restore speculation. Gated on res > 0, so a successful zero-length send (res == 0) does not notify.
159 self->spec_state->on_async_write_ready();
160 }
161
162 self->cont_op.cont.h = self->h;
163 auto next = dispatch_coro(self->ex, self->cont_op.cont);
164 auto suicide = std::move(self->impl_ptr); // moved into a local so the reference is released at function exit, i.e. after resume() returns
165 next.resume();
166 }
167 };
168
169 /** Datagram receive op — connected and unconnected.
170
171 Always uses `IORING_OP_RECVMSG`. In connected mode `msg.msg_name`
172 is null. In unconnected mode `msg.msg_name` points at
173 `source_storage` and the kernel writes the source address there. `prepare` selects between the two by whether `source_fn` is non-null and at least one buffer was supplied.
174
175 `res == 0` is success (zero-byte datagrams are valid), NOT EOF.
176
177 The `source_writer` callback lets the concrete socket type
178 translate `sockaddr_storage` into `endpoint*` or `local_endpoint*`
179 without the op needing to know which family it is.
180 */
181 struct uring_dgram_recv_op : io_uring_op
182 {
183 iovec iovecs[io_uring_max_iov]; // scatter/gather array that msg.msg_iov points at
184 int iovec_count = 0; // value returned by buffers.copy_to() in prepare()
185 msghdr msg{}; // message header whose address is handed to io_uring_prep_recvmsg
186 sockaddr_storage source_storage{}; // receives the peer address when a source writer is armed
187 socklen_t source_len = 0; // overwritten from msg.msg_namelen in do_cqe
188 int fd = -1; // descriptor the SQE targets
189 int msg_flags = 0; // caller-supplied flags, passed to the kernel unmodified
190 detail::speculative_state* spec_state = nullptr; // may be null; notified from do_handler when res > 0
191
192 /// Type-erased translator: writes source_storage into the user's
193 /// endpoint output via concrete-class-specific conversion. Both are null when no source address was requested.
194 void* source_writer_ctx = nullptr;
195 void (*source_writer)(
196 void*, sockaddr_storage const&, socklen_t) noexcept = nullptr;
197
198 130x uring_dgram_recv_op() noexcept // hands the three static callbacks below to the io_uring_op base
199 130x : io_uring_op(&do_handler, &do_cqe, &do_prep) {}
200
201 /** Reset and initialize for a new submission.
202
203 When `source_fn` is non-null, the kernel writes the peer
204 address into `source_storage` and `source_fn(source_ctx, ...)`
205 is invoked from the handler on success to translate it to
206 the user's endpoint output. Connected-mode receivers should
207 pass `source_fn = nullptr`. `source_fn` is also ignored when `buffers` yields no iovecs.
208
209 A zero-iovec `buffers` argument yields `iovec_count == 0`;
210 the caller should push the slot onto `completed_ops_`
211 directly (bypassing the kernel) since `recvmsg` would
212 otherwise block forever. Note that this function still calls `start(token)` in that case.
213 */
214 8x void prepare(
215 std::coroutine_handle<> handle,
216 capy::executor_ref executor,
217 std::error_code* ec,
218 std::size_t* bytes,
219 int file_descriptor,
220 io_uring_scheduler* scheduler,
221 std::shared_ptr<void> impl,
222 detail::speculative_state* spec,
223 buffer_param buffers,
224 void* source_ctx,
225 void (*source_fn)(void*, sockaddr_storage const&, socklen_t) noexcept,
226 int flags,
227 std::stop_token const& token) noexcept
228 {
229 8x h = handle; // h, ex, ec_out, bytes_out, sched_, impl_ptr, res, cqe_flags are not declared in this struct (presumably io_uring_op members — confirm)
230 8x ex = executor;
231 8x ec_out = ec;
232 8x bytes_out = bytes;
233 8x fd = file_descriptor;
234 8x sched_ = scheduler;
235 8x impl_ptr = std::move(impl);
236 8x spec_state = spec;
237 8x res = 0; // clear the result and flags left by any previous completion
238 8x cqe_flags = 0;
239 8x msg_flags = flags;
240
241 8x iovec_count = static_cast<int>(
242 8x buffers.copy_to(
243 8x reinterpret_cast<capy::mutable_buffer*>(iovecs), // NOTE(review): assumes capy::mutable_buffer is layout-compatible with iovec — confirm this is asserted elsewhere
244 io_uring_max_iov));
245
246 8x msg = {}; // zero the header: every msghdr field is null/0 unless set below
247 // For the zero-iovec bypass path the caller pushes the slot
248 // straight onto completed_ops_; source_writer must NOT run in
249 // that case (no recvmsg ever happens, source_storage is empty
250 // and would clobber the user's endpoint). Arm the writer only
251 // when there's a real buffer AND the caller asked for it.
252 8x if (iovec_count > 0 && source_fn)
253 {
254 6x msg.msg_iov = iovecs;
255 6x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(
256 6x iovec_count);
257 6x source_storage = {}; // start from a zeroed address so no stale peer from an earlier receive is visible
258 6x source_len = sizeof(source_storage);
259 6x msg.msg_name = &source_storage;
260 6x msg.msg_namelen = source_len;
261 6x source_writer_ctx = source_ctx;
262 6x source_writer = source_fn;
263 }
264 else
265 {
266 2x if (iovec_count > 0) // buffers but no source_fn: receive data only, msg_name stays null
267 {
268 2x msg.msg_iov = iovecs;
269 2x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(
270 2x iovec_count);
271 }
272 2x source_len = 0;
273 2x source_writer_ctx = nullptr; // disarm any writer left over from a previous use of this op
274 2x source_writer = nullptr;
275 }
276 8x start(token); // start() is not defined in this struct — presumably provided by io_uring_op (confirm what it arms/submits)
277 8x }
278
279 8x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
280 { // Fills `sqe` with a recvmsg request for this op's fd and msg; the flags are passed through unchanged.
281 8x auto* self = static_cast<uring_dgram_recv_op*>(base);
282 8x ::io_uring_prep_recvmsg(
283 8x sqe, self->fd, &self->msg, self->msg_flags);
284 8x }
285
286 8x static void do_cqe(
287 io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept
288 { // Records the raw completion result, flags and address length, then queues the op on `local`; do_handler interprets them.
289 8x auto* self = static_cast<uring_dgram_recv_op*>(base);
290 8x self->res = res;
291 8x self->cqe_flags = flags;
292 // recvmsg writes the actual source addrlen back into msg.msg_namelen. This copy is unconditional: in the no-source path prepare left msg_namelen at 0, so that value is what gets copied.
293 8x self->source_len = self->msg.msg_namelen;
294 8x local.push(self);
295 8x }
296
297 8x static void do_handler(
298 void* owner, scheduler_op* base,
299 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
300 { // Publishes ec/bytes/source address to the caller's outputs and resumes the waiting coroutine; the two uint32 arguments are unused.
301 8x auto* self = static_cast<uring_dgram_recv_op*>(base);
302 8x self->stop_cb.reset(); // stop_cb is not declared in this struct (presumably the stop_token callback held by the base — confirm)
303
304 8x if (owner == nullptr) // no owner: only release the impl reference; outputs are not written and the coroutine is not resumed
305 {
306 auto suicide = std::move(self->impl_ptr);
307 return;
308 }
309
310 8x if (self->ec_out)
311 {
312 8x if (self->cancelled.load(std::memory_order_acquire)) // the cancelled flag takes precedence over whatever res holds
313 1x *self->ec_out = capy::error::canceled;
314 7x else if (self->res < 0) // negative res is negated and passed to make_err
315 6x *self->ec_out = make_err(-self->res);
316 else
317 1x *self->ec_out = {}; // zero-byte datagram is success, not EOF
318 }
319 8x if (self->bytes_out) // NOTE(review): the byte count is taken from res even when ec was set to canceled above
320 8x *self->bytes_out = (self->res >= 0)
321 8x ? static_cast<std::size_t>(self->res) : 0;
322
323 8x if (self->res > 0 && self->spec_state)
324 {
325 // Kernel signalled readiness — restore speculation. Gated on res > 0, so a zero-byte datagram (res == 0) does not notify.
326 1x self->spec_state->on_async_read_ready();
327 }
328
329 // Translate source storage into user's endpoint output (only on
330 // success and only when the concrete socket type asked for it). NOTE(review): "success" here is res >= 0 only; the cancelled flag is not consulted, so the writer also runs when ec was set to canceled — confirm intended.
331 8x if (self->source_writer && self->res >= 0)
332 1x self->source_writer(self->source_writer_ctx,
333 1x self->source_storage, self->source_len);
334
335 8x self->cont_op.cont.h = self->h;
336 8x auto next = dispatch_coro(self->ex, self->cont_op.cont);
337 8x auto suicide = std::move(self->impl_ptr); // moved into a local so the reference is released at function exit, i.e. after resume() returns
338 8x next.resume();
339 8x }
340 };
341
342 } // namespace boost::corosio::detail
343
344 #endif // BOOST_COROSIO_HAS_IO_URING
345
346 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
347