include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp

85.5% Lines (247/289) 92.6% List of functions (25/27)
io_uring_socket_ops.hpp
f(x) Functions (27)
Function Calls Lines Blocks
boost::corosio::detail::uring_set_result(boost::corosio::detail::io_uring_op*, bool, bool) :59 37922x 80.0% 75.0% boost::corosio::detail::uring_read_op::uring_read_op() :87 11936x 100.0% 100.0% boost::corosio::detail::uring_read_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :101 17072x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :131 205x 71.4% 75.0% boost::corosio::detail::uring_read_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :153 205x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :163 17072x 83.3% 79.0% boost::corosio::detail::uring_write_op::uring_write_op() :212 11936x 100.0% 100.0% boost::corosio::detail::uring_write_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :217 16868x 100.0% 100.0% boost::corosio::detail::uring_write_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :253 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :274 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :284 16868x 83.3% 79.0% boost::corosio::detail::uring_connect_op::uring_connect_op() :331 11955x 100.0% 100.0% 
boost::corosio::detail::uring_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::endpoint, boost::corosio::endpoint*, boost::corosio::endpoint*, std::stop_token const&) :343 3962x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :371 3962x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :380 3962x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :390 3962x 86.4% 83.0% boost::corosio::detail::io_uring_submit_op(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_op*) :447 4293x 66.7% 63.0% boost::corosio::detail::uring_wait_op::uring_wait_op() :516 12179x 100.0% 100.0% boost::corosio::detail::uring_wait_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, int, std::stop_token const&) :521 13x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :544 13x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :550 13x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :560 13x 77.8% 73.0% boost::corosio::detail::uring_local_connect_op::uring_local_connect_op() :606 111x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::prepare(std::__n4861::coroutine_handle<void>, 
boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::local_endpoint, boost::corosio::local_endpoint*, boost::corosio::local_endpoint*, std::stop_token const&) :614 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :641 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :650 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :660 14x 87.0% 83.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/capy/buffers.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/corosio/detail/buffer_param.hpp>
22 #include <boost/corosio/detail/dispatch_coro.hpp>
23 #include <boost/corosio/local_endpoint.hpp>
24 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
25 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
26 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28 #include <boost/corosio/native/detail/speculative_state.hpp>
29
30 #include <system_error>
31
32 #include <netinet/in.h>
33 #include <poll.h>
34 #include <sys/socket.h>
35 #include <sys/uio.h>
36
37 namespace boost::corosio::detail {
38
/// Upper bound on the scatter/gather segments carried by one
/// read/write/dgram op.
///
/// Deliberately far smaller than `IOV_MAX` (1024 on Linux): each op
/// embeds an `iovec[io_uring_max_iov]` array directly in the
/// io_uring_op object, so the segments share one allocation with the
/// rest of the op's state. Plan 4's registered-buffer work will
/// revisit this; until then 16 is enough for typical scatter use
/// (fragmented buffers from buffer_sequence) without inflating per-op
/// memory.
inline constexpr std::size_t io_uring_max_iov{16};
48
49 /** Resolve ec_out/bytes_out from a CQE result for a completed I/O op.
50
51 Shared by read, write, and connect handlers. For reads, `res == 0`
52 with a non-empty buffer means the peer closed the connection (EOF).
53
54 @param self The completed op.
55 @param is_read True if this is a receive/read operation.
56 @param empty_buf True if the submitted buffer was zero-length.
57 */
58 inline void
59 37922x uring_set_result(io_uring_op* self, bool is_read, bool empty_buf) noexcept
60 {
61 37922x if (!self->ec_out)
62 return;
63
64 37922x if (self->cancelled.load(std::memory_order_acquire))
65 100x *self->ec_out = capy::error::canceled;
66 37822x else if (self->res < 0)
67 47x *self->ec_out = make_err(-self->res);
68 37775x else if (is_read && self->res == 0 && !empty_buf)
69 *self->ec_out = capy::error::eof;
70 else
71 37775x *self->ec_out = {};
72 }
73
74 /** Scatter-gather read via `IORING_OP_READV`.
75
76 @par Handler dispatch
77 do_cqe captures `res`/`cqe_flags` and queues self into `local`;
78 do_handler runs from the scheduler queue and resumes the coroutine.
79 */
80 struct uring_read_op : io_uring_op
81 {
82 iovec iovecs[io_uring_max_iov];
83 int iovec_count = 0;
84 int fd = -1;
85 detail::speculative_state* spec_state = nullptr;
86
87 11936x uring_read_op() noexcept
88 11936x : io_uring_op(&do_handler, &do_cqe, &do_prep)
89 {
90 11936x is_read = true;
91 11936x }
92
93 /** Reset and initialize for a new submission.
94
95 Embedded ops are reused across calls; every mutable field the
96 handler may read must be re-initialized here. `start(token)`
97 also resets `cancelled`, `sqe_set`, and `stop_cb`.
98
99 @pre This slot has no in-flight op (its prior op completed).
100 */
101 17072x void prepare(
102 std::coroutine_handle<> handle,
103 capy::executor_ref executor,
104 std::error_code* ec,
105 std::size_t* bytes,
106 int file_descriptor,
107 io_uring_scheduler* scheduler,
108 std::shared_ptr<void> impl,
109 detail::speculative_state* spec,
110 buffer_param buffers,
111 std::stop_token const& token) noexcept
112 {
113 17072x h = handle;
114 17072x ex = executor;
115 17072x ec_out = ec;
116 17072x bytes_out = bytes;
117 17072x fd = file_descriptor;
118 17072x sched_ = scheduler;
119 17072x impl_ptr = std::move(impl);
120 17072x spec_state = spec;
121 17072x res = 0;
122 17072x cqe_flags = 0;
123 17072x iovec_count = static_cast<int>(
124 17072x buffers.copy_to(
125 17072x reinterpret_cast<capy::mutable_buffer*>(iovecs),
126 io_uring_max_iov));
127 17072x empty_buffer = (iovec_count == 0);
128 17072x start(token);
129 17072x }
130
131 205x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
132 {
133 205x auto* self = static_cast<uring_read_op*>(base);
134 // Single-buffer fast path: IORING_OP_RECV with a flat
135 // (buffer, length) skips the iovec-array indirection that
136 // IORING_OP_READV pays. For multi-iovec scatter reads, fall
137 // back to readv.
138 205x if (self->iovec_count == 1)
139 {
140 205x ::io_uring_prep_recv(
141 sqe, self->fd,
142 self->iovecs[0].iov_base,
143 self->iovecs[0].iov_len,
144 0);
145 }
146 else
147 {
148 ::io_uring_prep_readv(
149 sqe, self->fd, self->iovecs, self->iovec_count, 0);
150 }
151 205x }
152
153 205x static void do_cqe(
154 io_uring_op* base, int res, unsigned flags,
155 op_queue& local) noexcept
156 {
157 205x auto* self = static_cast<uring_read_op*>(base);
158 205x self->res = res;
159 205x self->cqe_flags = flags;
160 205x local.push(self);
161 205x }
162
163 17072x static void do_handler(
164 void* owner, scheduler_op* base,
165 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
166 {
167 17072x auto* self = static_cast<uring_read_op*>(base);
168 17072x self->stop_cb.reset();
169
170 17072x if (owner == nullptr)
171 {
172 // Shutdown drain: break the impl_ptr cycle. The op storage
173 // is owned by the impl, which destructs once the cycle is
174 // broken (if this was the last ref).
175 auto suicide = std::move(self->impl_ptr);
176 return;
177 }
178
179 17072x uring_set_result(self, true, self->empty_buffer);
180
181 17072x if (self->res > 0 && self->spec_state)
182 {
183 // Kernel signalled readiness — restore speculation.
184 16969x self->spec_state->on_async_read_ready();
185 }
186
187 17072x if (self->bytes_out)
188 17072x *self->bytes_out =
189 17072x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
190
191 17072x self->cont_op.cont.h = self->h;
192 17072x auto next = dispatch_coro(self->ex, self->cont_op.cont);
193 17072x auto suicide = std::move(self->impl_ptr);
194 17072x next.resume();
195 // suicide drops here; may destroy impl + self.
196 17072x }
197 };
198
199 /** Scatter-gather write via `IORING_OP_SENDMSG` with `MSG_NOSIGNAL`.
200
201 `MSG_NOSIGNAL` prevents `SIGPIPE` when the peer has closed the
202 connection; the error is surfaced as `EPIPE` instead.
203 */
204 struct uring_write_op : io_uring_op
205 {
206 iovec iovecs[io_uring_max_iov];
207 int iovec_count = 0;
208 int fd = -1;
209 msghdr msg{};
210 detail::speculative_state* spec_state = nullptr;
211
212 11936x uring_write_op() noexcept
213 11936x : io_uring_op(&do_handler, &do_cqe, &do_prep)
214 11936x {}
215
216 /** Reset and initialize for a new submission. See uring_read_op::prepare. */
217 16868x void prepare(
218 std::coroutine_handle<> handle,
219 capy::executor_ref executor,
220 std::error_code* ec,
221 std::size_t* bytes,
222 int file_descriptor,
223 io_uring_scheduler* scheduler,
224 std::shared_ptr<void> impl,
225 detail::speculative_state* spec,
226 buffer_param buffers,
227 std::stop_token const& token) noexcept
228 {
229 16868x h = handle;
230 16868x ex = executor;
231 16868x ec_out = ec;
232 16868x bytes_out = bytes;
233 16868x fd = file_descriptor;
234 16868x sched_ = scheduler;
235 16868x impl_ptr = std::move(impl);
236 16868x spec_state = spec;
237 16868x res = 0;
238 16868x cqe_flags = 0;
239 16868x iovec_count = static_cast<int>(
240 16868x buffers.copy_to(
241 16868x reinterpret_cast<capy::mutable_buffer*>(iovecs),
242 io_uring_max_iov));
243 16868x empty_buffer = (iovec_count == 0);
244 16868x if (!empty_buffer)
245 {
246 16868x msg = {};
247 16868x msg.msg_iov = iovecs;
248 16868x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);
249 }
250 16868x start(token);
251 16868x }
252
253 static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
254 {
255 auto* self = static_cast<uring_write_op*>(base);
256 // Single-buffer fast path: IORING_OP_SEND with MSG_NOSIGNAL
257 // skips the msghdr indirection that IORING_OP_SENDMSG pays.
258 // For multi-iovec scatter writes, fall back to sendmsg.
259 if (self->iovec_count == 1)
260 {
261 ::io_uring_prep_send(
262 sqe, self->fd,
263 self->iovecs[0].iov_base,
264 self->iovecs[0].iov_len,
265 MSG_NOSIGNAL);
266 }
267 else
268 {
269 ::io_uring_prep_sendmsg(
270 sqe, self->fd, &self->msg, MSG_NOSIGNAL);
271 }
272 }
273
274 static void do_cqe(
275 io_uring_op* base, int res, unsigned flags,
276 op_queue& local) noexcept
277 {
278 auto* self = static_cast<uring_write_op*>(base);
279 self->res = res;
280 self->cqe_flags = flags;
281 local.push(self);
282 }
283
284 16868x static void do_handler(
285 void* owner, scheduler_op* base,
286 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
287 {
288 16868x auto* self = static_cast<uring_write_op*>(base);
289 16868x self->stop_cb.reset();
290
291 16868x if (owner == nullptr)
292 {
293 auto suicide = std::move(self->impl_ptr);
294 return;
295 }
296
297 16868x uring_set_result(self, false, self->empty_buffer);
298
299 16868x if (self->res > 0 && self->spec_state)
300 {
301 // Kernel signalled readiness — restore speculation.
302 16868x self->spec_state->on_async_write_ready();
303 }
304
305 16868x if (self->bytes_out)
306 16868x *self->bytes_out =
307 16868x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
308
309 16868x self->cont_op.cont.h = self->h;
310 16868x auto next = dispatch_coro(self->ex, self->cont_op.cont);
311 16868x auto suicide = std::move(self->impl_ptr);
312 16868x next.resume();
313 16868x }
314 };
315
316 /** Non-blocking connect via `IORING_OP_CONNECT`.
317
318 Negative `res` is the connect error; zero means success.
319 `remote_endpoint_out` is written only on success so a failed
320 connect does not corrupt the socket's cached remote endpoint.
321 */
322 struct uring_connect_op : io_uring_op
323 {
324 sockaddr_storage addr{};
325 socklen_t addrlen = 0;
326 int fd = -1;
327 endpoint target_endpoint{};
328 endpoint* remote_endpoint_out = nullptr;
329 endpoint* local_endpoint_out = nullptr;
330
331 11955x uring_connect_op() noexcept
332 11955x : io_uring_op(&do_handler, &do_cqe, &do_prep)
333 11955x {}
334
335 /** Reset and initialize for a new submission.
336
337 The caller must fill `addr` and `addrlen` before calling this
338 (typically via `to_sockaddr(ep, family, conn_.addr)` which
339 returns the addrlen) — `to_sockaddr` is the family-aware
340 helper and requires the socket family which is known to the
341 caller, not the op.
342 */
343 3962x void prepare(
344 std::coroutine_handle<> handle,
345 capy::executor_ref executor,
346 std::error_code* ec,
347 int file_descriptor,
348 io_uring_scheduler* scheduler,
349 std::shared_ptr<void> impl,
350 endpoint target,
351 endpoint* remote_out,
352 endpoint* local_out,
353 std::stop_token const& token) noexcept
354 {
355 3962x h = handle;
356 3962x ex = executor;
357 3962x ec_out = ec;
358 3962x bytes_out = nullptr;
359 3962x fd = file_descriptor;
360 3962x sched_ = scheduler;
361 3962x impl_ptr = std::move(impl);
362 3962x res = 0;
363 3962x cqe_flags = 0;
364 3962x target_endpoint = target;
365 3962x remote_endpoint_out = remote_out;
366 3962x local_endpoint_out = local_out;
367 // addr / addrlen are pre-filled by the caller.
368 3962x start(token);
369 3962x }
370
371 3962x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
372 {
373 3962x auto* self = static_cast<uring_connect_op*>(base);
374 3962x ::io_uring_prep_connect(
375 sqe, self->fd,
376 3962x reinterpret_cast<sockaddr const*>(&self->addr),
377 self->addrlen);
378 3962x }
379
380 3962x static void do_cqe(
381 io_uring_op* base, int res, unsigned flags,
382 op_queue& local) noexcept
383 {
384 3962x auto* self = static_cast<uring_connect_op*>(base);
385 3962x self->res = res;
386 3962x self->cqe_flags = flags;
387 3962x local.push(self);
388 3962x }
389
390 3962x static void do_handler(
391 void* owner, scheduler_op* base,
392 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
393 {
394 3962x auto* self = static_cast<uring_connect_op*>(base);
395 3962x self->stop_cb.reset();
396
397 3962x if (owner == nullptr)
398 {
399 auto suicide = std::move(self->impl_ptr);
400 return;
401 }
402
403 3962x uring_set_result(self, false, false);
404
405 // Write endpoints only on success.
406 3962x if (self->res >= 0)
407 {
408 3954x if (self->remote_endpoint_out)
409 3954x *self->remote_endpoint_out = self->target_endpoint;
410 3954x if (self->local_endpoint_out && self->fd >= 0)
411 {
412 3954x sockaddr_storage local{};
413 3954x socklen_t len = sizeof(local);
414 3954x if (::getsockname(self->fd,
415 3954x reinterpret_cast<sockaddr*>(&local), &len) == 0)
416 3954x *self->local_endpoint_out = sockaddr_to_endpoint(local);
417 }
418 }
419
420 3962x self->cont_op.cont.h = self->h;
421 3962x auto next = dispatch_coro(self->ex, self->cont_op.cont);
422 3962x auto suicide = std::move(self->impl_ptr);
423 3962x next.resume();
424 3962x }
425 };
426
427 /** Submit an `io_uring_op` whose `prep_func` is set.
428
429 Acquires the ring mutex, prepares the SQE, and (under the same
430 mutex) CAS-sets `submit_op_posted_`. The first submitter of a
431 batch wins the CAS and posts the scheduler's `submit_sqes_op`,
432 which later flushes all queued SQEs in a single
433 `io_uring_submit_and_get_events` call and drains any ready CQEs.
434 Subsequent submitters in the same batch piggyback — their SQEs
435 sit in the user-space SQ ring until that op dispatches.
436
437 On SQ-ring exhaustion (after one flush retry), surfaces `EAGAIN`
438 on `*op->ec_out` and queues the op as completed so its handler
439 dispatches on the next `do_one` cycle.
440
441 @pre `op->prep_func != nullptr`.
442
443 @par Exception Safety
444 Nothrow.
445 */
446 inline void
447 4293x io_uring_submit_op(io_uring_scheduler& sched, io_uring_op* op) noexcept
448 {
449 4293x sched.lazy_init_ring();
450
451 4293x bool need_post = false;
452 {
453 4293x typename io_uring_scheduler::lock_type ring_lock(sched.ring_mutex());
454
455 4293x ::io_uring_sqe* sqe = ::io_uring_get_sqe(sched.ring());
456 4293x if (!sqe)
457 {
458 // SQ ring full — flush to kernel and retry once.
459 ::io_uring_submit(sched.ring());
460 sqe = ::io_uring_get_sqe(sched.ring());
461 }
462
463 4293x if (!sqe)
464 {
465 // SQ stayed full after one flush — synchronous failure path.
466 // Surface EAGAIN and queue the op as completed so do_one
467 // dispatches the handler. The caller's work_started() already
468 // counted this op. (CAS path is not entered here.)
469 if (op->ec_out)
470 *op->ec_out = make_err(EAGAIN);
471 typename io_uring_scheduler::lock_type lock(sched.dispatch_mutex());
472 sched.push_completed_locked(op);
473 return;
474 }
475
476 4293x op->prep_func(op, sqe);
477 4293x ::io_uring_sqe_set_data(sqe, op);
478 // Count this op against the in-flight gate in do_one: it
479 // expects exactly one F_MORE-less CQE per submitted SQE
480 // (multishot ops decrement only on the terminal CQE).
481 4293x sched.inflight_inc();
482 // Release pairs with the acquire in io_uring_op::request_cancel:
483 // a stop_token firing after we release the mutex will see
484 // sqe_set==true and submit a cancel-by-user_data SQE.
485 4293x op->sqe_set.store(true, std::memory_order_release);
486
487 // First submitter in a batch wins the CAS and will post
488 // submit_sqes_op; others piggyback on the same flush.
489 4293x if (!sched.submit_op_posted_exchange(true))
490 114x need_post = true;
491 4293x }
492
493 4293x if (need_post)
494 {
495 // Flush is deferred to submit_sqes_op; post() owns the wake.
496 114x sched.post(&sched.submit_op_ref());
497 }
498 }
499
500 /** Readiness wait via `IORING_OP_POLL_ADD`.
501
502 Used to implement the `wait()` virtual for socket and acceptor
503 implementations. The op submits a one-shot poll on `fd` for the
504 requested set of poll flags (POLLIN / POLLOUT / POLLPRI|POLLERR|
505 POLLHUP) and reports completion without transferring any data.
506
507 The CQE's `res` carries the actual revents, but we surface only
508 success/cancel/error on `*ec_out` — callers of `wait()` just need
509 a readiness signal, not the specific event mask.
510 */
511 struct uring_wait_op : io_uring_op
512 {
513 int fd = -1;
514 int poll_flags = 0;
515
516 12179x uring_wait_op() noexcept
517 12179x : io_uring_op(&do_handler, &do_cqe, &do_prep)
518 12179x {}
519
520 /** Reset and initialize for a new submission. */
521 13x void prepare(
522 std::coroutine_handle<> handle,
523 capy::executor_ref executor,
524 std::error_code* ec,
525 int file_descriptor,
526 io_uring_scheduler* scheduler,
527 std::shared_ptr<void> impl,
528 int flags,
529 std::stop_token const& token) noexcept
530 {
531 13x h = handle;
532 13x ex = executor;
533 13x ec_out = ec;
534 13x bytes_out = nullptr;
535 13x fd = file_descriptor;
536 13x sched_ = scheduler;
537 13x impl_ptr = std::move(impl);
538 13x poll_flags = flags;
539 13x res = 0;
540 13x cqe_flags = 0;
541 13x start(token);
542 13x }
543
544 13x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
545 {
546 13x auto* self = static_cast<uring_wait_op*>(base);
547 13x ::io_uring_prep_poll_add(sqe, self->fd, self->poll_flags);
548 13x }
549
550 13x static void do_cqe(
551 io_uring_op* base, int res, unsigned flags,
552 op_queue& local) noexcept
553 {
554 13x auto* self = static_cast<uring_wait_op*>(base);
555 13x self->res = res;
556 13x self->cqe_flags = flags;
557 13x local.push(self);
558 13x }
559
560 13x static void do_handler(
561 void* owner, scheduler_op* base,
562 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
563 {
564 13x auto* self = static_cast<uring_wait_op*>(base);
565 13x self->stop_cb.reset();
566
567 13x if (owner == nullptr)
568 {
569 // Shutdown drain: break the impl_ptr cycle.
570 auto suicide = std::move(self->impl_ptr);
571 return;
572 }
573
574 13x if (self->ec_out)
575 {
576 13x if (self->cancelled.load(std::memory_order_acquire))
577 *self->ec_out = capy::error::canceled;
578 13x else if (self->res < 0)
579 2x *self->ec_out = make_err(-self->res);
580 else
581 11x *self->ec_out = {};
582 }
583
584 13x self->cont_op.cont.h = self->h;
585 13x auto next = dispatch_coro(self->ex, self->cont_op.cont);
586 13x auto suicide = std::move(self->impl_ptr);
587 13x next.resume();
588 13x }
589 };
590
591 /** Non-blocking connect for Unix domain sockets via `IORING_OP_CONNECT`.
592
593 Like `uring_connect_op` but stores `local_endpoint` for the target
594 and out-pointers, since `sockaddr_to_local_endpoint` returns
595 `local_endpoint`, not `endpoint`.
596 */
597 struct uring_local_connect_op : io_uring_op
598 {
599 sockaddr_storage addr{};
600 socklen_t addrlen = 0;
601 int fd = -1;
602 corosio::local_endpoint target_endpoint{};
603 corosio::local_endpoint* remote_endpoint_out = nullptr;
604 corosio::local_endpoint* local_endpoint_out = nullptr;
605
606 111x uring_local_connect_op() noexcept
607 111x : io_uring_op(&do_handler, &do_cqe, &do_prep)
608 111x {}
609
610 /** Reset and initialize for a new submission.
611
612 Caller pre-fills `addr` and `addrlen` (see uring_connect_op::prepare).
613 */
614 14x void prepare(
615 std::coroutine_handle<> handle,
616 capy::executor_ref executor,
617 std::error_code* ec,
618 int file_descriptor,
619 io_uring_scheduler* scheduler,
620 std::shared_ptr<void> impl,
621 corosio::local_endpoint target,
622 corosio::local_endpoint* remote_out,
623 corosio::local_endpoint* local_out,
624 std::stop_token const& token) noexcept
625 {
626 14x h = handle;
627 14x ex = executor;
628 14x ec_out = ec;
629 14x bytes_out = nullptr;
630 14x fd = file_descriptor;
631 14x sched_ = scheduler;
632 14x impl_ptr = std::move(impl);
633 14x res = 0;
634 14x cqe_flags = 0;
635 14x target_endpoint = target;
636 14x remote_endpoint_out = remote_out;
637 14x local_endpoint_out = local_out;
638 14x start(token);
639 14x }
640
641 14x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
642 {
643 14x auto* self = static_cast<uring_local_connect_op*>(base);
644 14x ::io_uring_prep_connect(
645 sqe, self->fd,
646 14x reinterpret_cast<sockaddr const*>(&self->addr),
647 self->addrlen);
648 14x }
649
650 14x static void do_cqe(
651 io_uring_op* base, int res, unsigned flags,
652 op_queue& local) noexcept
653 {
654 14x auto* self = static_cast<uring_local_connect_op*>(base);
655 14x self->res = res;
656 14x self->cqe_flags = flags;
657 14x local.push(self);
658 14x }
659
660 14x static void do_handler(
661 void* owner, scheduler_op* base,
662 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
663 {
664 14x auto* self = static_cast<uring_local_connect_op*>(base);
665 14x self->stop_cb.reset();
666
667 14x if (owner == nullptr)
668 {
669 auto suicide = std::move(self->impl_ptr);
670 return;
671 }
672
673 14x uring_set_result(self, false, false);
674
675 // Write endpoints only on success.
676 14x if (self->res >= 0)
677 {
678 11x if (self->remote_endpoint_out)
679 11x *self->remote_endpoint_out = self->target_endpoint;
680 11x if (self->local_endpoint_out && self->fd >= 0)
681 {
682 11x sockaddr_storage local{};
683 11x socklen_t len = sizeof(local);
684 11x if (::getsockname(self->fd,
685 11x reinterpret_cast<sockaddr*>(&local), &len) == 0)
686 11x *self->local_endpoint_out =
687 11x sockaddr_to_local_endpoint(local, len);
688 }
689 }
690
691 14x self->cont_op.cont.h = self->h;
692 14x auto next = dispatch_coro(self->ex, self->cont_op.cont);
693 14x auto suicide = std::move(self->impl_ptr);
694 14x next.resume();
695 14x }
696 };
697
698 } // namespace boost::corosio::detail
699
700 #endif // BOOST_COROSIO_HAS_IO_URING
701
702 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
703