include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp

98.4% Lines (120/122) 100.0% List of functions (10/10)
io_uring_dgram_ops.hpp
f(x) Functions (10)
Function Calls Lines Blocks
boost::corosio::detail::uring_dgram_send_op::uring_dgram_send_op() :56 169x 100.0% 100.0% boost::corosio::detail::uring_dgram_send_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, unsigned int, sockaddr_storage const&, int, std::stop_token const&) :65 20x 100.0% 100.0% boost::corosio::detail::uring_dgram_send_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :114 20x 100.0% 100.0% boost::corosio::detail::uring_dgram_send_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :122 20x 100.0% 100.0% boost::corosio::detail::uring_dgram_send_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :131 20x 93.3% 94.0% boost::corosio::detail::uring_dgram_recv_op::uring_dgram_recv_op() :191 169x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, void*, void (*)(void*, sockaddr_storage const&, unsigned int) noexcept, int, std::stop_token const&) :207 54x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :272 54x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :279 54x 100.0% 100.0% boost::corosio::detail::uring_dgram_recv_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :290 54x 94.4% 95.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/corosio/detail/dispatch_coro.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
21 #include <boost/corosio/native/detail/coro_op_complete.hpp>
22 #include <boost/corosio/native/detail/speculative_state.hpp>
23 #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/capy/error.hpp>
26
27 #include <cstddef>
28 #include <cstdint>
29
30 #include <netinet/in.h>
31 #include <sys/socket.h>
32 #include <sys/uio.h>
33
34 namespace boost::corosio::detail {
35
36 /** Datagram send op — connected and unconnected.
37
38 Always uses `IORING_OP_SENDMSG`. In connected mode, `dest_len == 0`
39 and `msg.msg_name == nullptr`. In unconnected mode, `dest_storage`
40 holds the destination and `msg.msg_name` points at it.
41
42 `iovec[io_uring_max_iov]` for scatter/gather: a single datagram
43 can be assembled from N user buffers via `msg.msg_iov`.
44 */
45 struct uring_dgram_send_op : io_uring_op
46 {
47 iovec iovecs[io_uring_max_iov];
48 int iovec_count = 0;
49 msghdr msg{};
50 sockaddr_storage dest_storage{};
51 socklen_t dest_len = 0;
52 int fd = -1;
53 int msg_flags = 0;
54 detail::speculative_state* spec_state = nullptr;
55
56 169x uring_dgram_send_op() noexcept
57 169x : io_uring_op(&do_handler, &do_cqe, &do_prep) {}
58
59 /** Reset and initialize for a new submission.
60
61 Pass `dest_addr_len == 0` for connected-mode datagram sockets
62 (the kernel uses the connected peer); otherwise fill
63 `dest_addr_storage` with the destination address.
64 */
65 20x void prepare(
66 std::coroutine_handle<> handle,
67 capy::executor_ref executor,
68 std::error_code* ec,
69 std::size_t* bytes,
70 int file_descriptor,
71 io_uring_scheduler* scheduler,
72 std::shared_ptr<void> impl,
73 detail::speculative_state* spec,
74 buffer_param buffers,
75 socklen_t dest_addr_len,
76 sockaddr_storage const& dest_addr_storage,
77 int flags,
78 std::stop_token const& token) noexcept
79 {
80 20x h = handle;
81 20x ex = executor;
82 20x ec_out = ec;
83 20x bytes_out = bytes;
84 20x fd = file_descriptor;
85 20x sched_ = scheduler;
86 20x impl_ptr = std::move(impl);
87 20x spec_state = spec;
88 20x res = 0;
89 20x cqe_flags = 0;
90 20x msg_flags = flags;
91
92 20x iovec_count = static_cast<int>(
93 20x buffers.copy_to(
94 20x reinterpret_cast<capy::mutable_buffer*>(iovecs),
95 io_uring_max_iov));
96
97 20x msg = {};
98 20x msg.msg_iov = iovecs;
99 20x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);
100 20x if (dest_addr_len > 0)
101 {
102 10x dest_storage = dest_addr_storage;
103 10x dest_len = dest_addr_len;
104 10x msg.msg_name = &dest_storage;
105 10x msg.msg_namelen = dest_addr_len;
106 }
107 else
108 {
109 10x dest_len = 0;
110 }
111 20x start(token);
112 20x }
113
114 20x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
115 {
116 20x auto* self = static_cast<uring_dgram_send_op*>(base);
117 20x ::io_uring_prep_sendmsg(
118 20x sqe, self->fd, &self->msg,
119 20x self->msg_flags | MSG_NOSIGNAL);
120 20x }
121
122 20x static void do_cqe(
123 io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept
124 {
125 20x auto* self = static_cast<uring_dgram_send_op*>(base);
126 20x self->res = res;
127 20x self->cqe_flags = flags;
128 20x local.push(self);
129 20x }
130
131 20x static void do_handler(
132 void* owner, scheduler_op* base,
133 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
134 {
135 20x auto* self = static_cast<uring_dgram_send_op*>(base);
136 20x if (coro_drain_if_shutdown(owner, self))
137 return;
138
139 20x if (self->sched_)
140 20x self->sched_->reset_inline_budget();
141
142 // Datagram send: no EOF (a 0-byte send is success).
143 38x decode_io_result(
144 self->ec_out,
145 20x self->cancelled.load(std::memory_order_acquire),
146 20x self->res < 0 ? make_err(-self->res) : std::error_code{},
147 /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
148 20x if (self->bytes_out)
149 20x *self->bytes_out = (self->res >= 0)
150 20x ? static_cast<std::size_t>(self->res) : 0;
151
152 20x if (self->res > 0 && self->spec_state)
153 {
154 // Kernel signalled readiness — restore speculation.
155 18x self->spec_state->on_async_write_ready();
156 }
157
158 20x coro_resume(self);
159 }
160 };
161
162 /** Datagram receive op — connected and unconnected.
163
164 Always uses `IORING_OP_RECVMSG`. In connected mode `msg.msg_name`
165 is null. In unconnected mode `msg.msg_name` points at
166 `source_storage` and the kernel writes the source address there.
167
168 `res == 0` is success (zero-byte datagrams are valid), NOT EOF.
169
170 The `source_writer` callback lets the concrete socket type
171 translate `sockaddr_storage` into `endpoint*` or `local_endpoint*`
172 without the op needing to know which family it is.
173 */
174 struct uring_dgram_recv_op : io_uring_op
175 {
176 iovec iovecs[io_uring_max_iov];
177 int iovec_count = 0;
178 msghdr msg{};
179 sockaddr_storage source_storage{};
180 socklen_t source_len = 0;
181 int fd = -1;
182 int msg_flags = 0;
183 detail::speculative_state* spec_state = nullptr;
184
185 /// Type-erased translator: writes source_storage into the user's
186 /// endpoint output via concrete-class-specific conversion.
187 void* source_writer_ctx = nullptr;
188 void (*source_writer)(
189 void*, sockaddr_storage const&, socklen_t) noexcept = nullptr;
190
191 169x uring_dgram_recv_op() noexcept
192 169x : io_uring_op(&do_handler, &do_cqe, &do_prep) {}
193
194 /** Reset and initialize for a new submission.
195
196 When `source_fn` is non-null, the kernel writes the peer
197 address into `source_storage` and `source_fn(source_ctx, ...)`
198 is invoked from the handler on success to translate it to
199 the user's endpoint output. Connected-mode receivers should
200 pass `source_fn = nullptr`.
201
202 A zero-iovec `buffers` argument yields `iovec_count == 0`;
203 the caller should push the slot onto `completed_ops_`
204 directly (bypassing the kernel) since `recvmsg` would
205 otherwise block forever.
206 */
207 54x void prepare(
208 std::coroutine_handle<> handle,
209 capy::executor_ref executor,
210 std::error_code* ec,
211 std::size_t* bytes,
212 int file_descriptor,
213 io_uring_scheduler* scheduler,
214 std::shared_ptr<void> impl,
215 detail::speculative_state* spec,
216 buffer_param buffers,
217 void* source_ctx,
218 void (*source_fn)(void*, sockaddr_storage const&, socklen_t) noexcept,
219 int flags,
220 std::stop_token const& token) noexcept
221 {
222 54x h = handle;
223 54x ex = executor;
224 54x ec_out = ec;
225 54x bytes_out = bytes;
226 54x fd = file_descriptor;
227 54x sched_ = scheduler;
228 54x impl_ptr = std::move(impl);
229 54x spec_state = spec;
230 54x res = 0;
231 54x cqe_flags = 0;
232 54x msg_flags = flags;
233
234 54x iovec_count = static_cast<int>(
235 54x buffers.copy_to(
236 54x reinterpret_cast<capy::mutable_buffer*>(iovecs),
237 io_uring_max_iov));
238
239 54x msg = {};
240 // For the zero-iovec bypass path the caller pushes the slot
241 // straight onto completed_ops_; source_writer must NOT run in
242 // that case (no recvmsg ever happens, source_storage is empty
243 // and would clobber the user's endpoint). Arm the writer only
244 // when there's a real buffer AND the caller asked for it.
245 54x if (iovec_count > 0 && source_fn)
246 {
247 29x msg.msg_iov = iovecs;
248 29x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(
249 29x iovec_count);
250 29x source_storage = {};
251 29x source_len = sizeof(source_storage);
252 29x msg.msg_name = &source_storage;
253 29x msg.msg_namelen = source_len;
254 29x source_writer_ctx = source_ctx;
255 29x source_writer = source_fn;
256 }
257 else
258 {
259 25x if (iovec_count > 0)
260 {
261 25x msg.msg_iov = iovecs;
262 25x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(
263 25x iovec_count);
264 }
265 25x source_len = 0;
266 25x source_writer_ctx = nullptr;
267 25x source_writer = nullptr;
268 }
269 54x start(token);
270 54x }
271
272 54x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
273 {
274 54x auto* self = static_cast<uring_dgram_recv_op*>(base);
275 54x ::io_uring_prep_recvmsg(
276 54x sqe, self->fd, &self->msg, self->msg_flags);
277 54x }
278
279 54x static void do_cqe(
280 io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept
281 {
282 54x auto* self = static_cast<uring_dgram_recv_op*>(base);
283 54x self->res = res;
284 54x self->cqe_flags = flags;
285 // recvmsg writes the actual source addrlen back into msg.msg_namelen.
286 54x self->source_len = self->msg.msg_namelen;
287 54x local.push(self);
288 54x }
289
290 54x static void do_handler(
291 void* owner, scheduler_op* base,
292 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
293 {
294 54x auto* self = static_cast<uring_dgram_recv_op*>(base);
295 54x if (coro_drain_if_shutdown(owner, self))
296 return;
297
298 54x if (self->sched_)
299 54x self->sched_->reset_inline_budget();
300
301 // Datagram recv: a 0-byte datagram is success, not EOF — is_read
302 // stays false so the shared decode never maps it to end_of_file.
303 99x decode_io_result(
304 self->ec_out,
305 54x self->cancelled.load(std::memory_order_acquire),
306 54x self->res < 0 ? make_err(-self->res) : std::error_code{},
307 /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
308 54x if (self->bytes_out)
309 54x *self->bytes_out = (self->res >= 0)
310 54x ? static_cast<std::size_t>(self->res) : 0;
311
312 54x if (self->res > 0 && self->spec_state)
313 {
314 // Kernel signalled readiness — restore speculation.
315 45x self->spec_state->on_async_read_ready();
316 }
317
318 // Translate source storage into user's endpoint output (only on
319 // success and only when the concrete socket type asked for it).
320 54x if (self->source_writer && self->res >= 0)
321 23x self->source_writer(self->source_writer_ctx,
322 23x self->source_storage, self->source_len);
323
324 54x coro_resume(self);
325 }
326 };
327
328 } // namespace boost::corosio::detail
329
330 #endif // BOOST_COROSIO_HAS_IO_URING
331
332 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_DGRAM_OPS_HPP
333