include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp

88.7% Lines (228/257) 92.6% List of functions (25/27)
io_uring_socket_ops.hpp
f(x) Functions (27)
Function Calls Lines Blocks
boost::corosio::detail::uring_set_result(boost::corosio::detail::io_uring_op*, bool, bool) :60 27179x 100.0% 100.0% boost::corosio::detail::uring_read_op::uring_read_op() :84 13269x 100.0% 100.0% boost::corosio::detail::uring_read_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :98 11411x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :128 212x 71.4% 75.0% boost::corosio::detail::uring_read_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :150 212x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :160 11411x 92.3% 92.0% boost::corosio::detail::uring_write_op::uring_write_op() :201 13269x 100.0% 100.0% boost::corosio::detail::uring_write_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :206 11201x 100.0% 100.0% boost::corosio::detail::uring_write_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :242 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :263 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :273 11201x 92.3% 92.0% boost::corosio::detail::uring_connect_op::uring_connect_op() :315 13289x 100.0% 100.0% 
boost::corosio::detail::uring_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::endpoint, boost::corosio::endpoint*, boost::corosio::endpoint*, std::stop_token const&) :327 4386x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :355 4386x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :364 4386x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :374 4386x 94.1% 94.0% boost::corosio::detail::io_uring_submit_op(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_op*) :426 4980x 66.7% 63.0% boost::corosio::detail::uring_wait_op::uring_wait_op() :495 13586x 100.0% 100.0% boost::corosio::detail::uring_wait_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, int, std::stop_token const&) :500 21x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :523 17x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :529 17x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :539 21x 90.0% 92.0% boost::corosio::detail::uring_local_connect_op::uring_local_connect_op() :576 149x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::prepare(std::__n4861::coroutine_handle<void>, 
boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::local_endpoint, boost::corosio::local_endpoint*, boost::corosio::local_endpoint*, std::stop_token const&) :584 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :611 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :620 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :630 14x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/capy/buffers.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/corosio/detail/buffer_param.hpp>
22 #include <boost/corosio/detail/dispatch_coro.hpp>
23 #include <boost/corosio/local_endpoint.hpp>
24 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
25 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
26 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
27 #include <boost/corosio/native/detail/coro_op_complete.hpp>
28 #include <boost/corosio/native/detail/make_err.hpp>
29 #include <boost/corosio/native/detail/speculative_state.hpp>
30
31 #include <system_error>
32
33 #include <netinet/in.h>
34 #include <poll.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37
38 namespace boost::corosio::detail {
39
/// Upper bound on scatter/gather segments carried by a single
/// read/write/dgram op.
///
/// Deliberately far smaller than `IOV_MAX` (1024 on Linux): each op
/// embeds an `iovec[io_uring_max_iov]` array directly in the
/// io_uring_op object, so the array shares the op's allocation.
/// Sixteen segments handle the usual fragmented buffer_sequence
/// cases without inflating per-op memory. Plan 4's registered-buffer
/// work is expected to revisit this limit.
inline constexpr std::size_t io_uring_max_iov{16};
49
50 /** Resolve ec_out/bytes_out from a CQE result for a completed I/O op.
51
52 Shared by read, write, and connect handlers. For reads, `res == 0`
53 with a non-empty buffer means the peer closed the connection (EOF).
54
55 @param self The completed op.
56 @param is_read True if this is a receive/read operation.
57 @param empty_buf True if the submitted buffer was zero-length.
58 */
59 inline void
60 27179x uring_set_result(io_uring_op* self, bool is_read, bool empty_buf) noexcept
61 {
62 54244x decode_io_result(
63 self->ec_out,
64 27179x self->cancelled.load(std::memory_order_acquire),
65 114x self->res < 0 ? make_err(-self->res) : std::error_code{},
66 is_read,
67 27179x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u,
68 empty_buf);
69 27179x }
70
71 /** Scatter-gather read via `IORING_OP_READV`.
72
73 @par Handler dispatch
74 do_cqe captures `res`/`cqe_flags` and queues self into `local`;
75 do_handler runs from the scheduler queue and resumes the coroutine.
76 */
77 struct uring_read_op : io_uring_op
78 {
79 iovec iovecs[io_uring_max_iov];
80 int iovec_count = 0;
81 int fd = -1;
82 detail::speculative_state* spec_state = nullptr;
83
84 13269x uring_read_op() noexcept
85 13269x : io_uring_op(&do_handler, &do_cqe, &do_prep)
86 {
87 13269x is_read = true;
88 13269x }
89
90 /** Reset and initialize for a new submission.
91
92 Embedded ops are reused across calls; every mutable field the
93 handler may read must be re-initialized here. `start(token)`
94 also resets `cancelled`, `sqe_set`, and `stop_cb`.
95
96 @pre This slot has no in-flight op (its prior op completed).
97 */
98 11411x void prepare(
99 std::coroutine_handle<> handle,
100 capy::executor_ref executor,
101 std::error_code* ec,
102 std::size_t* bytes,
103 int file_descriptor,
104 io_uring_scheduler* scheduler,
105 std::shared_ptr<void> impl,
106 detail::speculative_state* spec,
107 buffer_param buffers,
108 std::stop_token const& token) noexcept
109 {
110 11411x h = handle;
111 11411x ex = executor;
112 11411x ec_out = ec;
113 11411x bytes_out = bytes;
114 11411x fd = file_descriptor;
115 11411x sched_ = scheduler;
116 11411x impl_ptr = std::move(impl);
117 11411x spec_state = spec;
118 11411x res = 0;
119 11411x cqe_flags = 0;
120 11411x iovec_count = static_cast<int>(
121 11411x buffers.copy_to(
122 11411x reinterpret_cast<capy::mutable_buffer*>(iovecs),
123 io_uring_max_iov));
124 11411x empty_buffer = (iovec_count == 0);
125 11411x start(token);
126 11411x }
127
128 212x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
129 {
130 212x auto* self = static_cast<uring_read_op*>(base);
131 // Single-buffer fast path: IORING_OP_RECV with a flat
132 // (buffer, length) skips the iovec-array indirection that
133 // IORING_OP_READV pays. For multi-iovec scatter reads, fall
134 // back to readv.
135 212x if (self->iovec_count == 1)
136 {
137 212x ::io_uring_prep_recv(
138 sqe, self->fd,
139 self->iovecs[0].iov_base,
140 self->iovecs[0].iov_len,
141 0);
142 }
143 else
144 {
145 ::io_uring_prep_readv(
146 sqe, self->fd, self->iovecs, self->iovec_count, 0);
147 }
148 212x }
149
150 212x static void do_cqe(
151 io_uring_op* base, int res, unsigned flags,
152 op_queue& local) noexcept
153 {
154 212x auto* self = static_cast<uring_read_op*>(base);
155 212x self->res = res;
156 212x self->cqe_flags = flags;
157 212x local.push(self);
158 212x }
159
160 11411x static void do_handler(
161 void* owner, scheduler_op* base,
162 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
163 {
164 11411x auto* self = static_cast<uring_read_op*>(base);
165 11411x if (coro_drain_if_shutdown(owner, self))
166 return;
167
168 11411x if (self->sched_)
169 11411x self->sched_->reset_inline_budget();
170
171 11411x uring_set_result(self, true, self->empty_buffer);
172
173 11411x if (self->res > 0 && self->spec_state)
174 {
175 // Kernel signalled readiness — restore speculation.
176 11305x self->spec_state->on_async_read_ready();
177 }
178
179 11411x if (self->bytes_out)
180 11411x *self->bytes_out =
181 11411x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
182
183 11411x coro_resume(self);
184 // suicide drops here; may destroy impl + self.
185 }
186 };
187
188 /** Scatter-gather write via `IORING_OP_SENDMSG` with `MSG_NOSIGNAL`.
189
190 `MSG_NOSIGNAL` prevents `SIGPIPE` when the peer has closed the
191 connection; the error is surfaced as `EPIPE` instead.
192 */
193 struct uring_write_op : io_uring_op
194 {
195 iovec iovecs[io_uring_max_iov];
196 int iovec_count = 0;
197 int fd = -1;
198 msghdr msg{};
199 detail::speculative_state* spec_state = nullptr;
200
201 13269x uring_write_op() noexcept
202 13269x : io_uring_op(&do_handler, &do_cqe, &do_prep)
203 13269x {}
204
205 /** Reset and initialize for a new submission. See uring_read_op::prepare. */
206 11201x void prepare(
207 std::coroutine_handle<> handle,
208 capy::executor_ref executor,
209 std::error_code* ec,
210 std::size_t* bytes,
211 int file_descriptor,
212 io_uring_scheduler* scheduler,
213 std::shared_ptr<void> impl,
214 detail::speculative_state* spec,
215 buffer_param buffers,
216 std::stop_token const& token) noexcept
217 {
218 11201x h = handle;
219 11201x ex = executor;
220 11201x ec_out = ec;
221 11201x bytes_out = bytes;
222 11201x fd = file_descriptor;
223 11201x sched_ = scheduler;
224 11201x impl_ptr = std::move(impl);
225 11201x spec_state = spec;
226 11201x res = 0;
227 11201x cqe_flags = 0;
228 11201x iovec_count = static_cast<int>(
229 11201x buffers.copy_to(
230 11201x reinterpret_cast<capy::mutable_buffer*>(iovecs),
231 io_uring_max_iov));
232 11201x empty_buffer = (iovec_count == 0);
233 11201x if (!empty_buffer)
234 {
235 11201x msg = {};
236 11201x msg.msg_iov = iovecs;
237 11201x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);
238 }
239 11201x start(token);
240 11201x }
241
242 static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
243 {
244 auto* self = static_cast<uring_write_op*>(base);
245 // Single-buffer fast path: IORING_OP_SEND with MSG_NOSIGNAL
246 // skips the msghdr indirection that IORING_OP_SENDMSG pays.
247 // For multi-iovec scatter writes, fall back to sendmsg.
248 if (self->iovec_count == 1)
249 {
250 ::io_uring_prep_send(
251 sqe, self->fd,
252 self->iovecs[0].iov_base,
253 self->iovecs[0].iov_len,
254 MSG_NOSIGNAL);
255 }
256 else
257 {
258 ::io_uring_prep_sendmsg(
259 sqe, self->fd, &self->msg, MSG_NOSIGNAL);
260 }
261 }
262
263 static void do_cqe(
264 io_uring_op* base, int res, unsigned flags,
265 op_queue& local) noexcept
266 {
267 auto* self = static_cast<uring_write_op*>(base);
268 self->res = res;
269 self->cqe_flags = flags;
270 local.push(self);
271 }
272
273 11201x static void do_handler(
274 void* owner, scheduler_op* base,
275 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
276 {
277 11201x auto* self = static_cast<uring_write_op*>(base);
278 11201x if (coro_drain_if_shutdown(owner, self))
279 return;
280
281 11201x if (self->sched_)
282 11201x self->sched_->reset_inline_budget();
283
284 11201x uring_set_result(self, false, self->empty_buffer);
285
286 11201x if (self->res > 0 && self->spec_state)
287 {
288 // Kernel signalled readiness — restore speculation.
289 11201x self->spec_state->on_async_write_ready();
290 }
291
292 11201x if (self->bytes_out)
293 11201x *self->bytes_out =
294 11201x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
295
296 11201x coro_resume(self);
297 }
298 };
299
300 /** Non-blocking connect via `IORING_OP_CONNECT`.
301
302 Negative `res` is the connect error; zero means success.
303 `remote_endpoint_out` is written only on success so a failed
304 connect does not corrupt the socket's cached remote endpoint.
305 */
306 struct uring_connect_op : io_uring_op
307 {
308 sockaddr_storage addr{};
309 socklen_t addrlen = 0;
310 int fd = -1;
311 endpoint target_endpoint{};
312 endpoint* remote_endpoint_out = nullptr;
313 endpoint* local_endpoint_out = nullptr;
314
315 13289x uring_connect_op() noexcept
316 13289x : io_uring_op(&do_handler, &do_cqe, &do_prep)
317 13289x {}
318
319 /** Reset and initialize for a new submission.
320
321 The caller must fill `addr` and `addrlen` before calling this
322 (typically via `to_sockaddr(ep, family, conn_.addr)` which
323 returns the addrlen) — `to_sockaddr` is the family-aware
324 helper and requires the socket family which is known to the
325 caller, not the op.
326 */
327 4386x void prepare(
328 std::coroutine_handle<> handle,
329 capy::executor_ref executor,
330 std::error_code* ec,
331 int file_descriptor,
332 io_uring_scheduler* scheduler,
333 std::shared_ptr<void> impl,
334 endpoint target,
335 endpoint* remote_out,
336 endpoint* local_out,
337 std::stop_token const& token) noexcept
338 {
339 4386x h = handle;
340 4386x ex = executor;
341 4386x ec_out = ec;
342 4386x bytes_out = nullptr;
343 4386x fd = file_descriptor;
344 4386x sched_ = scheduler;
345 4386x impl_ptr = std::move(impl);
346 4386x res = 0;
347 4386x cqe_flags = 0;
348 4386x target_endpoint = target;
349 4386x remote_endpoint_out = remote_out;
350 4386x local_endpoint_out = local_out;
351 // addr / addrlen are pre-filled by the caller.
352 4386x start(token);
353 4386x }
354
355 4386x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
356 {
357 4386x auto* self = static_cast<uring_connect_op*>(base);
358 4386x ::io_uring_prep_connect(
359 sqe, self->fd,
360 4386x reinterpret_cast<sockaddr const*>(&self->addr),
361 self->addrlen);
362 4386x }
363
364 4386x static void do_cqe(
365 io_uring_op* base, int res, unsigned flags,
366 op_queue& local) noexcept
367 {
368 4386x auto* self = static_cast<uring_connect_op*>(base);
369 4386x self->res = res;
370 4386x self->cqe_flags = flags;
371 4386x local.push(self);
372 4386x }
373
374 4386x static void do_handler(
375 void* owner, scheduler_op* base,
376 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
377 {
378 4386x auto* self = static_cast<uring_connect_op*>(base);
379 4386x if (coro_drain_if_shutdown(owner, self))
380 return;
381
382 4386x if (self->sched_)
383 4386x self->sched_->reset_inline_budget();
384
385 4386x uring_set_result(self, false, false);
386
387 // Write endpoints only on success.
388 4386x if (self->res >= 0)
389 {
390 4378x if (self->remote_endpoint_out)
391 4378x *self->remote_endpoint_out = self->target_endpoint;
392 4378x if (self->local_endpoint_out && self->fd >= 0)
393 {
394 4378x sockaddr_storage local{};
395 4378x socklen_t len = sizeof(local);
396 4378x if (::getsockname(self->fd,
397 4378x reinterpret_cast<sockaddr*>(&local), &len) == 0)
398 4378x *self->local_endpoint_out = sockaddr_to_endpoint(local);
399 }
400 }
401
402 4386x coro_resume(self);
403 }
404 };
405
406 /** Submit an `io_uring_op` whose `prep_func` is set.
407
408 Acquires the ring mutex, prepares the SQE, and (under the same
409 mutex) CAS-sets `submit_op_posted_`. The first submitter of a
410 batch wins the CAS and posts the scheduler's `submit_sqes_op`,
411 which later flushes all queued SQEs in a single
412 `io_uring_submit_and_get_events` call and drains any ready CQEs.
413 Subsequent submitters in the same batch piggyback — their SQEs
414 sit in the user-space SQ ring until that op dispatches.
415
416 On SQ-ring exhaustion (after one flush retry), surfaces `EAGAIN`
417 on `*op->ec_out` and queues the op as completed so its handler
418 dispatches on the next `do_one` cycle.
419
420 @pre `op->prep_func != nullptr`.
421
422 @par Exception Safety
423 Nothrow.
424 */
425 inline void
426 4980x io_uring_submit_op(io_uring_scheduler& sched, io_uring_op* op) noexcept
427 {
428 4980x sched.lazy_init_ring();
429
430 4980x bool need_post = false;
431 {
432 4980x typename io_uring_scheduler::lock_type ring_lock(sched.ring_mutex());
433
434 4980x ::io_uring_sqe* sqe = ::io_uring_get_sqe(sched.ring());
435 4980x if (!sqe)
436 {
437 // SQ ring full — flush to kernel and retry once.
438 ::io_uring_submit(sched.ring());
439 sqe = ::io_uring_get_sqe(sched.ring());
440 }
441
442 4980x if (!sqe)
443 {
444 // SQ stayed full after one flush — synchronous failure path.
445 // Surface EAGAIN and queue the op as completed so do_one
446 // dispatches the handler. The caller's work_started() already
447 // counted this op. (CAS path is not entered here.)
448 if (op->ec_out)
449 *op->ec_out = make_err(EAGAIN);
450 typename io_uring_scheduler::lock_type lock(sched.dispatch_mutex());
451 sched.push_completed_locked(op);
452 return;
453 }
454
455 4980x op->prep_func(op, sqe);
456 4980x ::io_uring_sqe_set_data(sqe, op);
457 // Count this op against the in-flight gate in do_one: it
458 // expects exactly one F_MORE-less CQE per submitted SQE
459 // (multishot ops decrement only on the terminal CQE).
460 4980x sched.inflight_inc();
461 // Release pairs with the acquire in io_uring_op::request_cancel:
462 // a stop_token firing after we release the mutex will see
463 // sqe_set==true and submit a cancel-by-user_data SQE.
464 4980x op->sqe_set.store(true, std::memory_order_release);
465
466 // First submitter in a batch wins the CAS and will post
467 // submit_sqes_op; others piggyback on the same flush.
468 4980x if (!sched.submit_op_posted_exchange(true))
469 182x need_post = true;
470 4980x }
471
472 4980x if (need_post)
473 {
474 // Flush is deferred to submit_sqes_op; post() owns the wake.
475 182x sched.post(&sched.submit_op_ref());
476 }
477 }
478
479 /** Readiness wait via `IORING_OP_POLL_ADD`.
480
481 Used to implement the `wait()` virtual for socket and acceptor
482 implementations. The op submits a one-shot poll on `fd` for the
483 requested set of poll flags (POLLIN / POLLOUT / POLLPRI|POLLERR|
484 POLLHUP) and reports completion without transferring any data.
485
486 The CQE's `res` carries the actual revents, but we surface only
487 success/cancel/error on `*ec_out` — callers of `wait()` just need
488 a readiness signal, not the specific event mask.
489 */
490 struct uring_wait_op : io_uring_op
491 {
492 int fd = -1;
493 int poll_flags = 0;
494
495 13586x uring_wait_op() noexcept
496 13586x : io_uring_op(&do_handler, &do_cqe, &do_prep)
497 13586x {}
498
499 /** Reset and initialize for a new submission. */
500 21x void prepare(
501 std::coroutine_handle<> handle,
502 capy::executor_ref executor,
503 std::error_code* ec,
504 int file_descriptor,
505 io_uring_scheduler* scheduler,
506 std::shared_ptr<void> impl,
507 int flags,
508 std::stop_token const& token) noexcept
509 {
510 21x h = handle;
511 21x ex = executor;
512 21x ec_out = ec;
513 21x bytes_out = nullptr;
514 21x fd = file_descriptor;
515 21x sched_ = scheduler;
516 21x impl_ptr = std::move(impl);
517 21x poll_flags = flags;
518 21x res = 0;
519 21x cqe_flags = 0;
520 21x start(token);
521 21x }
522
523 17x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
524 {
525 17x auto* self = static_cast<uring_wait_op*>(base);
526 17x ::io_uring_prep_poll_add(sqe, self->fd, self->poll_flags);
527 17x }
528
529 17x static void do_cqe(
530 io_uring_op* base, int res, unsigned flags,
531 op_queue& local) noexcept
532 {
533 17x auto* self = static_cast<uring_wait_op*>(base);
534 17x self->res = res;
535 17x self->cqe_flags = flags;
536 17x local.push(self);
537 17x }
538
539 21x static void do_handler(
540 void* owner, scheduler_op* base,
541 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
542 {
543 21x auto* self = static_cast<uring_wait_op*>(base);
544 21x if (coro_drain_if_shutdown(owner, self))
545 return;
546
547 21x if (self->sched_)
548 21x self->sched_->reset_inline_budget();
549
550 // Wait reports only success/cancel/error — no bytes, no EOF.
551 36x decode_io_result(
552 self->ec_out,
553 21x self->cancelled.load(std::memory_order_acquire),
554 21x self->res < 0 ? make_err(-self->res) : std::error_code{},
555 /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
556
557 21x coro_resume(self);
558 }
559 };
560
561 /** Non-blocking connect for Unix domain sockets via `IORING_OP_CONNECT`.
562
563 Like `uring_connect_op` but stores `local_endpoint` for the target
564 and out-pointers, since `sockaddr_to_local_endpoint` returns
565 `local_endpoint`, not `endpoint`.
566 */
567 struct uring_local_connect_op : io_uring_op
568 {
569 sockaddr_storage addr{};
570 socklen_t addrlen = 0;
571 int fd = -1;
572 corosio::local_endpoint target_endpoint{};
573 corosio::local_endpoint* remote_endpoint_out = nullptr;
574 corosio::local_endpoint* local_endpoint_out = nullptr;
575
576 149x uring_local_connect_op() noexcept
577 149x : io_uring_op(&do_handler, &do_cqe, &do_prep)
578 149x {}
579
580 /** Reset and initialize for a new submission.
581
582 Caller pre-fills `addr` and `addrlen` (see uring_connect_op::prepare).
583 */
584 14x void prepare(
585 std::coroutine_handle<> handle,
586 capy::executor_ref executor,
587 std::error_code* ec,
588 int file_descriptor,
589 io_uring_scheduler* scheduler,
590 std::shared_ptr<void> impl,
591 corosio::local_endpoint target,
592 corosio::local_endpoint* remote_out,
593 corosio::local_endpoint* local_out,
594 std::stop_token const& token) noexcept
595 {
596 14x h = handle;
597 14x ex = executor;
598 14x ec_out = ec;
599 14x bytes_out = nullptr;
600 14x fd = file_descriptor;
601 14x sched_ = scheduler;
602 14x impl_ptr = std::move(impl);
603 14x res = 0;
604 14x cqe_flags = 0;
605 14x target_endpoint = target;
606 14x remote_endpoint_out = remote_out;
607 14x local_endpoint_out = local_out;
608 14x start(token);
609 14x }
610
611 14x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
612 {
613 14x auto* self = static_cast<uring_local_connect_op*>(base);
614 14x ::io_uring_prep_connect(
615 sqe, self->fd,
616 14x reinterpret_cast<sockaddr const*>(&self->addr),
617 self->addrlen);
618 14x }
619
620 14x static void do_cqe(
621 io_uring_op* base, int res, unsigned flags,
622 op_queue& local) noexcept
623 {
624 14x auto* self = static_cast<uring_local_connect_op*>(base);
625 14x self->res = res;
626 14x self->cqe_flags = flags;
627 14x local.push(self);
628 14x }
629
630 14x static void do_handler(
631 void* owner, scheduler_op* base,
632 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
633 {
634 14x auto* self = static_cast<uring_local_connect_op*>(base);
635 14x if (coro_drain_if_shutdown(owner, self))
636 return;
637
638 14x if (self->sched_)
639 14x self->sched_->reset_inline_budget();
640
641 14x uring_set_result(self, false, false);
642
643 // Write endpoints only on success.
644 14x if (self->res >= 0)
645 {
646 11x if (self->remote_endpoint_out)
647 11x *self->remote_endpoint_out = self->target_endpoint;
648 11x if (self->local_endpoint_out && self->fd >= 0)
649 {
650 11x sockaddr_storage local{};
651 11x socklen_t len = sizeof(local);
652 11x if (::getsockname(self->fd,
653 11x reinterpret_cast<sockaddr*>(&local), &len) == 0)
654 11x *self->local_endpoint_out =
655 11x sockaddr_to_local_endpoint(local, len);
656 }
657 }
658
659 14x coro_resume(self);
660 }
661 };
662
663 } // namespace boost::corosio::detail
664
665 #endif // BOOST_COROSIO_HAS_IO_URING
666
667 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
668