include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp
88.7% Lines (228/257)
92.6% List of functions (25/27)
Functions (27)
Function
Calls
Lines
Blocks
boost::corosio::detail::uring_set_result(boost::corosio::detail::io_uring_op*, bool, bool)
:60
27179x
100.0%
100.0%
boost::corosio::detail::uring_read_op::uring_read_op()
:84
13269x
100.0%
100.0%
boost::corosio::detail::uring_read_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&)
:98
11411x
100.0%
100.0%
boost::corosio::detail::uring_read_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:128
212x
71.4%
75.0%
boost::corosio::detail::uring_read_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:150
212x
100.0%
100.0%
boost::corosio::detail::uring_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:160
11411x
92.3%
92.0%
boost::corosio::detail::uring_write_op::uring_write_op()
:201
13269x
100.0%
100.0%
boost::corosio::detail::uring_write_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&)
:206
11201x
100.0%
100.0%
boost::corosio::detail::uring_write_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:242
0
0.0%
0.0%
boost::corosio::detail::uring_write_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:263
0
0.0%
0.0%
boost::corosio::detail::uring_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:273
11201x
92.3%
92.0%
boost::corosio::detail::uring_connect_op::uring_connect_op()
:315
13289x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::endpoint, boost::corosio::endpoint*, boost::corosio::endpoint*, std::stop_token const&)
:327
4386x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:355
4386x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:364
4386x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:374
4386x
94.1%
94.0%
boost::corosio::detail::io_uring_submit_op(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_op*)
:426
4980x
66.7%
63.0%
boost::corosio::detail::uring_wait_op::uring_wait_op()
:495
13586x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, int, std::stop_token const&)
:500
21x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:523
17x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:529
17x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:539
21x
90.0%
92.0%
boost::corosio::detail::uring_local_connect_op::uring_local_connect_op()
:576
149x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::local_endpoint, boost::corosio::local_endpoint*, boost::corosio::local_endpoint*, std::stop_token const&)
:584
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:611
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:620
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:630
14x
94.4%
94.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_IO_URING | ||
| 16 | |||
| 17 | #include <liburing.h> | ||
| 18 | |||
| 19 | #include <boost/capy/buffers.hpp> | ||
| 20 | #include <boost/capy/error.hpp> | ||
| 21 | #include <boost/corosio/detail/buffer_param.hpp> | ||
| 22 | #include <boost/corosio/detail/dispatch_coro.hpp> | ||
| 23 | #include <boost/corosio/local_endpoint.hpp> | ||
| 24 | #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp> | ||
| 25 | #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp> | ||
| 26 | #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp> | ||
| 27 | #include <boost/corosio/native/detail/coro_op_complete.hpp> | ||
| 28 | #include <boost/corosio/native/detail/make_err.hpp> | ||
| 29 | #include <boost/corosio/native/detail/speculative_state.hpp> | ||
| 30 | |||
| 31 | #include <system_error> | ||
| 32 | |||
| 33 | #include <netinet/in.h> | ||
| 34 | #include <poll.h> | ||
| 35 | #include <sys/socket.h> | ||
| 36 | #include <sys/uio.h> | ||
| 37 | |||
| 38 | namespace boost::corosio::detail { | ||
| 39 | |||
/// Upper bound on scatter/gather segments carried by one read, write,
/// or datagram op.
///
/// Every op embeds an `iovec[io_uring_max_iov]` array directly in its
/// own object, so this bound is kept far below Linux's `IOV_MAX` (1024)
/// to avoid bloating per-op memory. `buffer_param::copy_to` is handed
/// this value as its capacity, so at most this many segments are
/// submitted per op. Sixteen covers the typical fragmented buffer
/// sequence; revisit when registered-buffer support is added.
inline constexpr std::size_t io_uring_max_iov{16};
| 49 | |||
| 50 | /** Resolve ec_out/bytes_out from a CQE result for a completed I/O op. | ||
| 51 | |||
| 52 | Shared by read, write, and connect handlers. For reads, `res == 0` | ||
| 53 | with a non-empty buffer means the peer closed the connection (EOF). | ||
| 54 | |||
| 55 | @param self The completed op. | ||
| 56 | @param is_read True if this is a receive/read operation. | ||
| 57 | @param empty_buf True if the submitted buffer was zero-length. | ||
| 58 | */ | ||
| 59 | inline void | ||
| 60 | 27179x | uring_set_result(io_uring_op* self, bool is_read, bool empty_buf) noexcept | |
| 61 | { | ||
| 62 | 54244x | decode_io_result( | |
| 63 | self->ec_out, | ||
| 64 | 27179x | self->cancelled.load(std::memory_order_acquire), | |
| 65 | 114x | self->res < 0 ? make_err(-self->res) : std::error_code{}, | |
| 66 | is_read, | ||
| 67 | 27179x | self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u, | |
| 68 | empty_buf); | ||
| 69 | 27179x | } | |
| 70 | |||
| 71 | /** Scatter-gather read via `IORING_OP_READV`. | ||
| 72 | |||
| 73 | @par Handler dispatch | ||
| 74 | do_cqe captures `res`/`cqe_flags` and queues self into `local`; | ||
| 75 | do_handler runs from the scheduler queue and resumes the coroutine. | ||
| 76 | */ | ||
| 77 | struct uring_read_op : io_uring_op | ||
| 78 | { | ||
| 79 | iovec iovecs[io_uring_max_iov]; | ||
| 80 | int iovec_count = 0; | ||
| 81 | int fd = -1; | ||
| 82 | detail::speculative_state* spec_state = nullptr; | ||
| 83 | |||
| 84 | 13269x | uring_read_op() noexcept | |
| 85 | 13269x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 86 | { | ||
| 87 | 13269x | is_read = true; | |
| 88 | 13269x | } | |
| 89 | |||
| 90 | /** Reset and initialize for a new submission. | ||
| 91 | |||
| 92 | Embedded ops are reused across calls; every mutable field the | ||
| 93 | handler may read must be re-initialized here. `start(token)` | ||
| 94 | also resets `cancelled`, `sqe_set`, and `stop_cb`. | ||
| 95 | |||
| 96 | @pre This slot has no in-flight op (its prior op completed). | ||
| 97 | */ | ||
| 98 | 11411x | void prepare( | |
| 99 | std::coroutine_handle<> handle, | ||
| 100 | capy::executor_ref executor, | ||
| 101 | std::error_code* ec, | ||
| 102 | std::size_t* bytes, | ||
| 103 | int file_descriptor, | ||
| 104 | io_uring_scheduler* scheduler, | ||
| 105 | std::shared_ptr<void> impl, | ||
| 106 | detail::speculative_state* spec, | ||
| 107 | buffer_param buffers, | ||
| 108 | std::stop_token const& token) noexcept | ||
| 109 | { | ||
| 110 | 11411x | h = handle; | |
| 111 | 11411x | ex = executor; | |
| 112 | 11411x | ec_out = ec; | |
| 113 | 11411x | bytes_out = bytes; | |
| 114 | 11411x | fd = file_descriptor; | |
| 115 | 11411x | sched_ = scheduler; | |
| 116 | 11411x | impl_ptr = std::move(impl); | |
| 117 | 11411x | spec_state = spec; | |
| 118 | 11411x | res = 0; | |
| 119 | 11411x | cqe_flags = 0; | |
| 120 | 11411x | iovec_count = static_cast<int>( | |
| 121 | 11411x | buffers.copy_to( | |
| 122 | 11411x | reinterpret_cast<capy::mutable_buffer*>(iovecs), | |
| 123 | io_uring_max_iov)); | ||
| 124 | 11411x | empty_buffer = (iovec_count == 0); | |
| 125 | 11411x | start(token); | |
| 126 | 11411x | } | |
| 127 | |||
| 128 | 212x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 129 | { | ||
| 130 | 212x | auto* self = static_cast<uring_read_op*>(base); | |
| 131 | // Single-buffer fast path: IORING_OP_RECV with a flat | ||
| 132 | // (buffer, length) skips the iovec-array indirection that | ||
| 133 | // IORING_OP_READV pays. For multi-iovec scatter reads, fall | ||
| 134 | // back to readv. | ||
| 135 | 212x | if (self->iovec_count == 1) | |
| 136 | { | ||
| 137 | 212x | ::io_uring_prep_recv( | |
| 138 | sqe, self->fd, | ||
| 139 | self->iovecs[0].iov_base, | ||
| 140 | self->iovecs[0].iov_len, | ||
| 141 | 0); | ||
| 142 | } | ||
| 143 | else | ||
| 144 | { | ||
| 145 | ✗ | ::io_uring_prep_readv( | |
| 146 | ✗ | sqe, self->fd, self->iovecs, self->iovec_count, 0); | |
| 147 | } | ||
| 148 | 212x | } | |
| 149 | |||
| 150 | 212x | static void do_cqe( | |
| 151 | io_uring_op* base, int res, unsigned flags, | ||
| 152 | op_queue& local) noexcept | ||
| 153 | { | ||
| 154 | 212x | auto* self = static_cast<uring_read_op*>(base); | |
| 155 | 212x | self->res = res; | |
| 156 | 212x | self->cqe_flags = flags; | |
| 157 | 212x | local.push(self); | |
| 158 | 212x | } | |
| 159 | |||
| 160 | 11411x | static void do_handler( | |
| 161 | void* owner, scheduler_op* base, | ||
| 162 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 163 | { | ||
| 164 | 11411x | auto* self = static_cast<uring_read_op*>(base); | |
| 165 | 11411x | if (coro_drain_if_shutdown(owner, self)) | |
| 166 | ✗ | return; | |
| 167 | |||
| 168 | 11411x | if (self->sched_) | |
| 169 | 11411x | self->sched_->reset_inline_budget(); | |
| 170 | |||
| 171 | 11411x | uring_set_result(self, true, self->empty_buffer); | |
| 172 | |||
| 173 | 11411x | if (self->res > 0 && self->spec_state) | |
| 174 | { | ||
| 175 | // Kernel signalled readiness — restore speculation. | ||
| 176 | 11305x | self->spec_state->on_async_read_ready(); | |
| 177 | } | ||
| 178 | |||
| 179 | 11411x | if (self->bytes_out) | |
| 180 | 11411x | *self->bytes_out = | |
| 181 | 11411x | self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u; | |
| 182 | |||
| 183 | 11411x | coro_resume(self); | |
| 184 | // suicide drops here; may destroy impl + self. | ||
| 185 | } | ||
| 186 | }; | ||
| 187 | |||
| 188 | /** Scatter-gather write via `IORING_OP_SENDMSG` with `MSG_NOSIGNAL`. | ||
| 189 | |||
| 190 | `MSG_NOSIGNAL` prevents `SIGPIPE` when the peer has closed the | ||
| 191 | connection; the error is surfaced as `EPIPE` instead. | ||
| 192 | */ | ||
| 193 | struct uring_write_op : io_uring_op | ||
| 194 | { | ||
| 195 | iovec iovecs[io_uring_max_iov]; | ||
| 196 | int iovec_count = 0; | ||
| 197 | int fd = -1; | ||
| 198 | msghdr msg{}; | ||
| 199 | detail::speculative_state* spec_state = nullptr; | ||
| 200 | |||
| 201 | 13269x | uring_write_op() noexcept | |
| 202 | 13269x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 203 | 13269x | {} | |
| 204 | |||
| 205 | /** Reset and initialize for a new submission. See uring_read_op::prepare. */ | ||
| 206 | 11201x | void prepare( | |
| 207 | std::coroutine_handle<> handle, | ||
| 208 | capy::executor_ref executor, | ||
| 209 | std::error_code* ec, | ||
| 210 | std::size_t* bytes, | ||
| 211 | int file_descriptor, | ||
| 212 | io_uring_scheduler* scheduler, | ||
| 213 | std::shared_ptr<void> impl, | ||
| 214 | detail::speculative_state* spec, | ||
| 215 | buffer_param buffers, | ||
| 216 | std::stop_token const& token) noexcept | ||
| 217 | { | ||
| 218 | 11201x | h = handle; | |
| 219 | 11201x | ex = executor; | |
| 220 | 11201x | ec_out = ec; | |
| 221 | 11201x | bytes_out = bytes; | |
| 222 | 11201x | fd = file_descriptor; | |
| 223 | 11201x | sched_ = scheduler; | |
| 224 | 11201x | impl_ptr = std::move(impl); | |
| 225 | 11201x | spec_state = spec; | |
| 226 | 11201x | res = 0; | |
| 227 | 11201x | cqe_flags = 0; | |
| 228 | 11201x | iovec_count = static_cast<int>( | |
| 229 | 11201x | buffers.copy_to( | |
| 230 | 11201x | reinterpret_cast<capy::mutable_buffer*>(iovecs), | |
| 231 | io_uring_max_iov)); | ||
| 232 | 11201x | empty_buffer = (iovec_count == 0); | |
| 233 | 11201x | if (!empty_buffer) | |
| 234 | { | ||
| 235 | 11201x | msg = {}; | |
| 236 | 11201x | msg.msg_iov = iovecs; | |
| 237 | 11201x | msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count); | |
| 238 | } | ||
| 239 | 11201x | start(token); | |
| 240 | 11201x | } | |
| 241 | |||
| 242 | ✗ | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 243 | { | ||
| 244 | ✗ | auto* self = static_cast<uring_write_op*>(base); | |
| 245 | // Single-buffer fast path: IORING_OP_SEND with MSG_NOSIGNAL | ||
| 246 | // skips the msghdr indirection that IORING_OP_SENDMSG pays. | ||
| 247 | // For multi-iovec scatter writes, fall back to sendmsg. | ||
| 248 | ✗ | if (self->iovec_count == 1) | |
| 249 | { | ||
| 250 | ✗ | ::io_uring_prep_send( | |
| 251 | sqe, self->fd, | ||
| 252 | ✗ | self->iovecs[0].iov_base, | |
| 253 | self->iovecs[0].iov_len, | ||
| 254 | MSG_NOSIGNAL); | ||
| 255 | } | ||
| 256 | else | ||
| 257 | { | ||
| 258 | ✗ | ::io_uring_prep_sendmsg( | |
| 259 | ✗ | sqe, self->fd, &self->msg, MSG_NOSIGNAL); | |
| 260 | } | ||
| 261 | ✗ | } | |
| 262 | |||
| 263 | ✗ | static void do_cqe( | |
| 264 | io_uring_op* base, int res, unsigned flags, | ||
| 265 | op_queue& local) noexcept | ||
| 266 | { | ||
| 267 | ✗ | auto* self = static_cast<uring_write_op*>(base); | |
| 268 | ✗ | self->res = res; | |
| 269 | ✗ | self->cqe_flags = flags; | |
| 270 | ✗ | local.push(self); | |
| 271 | ✗ | } | |
| 272 | |||
| 273 | 11201x | static void do_handler( | |
| 274 | void* owner, scheduler_op* base, | ||
| 275 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 276 | { | ||
| 277 | 11201x | auto* self = static_cast<uring_write_op*>(base); | |
| 278 | 11201x | if (coro_drain_if_shutdown(owner, self)) | |
| 279 | ✗ | return; | |
| 280 | |||
| 281 | 11201x | if (self->sched_) | |
| 282 | 11201x | self->sched_->reset_inline_budget(); | |
| 283 | |||
| 284 | 11201x | uring_set_result(self, false, self->empty_buffer); | |
| 285 | |||
| 286 | 11201x | if (self->res > 0 && self->spec_state) | |
| 287 | { | ||
| 288 | // Kernel signalled readiness — restore speculation. | ||
| 289 | 11201x | self->spec_state->on_async_write_ready(); | |
| 290 | } | ||
| 291 | |||
| 292 | 11201x | if (self->bytes_out) | |
| 293 | 11201x | *self->bytes_out = | |
| 294 | 11201x | self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u; | |
| 295 | |||
| 296 | 11201x | coro_resume(self); | |
| 297 | } | ||
| 298 | }; | ||
| 299 | |||
| 300 | /** Non-blocking connect via `IORING_OP_CONNECT`. | ||
| 301 | |||
| 302 | Negative `res` is the connect error; zero means success. | ||
| 303 | `remote_endpoint_out` is written only on success so a failed | ||
| 304 | connect does not corrupt the socket's cached remote endpoint. | ||
| 305 | */ | ||
| 306 | struct uring_connect_op : io_uring_op | ||
| 307 | { | ||
| 308 | sockaddr_storage addr{}; | ||
| 309 | socklen_t addrlen = 0; | ||
| 310 | int fd = -1; | ||
| 311 | endpoint target_endpoint{}; | ||
| 312 | endpoint* remote_endpoint_out = nullptr; | ||
| 313 | endpoint* local_endpoint_out = nullptr; | ||
| 314 | |||
| 315 | 13289x | uring_connect_op() noexcept | |
| 316 | 13289x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 317 | 13289x | {} | |
| 318 | |||
| 319 | /** Reset and initialize for a new submission. | ||
| 320 | |||
| 321 | The caller must fill `addr` and `addrlen` before calling this | ||
| 322 | (typically via `to_sockaddr(ep, family, conn_.addr)` which | ||
| 323 | returns the addrlen) — `to_sockaddr` is the family-aware | ||
| 324 | helper and requires the socket family which is known to the | ||
| 325 | caller, not the op. | ||
| 326 | */ | ||
| 327 | 4386x | void prepare( | |
| 328 | std::coroutine_handle<> handle, | ||
| 329 | capy::executor_ref executor, | ||
| 330 | std::error_code* ec, | ||
| 331 | int file_descriptor, | ||
| 332 | io_uring_scheduler* scheduler, | ||
| 333 | std::shared_ptr<void> impl, | ||
| 334 | endpoint target, | ||
| 335 | endpoint* remote_out, | ||
| 336 | endpoint* local_out, | ||
| 337 | std::stop_token const& token) noexcept | ||
| 338 | { | ||
| 339 | 4386x | h = handle; | |
| 340 | 4386x | ex = executor; | |
| 341 | 4386x | ec_out = ec; | |
| 342 | 4386x | bytes_out = nullptr; | |
| 343 | 4386x | fd = file_descriptor; | |
| 344 | 4386x | sched_ = scheduler; | |
| 345 | 4386x | impl_ptr = std::move(impl); | |
| 346 | 4386x | res = 0; | |
| 347 | 4386x | cqe_flags = 0; | |
| 348 | 4386x | target_endpoint = target; | |
| 349 | 4386x | remote_endpoint_out = remote_out; | |
| 350 | 4386x | local_endpoint_out = local_out; | |
| 351 | // addr / addrlen are pre-filled by the caller. | ||
| 352 | 4386x | start(token); | |
| 353 | 4386x | } | |
| 354 | |||
| 355 | 4386x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 356 | { | ||
| 357 | 4386x | auto* self = static_cast<uring_connect_op*>(base); | |
| 358 | 4386x | ::io_uring_prep_connect( | |
| 359 | sqe, self->fd, | ||
| 360 | 4386x | reinterpret_cast<sockaddr const*>(&self->addr), | |
| 361 | self->addrlen); | ||
| 362 | 4386x | } | |
| 363 | |||
| 364 | 4386x | static void do_cqe( | |
| 365 | io_uring_op* base, int res, unsigned flags, | ||
| 366 | op_queue& local) noexcept | ||
| 367 | { | ||
| 368 | 4386x | auto* self = static_cast<uring_connect_op*>(base); | |
| 369 | 4386x | self->res = res; | |
| 370 | 4386x | self->cqe_flags = flags; | |
| 371 | 4386x | local.push(self); | |
| 372 | 4386x | } | |
| 373 | |||
| 374 | 4386x | static void do_handler( | |
| 375 | void* owner, scheduler_op* base, | ||
| 376 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 377 | { | ||
| 378 | 4386x | auto* self = static_cast<uring_connect_op*>(base); | |
| 379 | 4386x | if (coro_drain_if_shutdown(owner, self)) | |
| 380 | ✗ | return; | |
| 381 | |||
| 382 | 4386x | if (self->sched_) | |
| 383 | 4386x | self->sched_->reset_inline_budget(); | |
| 384 | |||
| 385 | 4386x | uring_set_result(self, false, false); | |
| 386 | |||
| 387 | // Write endpoints only on success. | ||
| 388 | 4386x | if (self->res >= 0) | |
| 389 | { | ||
| 390 | 4378x | if (self->remote_endpoint_out) | |
| 391 | 4378x | *self->remote_endpoint_out = self->target_endpoint; | |
| 392 | 4378x | if (self->local_endpoint_out && self->fd >= 0) | |
| 393 | { | ||
| 394 | 4378x | sockaddr_storage local{}; | |
| 395 | 4378x | socklen_t len = sizeof(local); | |
| 396 | 4378x | if (::getsockname(self->fd, | |
| 397 | 4378x | reinterpret_cast<sockaddr*>(&local), &len) == 0) | |
| 398 | 4378x | *self->local_endpoint_out = sockaddr_to_endpoint(local); | |
| 399 | } | ||
| 400 | } | ||
| 401 | |||
| 402 | 4386x | coro_resume(self); | |
| 403 | } | ||
| 404 | }; | ||
| 405 | |||
| 406 | /** Submit an `io_uring_op` whose `prep_func` is set. | ||
| 407 | |||
| 408 | Acquires the ring mutex, prepares the SQE, and (under the same | ||
| 409 | mutex) CAS-sets `submit_op_posted_`. The first submitter of a | ||
| 410 | batch wins the CAS and posts the scheduler's `submit_sqes_op`, | ||
| 411 | which later flushes all queued SQEs in a single | ||
| 412 | `io_uring_submit_and_get_events` call and drains any ready CQEs. | ||
| 413 | Subsequent submitters in the same batch piggyback — their SQEs | ||
| 414 | sit in the user-space SQ ring until that op dispatches. | ||
| 415 | |||
| 416 | On SQ-ring exhaustion (after one flush retry), surfaces `EAGAIN` | ||
| 417 | on `*op->ec_out` and queues the op as completed so its handler | ||
| 418 | dispatches on the next `do_one` cycle. | ||
| 419 | |||
| 420 | @pre `op->prep_func != nullptr`. | ||
| 421 | |||
| 422 | @par Exception Safety | ||
| 423 | Nothrow. | ||
| 424 | */ | ||
| 425 | inline void | ||
| 426 | 4980x | io_uring_submit_op(io_uring_scheduler& sched, io_uring_op* op) noexcept | |
| 427 | { | ||
| 428 | 4980x | sched.lazy_init_ring(); | |
| 429 | |||
| 430 | 4980x | bool need_post = false; | |
| 431 | { | ||
| 432 | 4980x | typename io_uring_scheduler::lock_type ring_lock(sched.ring_mutex()); | |
| 433 | |||
| 434 | 4980x | ::io_uring_sqe* sqe = ::io_uring_get_sqe(sched.ring()); | |
| 435 | 4980x | if (!sqe) | |
| 436 | { | ||
| 437 | // SQ ring full — flush to kernel and retry once. | ||
| 438 | ✗ | ::io_uring_submit(sched.ring()); | |
| 439 | ✗ | sqe = ::io_uring_get_sqe(sched.ring()); | |
| 440 | } | ||
| 441 | |||
| 442 | 4980x | if (!sqe) | |
| 443 | { | ||
| 444 | // SQ stayed full after one flush — synchronous failure path. | ||
| 445 | // Surface EAGAIN and queue the op as completed so do_one | ||
| 446 | // dispatches the handler. The caller's work_started() already | ||
| 447 | // counted this op. (CAS path is not entered here.) | ||
| 448 | ✗ | if (op->ec_out) | |
| 449 | ✗ | *op->ec_out = make_err(EAGAIN); | |
| 450 | ✗ | typename io_uring_scheduler::lock_type lock(sched.dispatch_mutex()); | |
| 451 | ✗ | sched.push_completed_locked(op); | |
| 452 | ✗ | return; | |
| 453 | ✗ | } | |
| 454 | |||
| 455 | 4980x | op->prep_func(op, sqe); | |
| 456 | 4980x | ::io_uring_sqe_set_data(sqe, op); | |
| 457 | // Count this op against the in-flight gate in do_one: it | ||
| 458 | // expects exactly one F_MORE-less CQE per submitted SQE | ||
| 459 | // (multishot ops decrement only on the terminal CQE). | ||
| 460 | 4980x | sched.inflight_inc(); | |
| 461 | // Release pairs with the acquire in io_uring_op::request_cancel: | ||
| 462 | // a stop_token firing after we release the mutex will see | ||
| 463 | // sqe_set==true and submit a cancel-by-user_data SQE. | ||
| 464 | 4980x | op->sqe_set.store(true, std::memory_order_release); | |
| 465 | |||
| 466 | // First submitter in a batch wins the CAS and will post | ||
| 467 | // submit_sqes_op; others piggyback on the same flush. | ||
| 468 | 4980x | if (!sched.submit_op_posted_exchange(true)) | |
| 469 | 182x | need_post = true; | |
| 470 | 4980x | } | |
| 471 | |||
| 472 | 4980x | if (need_post) | |
| 473 | { | ||
| 474 | // Flush is deferred to submit_sqes_op; post() owns the wake. | ||
| 475 | 182x | sched.post(&sched.submit_op_ref()); | |
| 476 | } | ||
| 477 | } | ||
| 478 | |||
| 479 | /** Readiness wait via `IORING_OP_POLL_ADD`. | ||
| 480 | |||
| 481 | Used to implement the `wait()` virtual for socket and acceptor | ||
| 482 | implementations. The op submits a one-shot poll on `fd` for the | ||
| 483 | requested set of poll flags (POLLIN / POLLOUT / POLLPRI|POLLERR| | ||
| 484 | POLLHUP) and reports completion without transferring any data. | ||
| 485 | |||
| 486 | The CQE's `res` carries the actual revents, but we surface only | ||
| 487 | success/cancel/error on `*ec_out` — callers of `wait()` just need | ||
| 488 | a readiness signal, not the specific event mask. | ||
| 489 | */ | ||
| 490 | struct uring_wait_op : io_uring_op | ||
| 491 | { | ||
| 492 | int fd = -1; | ||
| 493 | int poll_flags = 0; | ||
| 494 | |||
| 495 | 13586x | uring_wait_op() noexcept | |
| 496 | 13586x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 497 | 13586x | {} | |
| 498 | |||
| 499 | /** Reset and initialize for a new submission. */ | ||
| 500 | 21x | void prepare( | |
| 501 | std::coroutine_handle<> handle, | ||
| 502 | capy::executor_ref executor, | ||
| 503 | std::error_code* ec, | ||
| 504 | int file_descriptor, | ||
| 505 | io_uring_scheduler* scheduler, | ||
| 506 | std::shared_ptr<void> impl, | ||
| 507 | int flags, | ||
| 508 | std::stop_token const& token) noexcept | ||
| 509 | { | ||
| 510 | 21x | h = handle; | |
| 511 | 21x | ex = executor; | |
| 512 | 21x | ec_out = ec; | |
| 513 | 21x | bytes_out = nullptr; | |
| 514 | 21x | fd = file_descriptor; | |
| 515 | 21x | sched_ = scheduler; | |
| 516 | 21x | impl_ptr = std::move(impl); | |
| 517 | 21x | poll_flags = flags; | |
| 518 | 21x | res = 0; | |
| 519 | 21x | cqe_flags = 0; | |
| 520 | 21x | start(token); | |
| 521 | 21x | } | |
| 522 | |||
| 523 | 17x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 524 | { | ||
| 525 | 17x | auto* self = static_cast<uring_wait_op*>(base); | |
| 526 | 17x | ::io_uring_prep_poll_add(sqe, self->fd, self->poll_flags); | |
| 527 | 17x | } | |
| 528 | |||
| 529 | 17x | static void do_cqe( | |
| 530 | io_uring_op* base, int res, unsigned flags, | ||
| 531 | op_queue& local) noexcept | ||
| 532 | { | ||
| 533 | 17x | auto* self = static_cast<uring_wait_op*>(base); | |
| 534 | 17x | self->res = res; | |
| 535 | 17x | self->cqe_flags = flags; | |
| 536 | 17x | local.push(self); | |
| 537 | 17x | } | |
| 538 | |||
| 539 | 21x | static void do_handler( | |
| 540 | void* owner, scheduler_op* base, | ||
| 541 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 542 | { | ||
| 543 | 21x | auto* self = static_cast<uring_wait_op*>(base); | |
| 544 | 21x | if (coro_drain_if_shutdown(owner, self)) | |
| 545 | ✗ | return; | |
| 546 | |||
| 547 | 21x | if (self->sched_) | |
| 548 | 21x | self->sched_->reset_inline_budget(); | |
| 549 | |||
| 550 | // Wait reports only success/cancel/error — no bytes, no EOF. | ||
| 551 | 36x | decode_io_result( | |
| 552 | self->ec_out, | ||
| 553 | 21x | self->cancelled.load(std::memory_order_acquire), | |
| 554 | 21x | self->res < 0 ? make_err(-self->res) : std::error_code{}, | |
| 555 | /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false); | ||
| 556 | |||
| 557 | 21x | coro_resume(self); | |
| 558 | } | ||
| 559 | }; | ||
| 560 | |||
| 561 | /** Non-blocking connect for Unix domain sockets via `IORING_OP_CONNECT`. | ||
| 562 | |||
| 563 | Like `uring_connect_op` but stores `local_endpoint` for the target | ||
| 564 | and out-pointers, since `sockaddr_to_local_endpoint` returns | ||
| 565 | `local_endpoint`, not `endpoint`. | ||
| 566 | */ | ||
| 567 | struct uring_local_connect_op : io_uring_op | ||
| 568 | { | ||
| 569 | sockaddr_storage addr{}; | ||
| 570 | socklen_t addrlen = 0; | ||
| 571 | int fd = -1; | ||
| 572 | corosio::local_endpoint target_endpoint{}; | ||
| 573 | corosio::local_endpoint* remote_endpoint_out = nullptr; | ||
| 574 | corosio::local_endpoint* local_endpoint_out = nullptr; | ||
| 575 | |||
| 576 | 149x | uring_local_connect_op() noexcept | |
| 577 | 149x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 578 | 149x | {} | |
| 579 | |||
| 580 | /** Reset and initialize for a new submission. | ||
| 581 | |||
| 582 | Caller pre-fills `addr` and `addrlen` (see uring_connect_op::prepare). | ||
| 583 | */ | ||
| 584 | 14x | void prepare( | |
| 585 | std::coroutine_handle<> handle, | ||
| 586 | capy::executor_ref executor, | ||
| 587 | std::error_code* ec, | ||
| 588 | int file_descriptor, | ||
| 589 | io_uring_scheduler* scheduler, | ||
| 590 | std::shared_ptr<void> impl, | ||
| 591 | corosio::local_endpoint target, | ||
| 592 | corosio::local_endpoint* remote_out, | ||
| 593 | corosio::local_endpoint* local_out, | ||
| 594 | std::stop_token const& token) noexcept | ||
| 595 | { | ||
| 596 | 14x | h = handle; | |
| 597 | 14x | ex = executor; | |
| 598 | 14x | ec_out = ec; | |
| 599 | 14x | bytes_out = nullptr; | |
| 600 | 14x | fd = file_descriptor; | |
| 601 | 14x | sched_ = scheduler; | |
| 602 | 14x | impl_ptr = std::move(impl); | |
| 603 | 14x | res = 0; | |
| 604 | 14x | cqe_flags = 0; | |
| 605 | 14x | target_endpoint = target; | |
| 606 | 14x | remote_endpoint_out = remote_out; | |
| 607 | 14x | local_endpoint_out = local_out; | |
| 608 | 14x | start(token); | |
| 609 | 14x | } | |
| 610 | |||
| 611 | 14x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 612 | { | ||
| 613 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 614 | 14x | ::io_uring_prep_connect( | |
| 615 | sqe, self->fd, | ||
| 616 | 14x | reinterpret_cast<sockaddr const*>(&self->addr), | |
| 617 | self->addrlen); | ||
| 618 | 14x | } | |
| 619 | |||
| 620 | 14x | static void do_cqe( | |
| 621 | io_uring_op* base, int res, unsigned flags, | ||
| 622 | op_queue& local) noexcept | ||
| 623 | { | ||
| 624 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 625 | 14x | self->res = res; | |
| 626 | 14x | self->cqe_flags = flags; | |
| 627 | 14x | local.push(self); | |
| 628 | 14x | } | |
| 629 | |||
| 630 | 14x | static void do_handler( | |
| 631 | void* owner, scheduler_op* base, | ||
| 632 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 633 | { | ||
| 634 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 635 | 14x | if (coro_drain_if_shutdown(owner, self)) | |
| 636 | ✗ | return; | |
| 637 | |||
| 638 | 14x | if (self->sched_) | |
| 639 | 14x | self->sched_->reset_inline_budget(); | |
| 640 | |||
| 641 | 14x | uring_set_result(self, false, false); | |
| 642 | |||
| 643 | // Write endpoints only on success. | ||
| 644 | 14x | if (self->res >= 0) | |
| 645 | { | ||
| 646 | 11x | if (self->remote_endpoint_out) | |
| 647 | 11x | *self->remote_endpoint_out = self->target_endpoint; | |
| 648 | 11x | if (self->local_endpoint_out && self->fd >= 0) | |
| 649 | { | ||
| 650 | 11x | sockaddr_storage local{}; | |
| 651 | 11x | socklen_t len = sizeof(local); | |
| 652 | 11x | if (::getsockname(self->fd, | |
| 653 | 11x | reinterpret_cast<sockaddr*>(&local), &len) == 0) | |
| 654 | 11x | *self->local_endpoint_out = | |
| 655 | 11x | sockaddr_to_local_endpoint(local, len); | |
| 656 | } | ||
| 657 | } | ||
| 658 | |||
| 659 | 14x | coro_resume(self); | |
| 660 | } | ||
| 661 | }; | ||
| 662 | |||
| 663 | } // namespace boost::corosio::detail | ||
| 664 | |||
| 665 | #endif // BOOST_COROSIO_HAS_IO_URING | ||
| 666 | |||
| 667 | #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 668 |