include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp

88.7% Lines (228/257) 92.6% List of functions (25/27)
io_uring_socket_ops.hpp
f(x) Functions (27)
Function Calls Lines Blocks
boost::corosio::detail::uring_set_result(boost::corosio::detail::io_uring_op*, bool, bool) :60 20064x 100.0% 100.0% boost::corosio::detail::uring_read_op::uring_read_op() :84 9345x 100.0% 100.0% boost::corosio::detail::uring_read_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :98 8507x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :128 211x 71.4% 75.0% boost::corosio::detail::uring_read_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :150 211x 100.0% 100.0% boost::corosio::detail::uring_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :160 8507x 92.3% 92.0% boost::corosio::detail::uring_write_op::uring_write_op() :201 9345x 100.0% 100.0% boost::corosio::detail::uring_write_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&) :206 8298x 100.0% 100.0% boost::corosio::detail::uring_write_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :242 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :263 0 0.0% 0.0% boost::corosio::detail::uring_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :273 8298x 92.3% 92.0% boost::corosio::detail::uring_connect_op::uring_connect_op() :315 9365x 100.0% 100.0% boost::corosio::detail::uring_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::endpoint, boost::corosio::endpoint*, boost::corosio::endpoint*, std::stop_token const&) :327 3078x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :355 3078x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :364 3078x 100.0% 100.0% boost::corosio::detail::uring_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :374 3078x 94.1% 94.0% boost::corosio::detail::io_uring_submit_op(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_op*) :426 3671x 66.7% 63.0% boost::corosio::detail::uring_wait_op::uring_wait_op() :495 9662x 100.0% 100.0% boost::corosio::detail::uring_wait_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, int, std::stop_token const&) :500 21x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :523 17x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :529 17x 100.0% 100.0% boost::corosio::detail::uring_wait_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :539 21x 90.0% 92.0% boost::corosio::detail::uring_local_connect_op::uring_local_connect_op() :576 149x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::local_endpoint, boost::corosio::local_endpoint*, boost::corosio::local_endpoint*, std::stop_token const&) :584 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*) :611 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :620 14x 100.0% 100.0% boost::corosio::detail::uring_local_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :630 14x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/capy/buffers.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/corosio/detail/buffer_param.hpp>
22 #include <boost/corosio/detail/dispatch_coro.hpp>
23 #include <boost/corosio/local_endpoint.hpp>
24 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
25 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
26 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
27 #include <boost/corosio/native/detail/coro_op_complete.hpp>
28 #include <boost/corosio/native/detail/make_err.hpp>
29 #include <boost/corosio/native/detail/speculative_state.hpp>
30
31 #include <system_error>
32
33 #include <netinet/in.h>
34 #include <poll.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37
38 namespace boost::corosio::detail {
39
40 /// Maximum scatter/gather segments per read/write/dgram op.
41 ///
42 /// Bounded well below `IOV_MAX` (1024 on Linux) so each op's
43 /// `iovec[io_uring_max_iov]` lives inside the io_uring_op object on
44 /// the same allocation as the rest of its state. Plan 4's registered-
45 /// buffer work will revisit; until then 16 covers typical scatter use
46 /// cases (fragmented buffers from buffer_sequence) without bloating
47 /// per-op memory.
48 inline constexpr std::size_t io_uring_max_iov = 16;
49
50 /** Resolve ec_out/bytes_out from a CQE result for a completed I/O op.
51
52 Shared by read, write, and connect handlers. For reads, `res == 0`
53 with a non-empty buffer means the peer closed the connection (EOF).
54
55 @param self The completed op.
56 @param is_read True if this is a receive/read operation.
57 @param empty_buf True if the submitted buffer was zero-length.
58 */
59 inline void
60 20064x uring_set_result(io_uring_op* self, bool is_read, bool empty_buf) noexcept
61 {
62 40015x decode_io_result(
63 self->ec_out,
64 20064x self->cancelled.load(std::memory_order_acquire),
65 113x self->res < 0 ? make_err(-self->res) : std::error_code{},
66 is_read,
67 20064x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u,
68 empty_buf);
69 20064x }
70
71 /** Scatter-gather read via `IORING_OP_READV`.
72
73 @par Handler dispatch
74 do_cqe captures `res`/`cqe_flags` and queues self into `local`;
75 do_handler runs from the scheduler queue and resumes the coroutine.
76 */
77 struct uring_read_op : io_uring_op
78 {
79 iovec iovecs[io_uring_max_iov];
80 int iovec_count = 0;
81 int fd = -1;
82 detail::speculative_state* spec_state = nullptr;
83
84 9345x uring_read_op() noexcept
85 9345x : io_uring_op(&do_handler, &do_cqe, &do_prep)
86 {
87 9345x is_read = true;
88 9345x }
89
90 /** Reset and initialize for a new submission.
91
92 Embedded ops are reused across calls; every mutable field the
93 handler may read must be re-initialized here. `start(token)`
94 also resets `cancelled`, `sqe_set`, and `stop_cb`.
95
96 @pre This slot has no in-flight op (its prior op completed).
97 */
98 8507x void prepare(
99 std::coroutine_handle<> handle,
100 capy::executor_ref executor,
101 std::error_code* ec,
102 std::size_t* bytes,
103 int file_descriptor,
104 io_uring_scheduler* scheduler,
105 std::shared_ptr<void> impl,
106 detail::speculative_state* spec,
107 buffer_param buffers,
108 std::stop_token const& token) noexcept
109 {
110 8507x h = handle;
111 8507x ex = executor;
112 8507x ec_out = ec;
113 8507x bytes_out = bytes;
114 8507x fd = file_descriptor;
115 8507x sched_ = scheduler;
116 8507x impl_ptr = std::move(impl);
117 8507x spec_state = spec;
118 8507x res = 0;
119 8507x cqe_flags = 0;
120 8507x iovec_count = static_cast<int>(
121 8507x buffers.copy_to(
122 8507x reinterpret_cast<capy::mutable_buffer*>(iovecs),
123 io_uring_max_iov));
124 8507x empty_buffer = (iovec_count == 0);
125 8507x start(token);
126 8507x }
127
128 211x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
129 {
130 211x auto* self = static_cast<uring_read_op*>(base);
131 // Single-buffer fast path: IORING_OP_RECV with a flat
132 // (buffer, length) skips the iovec-array indirection that
133 // IORING_OP_READV pays. For multi-iovec scatter reads, fall
134 // back to readv.
135 211x if (self->iovec_count == 1)
136 {
137 211x ::io_uring_prep_recv(
138 sqe, self->fd,
139 self->iovecs[0].iov_base,
140 self->iovecs[0].iov_len,
141 0);
142 }
143 else
144 {
145 ::io_uring_prep_readv(
146 sqe, self->fd, self->iovecs, self->iovec_count, 0);
147 }
148 211x }
149
150 211x static void do_cqe(
151 io_uring_op* base, int res, unsigned flags,
152 op_queue& local) noexcept
153 {
154 211x auto* self = static_cast<uring_read_op*>(base);
155 211x self->res = res;
156 211x self->cqe_flags = flags;
157 211x local.push(self);
158 211x }
159
160 8507x static void do_handler(
161 void* owner, scheduler_op* base,
162 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
163 {
164 8507x auto* self = static_cast<uring_read_op*>(base);
165 8507x if (coro_drain_if_shutdown(owner, self))
166 return;
167
168 8507x if (self->sched_)
169 8507x self->sched_->reset_inline_budget();
170
171 8507x uring_set_result(self, true, self->empty_buffer);
172
173 8507x if (self->res > 0 && self->spec_state)
174 {
175 // Kernel signalled readiness — restore speculation.
176 8402x self->spec_state->on_async_read_ready();
177 }
178
179 8507x if (self->bytes_out)
180 8507x *self->bytes_out =
181 8507x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
182
183 8507x coro_resume(self);
184 // suicide drops here; may destroy impl + self.
185 }
186 };
187
188 /** Scatter-gather write via `IORING_OP_SENDMSG` with `MSG_NOSIGNAL`.
189
190 `MSG_NOSIGNAL` prevents `SIGPIPE` when the peer has closed the
191 connection; the error is surfaced as `EPIPE` instead.
192 */
193 struct uring_write_op : io_uring_op
194 {
195 iovec iovecs[io_uring_max_iov];
196 int iovec_count = 0;
197 int fd = -1;
198 msghdr msg{};
199 detail::speculative_state* spec_state = nullptr;
200
201 9345x uring_write_op() noexcept
202 9345x : io_uring_op(&do_handler, &do_cqe, &do_prep)
203 9345x {}
204
205 /** Reset and initialize for a new submission. See uring_read_op::prepare. */
206 8298x void prepare(
207 std::coroutine_handle<> handle,
208 capy::executor_ref executor,
209 std::error_code* ec,
210 std::size_t* bytes,
211 int file_descriptor,
212 io_uring_scheduler* scheduler,
213 std::shared_ptr<void> impl,
214 detail::speculative_state* spec,
215 buffer_param buffers,
216 std::stop_token const& token) noexcept
217 {
218 8298x h = handle;
219 8298x ex = executor;
220 8298x ec_out = ec;
221 8298x bytes_out = bytes;
222 8298x fd = file_descriptor;
223 8298x sched_ = scheduler;
224 8298x impl_ptr = std::move(impl);
225 8298x spec_state = spec;
226 8298x res = 0;
227 8298x cqe_flags = 0;
228 8298x iovec_count = static_cast<int>(
229 8298x buffers.copy_to(
230 8298x reinterpret_cast<capy::mutable_buffer*>(iovecs),
231 io_uring_max_iov));
232 8298x empty_buffer = (iovec_count == 0);
233 8298x if (!empty_buffer)
234 {
235 8298x msg = {};
236 8298x msg.msg_iov = iovecs;
237 8298x msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);
238 }
239 8298x start(token);
240 8298x }
241
242 static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
243 {
244 auto* self = static_cast<uring_write_op*>(base);
245 // Single-buffer fast path: IORING_OP_SEND with MSG_NOSIGNAL
246 // skips the msghdr indirection that IORING_OP_SENDMSG pays.
247 // For multi-iovec scatter writes, fall back to sendmsg.
248 if (self->iovec_count == 1)
249 {
250 ::io_uring_prep_send(
251 sqe, self->fd,
252 self->iovecs[0].iov_base,
253 self->iovecs[0].iov_len,
254 MSG_NOSIGNAL);
255 }
256 else
257 {
258 ::io_uring_prep_sendmsg(
259 sqe, self->fd, &self->msg, MSG_NOSIGNAL);
260 }
261 }
262
263 static void do_cqe(
264 io_uring_op* base, int res, unsigned flags,
265 op_queue& local) noexcept
266 {
267 auto* self = static_cast<uring_write_op*>(base);
268 self->res = res;
269 self->cqe_flags = flags;
270 local.push(self);
271 }
272
273 8298x static void do_handler(
274 void* owner, scheduler_op* base,
275 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
276 {
277 8298x auto* self = static_cast<uring_write_op*>(base);
278 8298x if (coro_drain_if_shutdown(owner, self))
279 return;
280
281 8298x if (self->sched_)
282 8298x self->sched_->reset_inline_budget();
283
284 8298x uring_set_result(self, false, self->empty_buffer);
285
286 8298x if (self->res > 0 && self->spec_state)
287 {
288 // Kernel signalled readiness — restore speculation.
289 8298x self->spec_state->on_async_write_ready();
290 }
291
292 8298x if (self->bytes_out)
293 8298x *self->bytes_out =
294 8298x self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u;
295
296 8298x coro_resume(self);
297 }
298 };
299
300 /** Non-blocking connect via `IORING_OP_CONNECT`.
301
302 Negative `res` is the connect error; zero means success.
303 `remote_endpoint_out` is written only on success so a failed
304 connect does not corrupt the socket's cached remote endpoint.
305 */
306 struct uring_connect_op : io_uring_op
307 {
308 sockaddr_storage addr{};
309 socklen_t addrlen = 0;
310 int fd = -1;
311 endpoint target_endpoint{};
312 endpoint* remote_endpoint_out = nullptr;
313 endpoint* local_endpoint_out = nullptr;
314
315 9365x uring_connect_op() noexcept
316 9365x : io_uring_op(&do_handler, &do_cqe, &do_prep)
317 9365x {}
318
319 /** Reset and initialize for a new submission.
320
321 The caller must fill `addr` and `addrlen` before calling this
322 (typically via `to_sockaddr(ep, family, conn_.addr)` which
323 returns the addrlen) — `to_sockaddr` is the family-aware
324 helper and requires the socket family which is known to the
325 caller, not the op.
326 */
327 3078x void prepare(
328 std::coroutine_handle<> handle,
329 capy::executor_ref executor,
330 std::error_code* ec,
331 int file_descriptor,
332 io_uring_scheduler* scheduler,
333 std::shared_ptr<void> impl,
334 endpoint target,
335 endpoint* remote_out,
336 endpoint* local_out,
337 std::stop_token const& token) noexcept
338 {
339 3078x h = handle;
340 3078x ex = executor;
341 3078x ec_out = ec;
342 3078x bytes_out = nullptr;
343 3078x fd = file_descriptor;
344 3078x sched_ = scheduler;
345 3078x impl_ptr = std::move(impl);
346 3078x res = 0;
347 3078x cqe_flags = 0;
348 3078x target_endpoint = target;
349 3078x remote_endpoint_out = remote_out;
350 3078x local_endpoint_out = local_out;
351 // addr / addrlen are pre-filled by the caller.
352 3078x start(token);
353 3078x }
354
355 3078x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
356 {
357 3078x auto* self = static_cast<uring_connect_op*>(base);
358 3078x ::io_uring_prep_connect(
359 sqe, self->fd,
360 3078x reinterpret_cast<sockaddr const*>(&self->addr),
361 self->addrlen);
362 3078x }
363
364 3078x static void do_cqe(
365 io_uring_op* base, int res, unsigned flags,
366 op_queue& local) noexcept
367 {
368 3078x auto* self = static_cast<uring_connect_op*>(base);
369 3078x self->res = res;
370 3078x self->cqe_flags = flags;
371 3078x local.push(self);
372 3078x }
373
374 3078x static void do_handler(
375 void* owner, scheduler_op* base,
376 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
377 {
378 3078x auto* self = static_cast<uring_connect_op*>(base);
379 3078x if (coro_drain_if_shutdown(owner, self))
380 return;
381
382 3078x if (self->sched_)
383 3078x self->sched_->reset_inline_budget();
384
385 3078x uring_set_result(self, false, false);
386
387 // Write endpoints only on success.
388 3078x if (self->res >= 0)
389 {
390 3070x if (self->remote_endpoint_out)
391 3070x *self->remote_endpoint_out = self->target_endpoint;
392 3070x if (self->local_endpoint_out && self->fd >= 0)
393 {
394 3070x sockaddr_storage local{};
395 3070x socklen_t len = sizeof(local);
396 3070x if (::getsockname(self->fd,
397 3070x reinterpret_cast<sockaddr*>(&local), &len) == 0)
398 3070x *self->local_endpoint_out = sockaddr_to_endpoint(local);
399 }
400 }
401
402 3078x coro_resume(self);
403 }
404 };
405
406 /** Submit an `io_uring_op` whose `prep_func` is set.
407
408 Acquires the ring mutex, prepares the SQE, and (under the same
409 mutex) CAS-sets `submit_op_posted_`. The first submitter of a
410 batch wins the CAS and posts the scheduler's `submit_sqes_op`,
411 which later flushes all queued SQEs in a single
412 `io_uring_submit_and_get_events` call and drains any ready CQEs.
413 Subsequent submitters in the same batch piggyback — their SQEs
414 sit in the user-space SQ ring until that op dispatches.
415
416 On SQ-ring exhaustion (after one flush retry), surfaces `EAGAIN`
417 on `*op->ec_out` and queues the op as completed so its handler
418 dispatches on the next `do_one` cycle.
419
420 @pre `op->prep_func != nullptr`.
421
422 @par Exception Safety
423 Nothrow.
424 */
425 inline void
426 3671x io_uring_submit_op(io_uring_scheduler& sched, io_uring_op* op) noexcept
427 {
428 3671x sched.lazy_init_ring();
429
430 3671x bool need_post = false;
431 {
432 3671x typename io_uring_scheduler::lock_type ring_lock(sched.ring_mutex());
433
434 3671x ::io_uring_sqe* sqe = ::io_uring_get_sqe(sched.ring());
435 3671x if (!sqe)
436 {
437 // SQ ring full — flush to kernel and retry once.
438 ::io_uring_submit(sched.ring());
439 sqe = ::io_uring_get_sqe(sched.ring());
440 }
441
442 3671x if (!sqe)
443 {
444 // SQ stayed full after one flush — synchronous failure path.
445 // Surface EAGAIN and queue the op as completed so do_one
446 // dispatches the handler. The caller's work_started() already
447 // counted this op. (CAS path is not entered here.)
448 if (op->ec_out)
449 *op->ec_out = make_err(EAGAIN);
450 typename io_uring_scheduler::lock_type lock(sched.dispatch_mutex());
451 sched.push_completed_locked(op);
452 return;
453 }
454
455 3671x op->prep_func(op, sqe);
456 3671x ::io_uring_sqe_set_data(sqe, op);
457 // Count this op against the in-flight gate in do_one: it
458 // expects exactly one F_MORE-less CQE per submitted SQE
459 // (multishot ops decrement only on the terminal CQE).
460 3671x sched.inflight_inc();
461 // Release pairs with the acquire in io_uring_op::request_cancel:
462 // a stop_token firing after we release the mutex will see
463 // sqe_set==true and submit a cancel-by-user_data SQE.
464 3671x op->sqe_set.store(true, std::memory_order_release);
465
466 // First submitter in a batch wins the CAS and will post
467 // submit_sqes_op; others piggyback on the same flush.
468 3671x if (!sched.submit_op_posted_exchange(true))
469 182x need_post = true;
470 3671x }
471
472 3671x if (need_post)
473 {
474 // Flush is deferred to submit_sqes_op; post() owns the wake.
475 182x sched.post(&sched.submit_op_ref());
476 }
477 }
478
479 /** Readiness wait via `IORING_OP_POLL_ADD`.
480
481 Used to implement the `wait()` virtual for socket and acceptor
482 implementations. The op submits a one-shot poll on `fd` for the
483 requested set of poll flags (POLLIN / POLLOUT / POLLPRI|POLLERR|
484 POLLHUP) and reports completion without transferring any data.
485
486 The CQE's `res` carries the actual revents, but we surface only
487 success/cancel/error on `*ec_out` — callers of `wait()` just need
488 a readiness signal, not the specific event mask.
489 */
490 struct uring_wait_op : io_uring_op
491 {
492 int fd = -1;
493 int poll_flags = 0;
494
495 9662x uring_wait_op() noexcept
496 9662x : io_uring_op(&do_handler, &do_cqe, &do_prep)
497 9662x {}
498
499 /** Reset and initialize for a new submission. */
500 21x void prepare(
501 std::coroutine_handle<> handle,
502 capy::executor_ref executor,
503 std::error_code* ec,
504 int file_descriptor,
505 io_uring_scheduler* scheduler,
506 std::shared_ptr<void> impl,
507 int flags,
508 std::stop_token const& token) noexcept
509 {
510 21x h = handle;
511 21x ex = executor;
512 21x ec_out = ec;
513 21x bytes_out = nullptr;
514 21x fd = file_descriptor;
515 21x sched_ = scheduler;
516 21x impl_ptr = std::move(impl);
517 21x poll_flags = flags;
518 21x res = 0;
519 21x cqe_flags = 0;
520 21x start(token);
521 21x }
522
523 17x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
524 {
525 17x auto* self = static_cast<uring_wait_op*>(base);
526 17x ::io_uring_prep_poll_add(sqe, self->fd, self->poll_flags);
527 17x }
528
529 17x static void do_cqe(
530 io_uring_op* base, int res, unsigned flags,
531 op_queue& local) noexcept
532 {
533 17x auto* self = static_cast<uring_wait_op*>(base);
534 17x self->res = res;
535 17x self->cqe_flags = flags;
536 17x local.push(self);
537 17x }
538
539 21x static void do_handler(
540 void* owner, scheduler_op* base,
541 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
542 {
543 21x auto* self = static_cast<uring_wait_op*>(base);
544 21x if (coro_drain_if_shutdown(owner, self))
545 return;
546
547 21x if (self->sched_)
548 21x self->sched_->reset_inline_budget();
549
550 // Wait reports only success/cancel/error — no bytes, no EOF.
551 36x decode_io_result(
552 self->ec_out,
553 21x self->cancelled.load(std::memory_order_acquire),
554 21x self->res < 0 ? make_err(-self->res) : std::error_code{},
555 /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
556
557 21x coro_resume(self);
558 }
559 };
560
561 /** Non-blocking connect for Unix domain sockets via `IORING_OP_CONNECT`.
562
563 Like `uring_connect_op` but stores `local_endpoint` for the target
564 and out-pointers, since `sockaddr_to_local_endpoint` returns
565 `local_endpoint`, not `endpoint`.
566 */
567 struct uring_local_connect_op : io_uring_op
568 {
569 sockaddr_storage addr{};
570 socklen_t addrlen = 0;
571 int fd = -1;
572 corosio::local_endpoint target_endpoint{};
573 corosio::local_endpoint* remote_endpoint_out = nullptr;
574 corosio::local_endpoint* local_endpoint_out = nullptr;
575
576 149x uring_local_connect_op() noexcept
577 149x : io_uring_op(&do_handler, &do_cqe, &do_prep)
578 149x {}
579
580 /** Reset and initialize for a new submission.
581
582 Caller pre-fills `addr` and `addrlen` (see uring_connect_op::prepare).
583 */
584 14x void prepare(
585 std::coroutine_handle<> handle,
586 capy::executor_ref executor,
587 std::error_code* ec,
588 int file_descriptor,
589 io_uring_scheduler* scheduler,
590 std::shared_ptr<void> impl,
591 corosio::local_endpoint target,
592 corosio::local_endpoint* remote_out,
593 corosio::local_endpoint* local_out,
594 std::stop_token const& token) noexcept
595 {
596 14x h = handle;
597 14x ex = executor;
598 14x ec_out = ec;
599 14x bytes_out = nullptr;
600 14x fd = file_descriptor;
601 14x sched_ = scheduler;
602 14x impl_ptr = std::move(impl);
603 14x res = 0;
604 14x cqe_flags = 0;
605 14x target_endpoint = target;
606 14x remote_endpoint_out = remote_out;
607 14x local_endpoint_out = local_out;
608 14x start(token);
609 14x }
610
611 14x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
612 {
613 14x auto* self = static_cast<uring_local_connect_op*>(base);
614 14x ::io_uring_prep_connect(
615 sqe, self->fd,
616 14x reinterpret_cast<sockaddr const*>(&self->addr),
617 self->addrlen);
618 14x }
619
620 14x static void do_cqe(
621 io_uring_op* base, int res, unsigned flags,
622 op_queue& local) noexcept
623 {
624 14x auto* self = static_cast<uring_local_connect_op*>(base);
625 14x self->res = res;
626 14x self->cqe_flags = flags;
627 14x local.push(self);
628 14x }
629
630 14x static void do_handler(
631 void* owner, scheduler_op* base,
632 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
633 {
634 14x auto* self = static_cast<uring_local_connect_op*>(base);
635 14x if (coro_drain_if_shutdown(owner, self))
636 return;
637
638 14x if (self->sched_)
639 14x self->sched_->reset_inline_budget();
640
641 14x uring_set_result(self, false, false);
642
643 // Write endpoints only on success.
644 14x if (self->res >= 0)
645 {
646 11x if (self->remote_endpoint_out)
647 11x *self->remote_endpoint_out = self->target_endpoint;
648 11x if (self->local_endpoint_out && self->fd >= 0)
649 {
650 11x sockaddr_storage local{};
651 11x socklen_t len = sizeof(local);
652 11x if (::getsockname(self->fd,
653 11x reinterpret_cast<sockaddr*>(&local), &len) == 0)
654 11x *self->local_endpoint_out =
655 11x sockaddr_to_local_endpoint(local, len);
656 }
657 }
658
659 14x coro_resume(self);
660 }
661 };
662
663 } // namespace boost::corosio::detail
664
665 #endif // BOOST_COROSIO_HAS_IO_URING
666
667 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP
668