include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp
85.5% Lines (247/289)
92.6% List of functions (25/27)
Functions (27)
Function
Calls
Lines
Blocks
boost::corosio::detail::uring_set_result(boost::corosio::detail::io_uring_op*, bool, bool)
:59
37922x
80.0%
75.0%
boost::corosio::detail::uring_read_op::uring_read_op()
:87
11936x
100.0%
100.0%
boost::corosio::detail::uring_read_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&)
:101
17072x
100.0%
100.0%
boost::corosio::detail::uring_read_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:131
205x
71.4%
75.0%
boost::corosio::detail::uring_read_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:153
205x
100.0%
100.0%
boost::corosio::detail::uring_read_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:163
17072x
83.3%
79.0%
boost::corosio::detail::uring_write_op::uring_write_op()
:212
11936x
100.0%
100.0%
boost::corosio::detail::uring_write_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, unsigned long*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::detail::speculative_state*, boost::corosio::buffer_param, std::stop_token const&)
:217
16868x
100.0%
100.0%
boost::corosio::detail::uring_write_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:253
0
0.0%
0.0%
boost::corosio::detail::uring_write_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:274
0
0.0%
0.0%
boost::corosio::detail::uring_write_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:284
16868x
83.3%
79.0%
boost::corosio::detail::uring_connect_op::uring_connect_op()
:331
11955x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::endpoint, boost::corosio::endpoint*, boost::corosio::endpoint*, std::stop_token const&)
:343
3962x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:371
3962x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:380
3962x
100.0%
100.0%
boost::corosio::detail::uring_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:390
3962x
86.4%
83.0%
boost::corosio::detail::io_uring_submit_op(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_op*)
:447
4293x
66.7%
63.0%
boost::corosio::detail::uring_wait_op::uring_wait_op()
:516
12179x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, int, std::stop_token const&)
:521
13x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:544
13x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:550
13x
100.0%
100.0%
boost::corosio::detail::uring_wait_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:560
13x
77.8%
73.0%
boost::corosio::detail::uring_local_connect_op::uring_local_connect_op()
:606
111x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::prepare(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::error_code*, int, boost::corosio::detail::io_uring_scheduler*, std::shared_ptr<void>, boost::corosio::local_endpoint, boost::corosio::local_endpoint*, boost::corosio::local_endpoint*, std::stop_token const&)
:614
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_prep(boost::corosio::detail::io_uring_op*, io_uring_sqe*)
:641
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_cqe(boost::corosio::detail::io_uring_op*, int, unsigned int, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&)
:650
14x
100.0%
100.0%
boost::corosio::detail::uring_local_connect_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:660
14x
87.0%
83.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_IO_URING | ||
| 16 | |||
| 17 | #include <liburing.h> | ||
| 18 | |||
| 19 | #include <boost/capy/buffers.hpp> | ||
| 20 | #include <boost/capy/error.hpp> | ||
| 21 | #include <boost/corosio/detail/buffer_param.hpp> | ||
| 22 | #include <boost/corosio/detail/dispatch_coro.hpp> | ||
| 23 | #include <boost/corosio/local_endpoint.hpp> | ||
| 24 | #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp> | ||
| 25 | #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp> | ||
| 26 | #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp> | ||
| 27 | #include <boost/corosio/native/detail/make_err.hpp> | ||
| 28 | #include <boost/corosio/native/detail/speculative_state.hpp> | ||
| 29 | |||
| 30 | #include <system_error> | ||
| 31 | |||
| 32 | #include <netinet/in.h> | ||
| 33 | #include <poll.h> | ||
| 34 | #include <sys/socket.h> | ||
| 35 | #include <sys/uio.h> | ||
| 36 | |||
| 37 | namespace boost::corosio::detail { | ||
| 38 | |||
| 39 | /// Maximum scatter/gather segments per read/write/dgram op. | ||
| 40 | /// | ||
| 41 | /// Bounded well below `IOV_MAX` (1024 on Linux) so each op's | ||
| 42 | /// `iovec[io_uring_max_iov]` lives inside the io_uring_op object on | ||
| 43 | /// the same allocation as the rest of its state. Plan 4's registered- | ||
| 44 | /// buffer work will revisit; until then 16 covers typical scatter use | ||
| 45 | /// cases (fragmented buffers from buffer_sequence) without bloating | ||
| 46 | /// per-op memory. | ||
| 47 | inline constexpr std::size_t io_uring_max_iov = 16; | ||
| 48 | |||
| 49 | /** Resolve `*ec_out` from a CQE result for a completed I/O op; `bytes_out` is left to the calling handler. | ||
| 50 | |||
| 51 | Shared by read, write, and connect handlers. For reads, `res == 0` | ||
| 52 | with a non-empty buffer means the peer closed the connection (EOF). | ||
| 53 | |||
| 54 | @param self The completed op. | ||
| 55 | @param is_read True if this is a receive/read operation. | ||
| 56 | @param empty_buf True if the submitted buffer was zero-length. | ||
| 57 | */ | ||
| 58 | inline void | ||
| 59 | 37922x | uring_set_result(io_uring_op* self, bool is_read, bool empty_buf) noexcept | |
| 60 | { | ||
| 61 | 37922x | if (!self->ec_out) | |
| 62 | ✗ | return; | |
| 63 | |||
| 64 | 37922x | if (self->cancelled.load(std::memory_order_acquire)) | |
| 65 | 100x | *self->ec_out = capy::error::canceled; | |
| 66 | 37822x | else if (self->res < 0) | |
| 67 | 47x | *self->ec_out = make_err(-self->res); | |
| 68 | 37775x | else if (is_read && self->res == 0 && !empty_buf) | |
| 69 | ✗ | *self->ec_out = capy::error::eof; | |
| 70 | else | ||
| 71 | 37775x | *self->ec_out = {}; | |
| 72 | } | ||
| 73 | |||
| 74 | /** Read via `IORING_OP_RECV` (exactly one buffer) or `IORING_OP_READV` (otherwise, scatter-gather). | ||
| 75 | |||
| 76 | @par Handler dispatch | ||
| 77 | do_cqe captures `res`/`cqe_flags` and queues self into `local`; | ||
| 78 | do_handler runs from the scheduler queue and resumes the coroutine. | ||
| 79 | */ | ||
| 80 | struct uring_read_op : io_uring_op | ||
| 81 | { | ||
| 82 | iovec iovecs[io_uring_max_iov]; | ||
| 83 | int iovec_count = 0; | ||
| 84 | int fd = -1; | ||
| 85 | detail::speculative_state* spec_state = nullptr; | ||
| 86 | |||
| 87 | 11936x | uring_read_op() noexcept | |
| 88 | 11936x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 89 | { | ||
| 90 | 11936x | is_read = true; | |
| 91 | 11936x | } | |
| 92 | |||
| 93 | /** Reset and initialize for a new submission. | ||
| 94 | |||
| 95 | Embedded ops are reused across calls; every mutable field the | ||
| 96 | handler may read must be re-initialized here. `start(token)` | ||
| 97 | also resets `cancelled`, `sqe_set`, and `stop_cb`. | ||
| 98 | |||
| 99 | @pre This slot has no in-flight op (its prior op completed). | ||
| 100 | */ | ||
| 101 | 17072x | void prepare( | |
| 102 | std::coroutine_handle<> handle, | ||
| 103 | capy::executor_ref executor, | ||
| 104 | std::error_code* ec, | ||
| 105 | std::size_t* bytes, | ||
| 106 | int file_descriptor, | ||
| 107 | io_uring_scheduler* scheduler, | ||
| 108 | std::shared_ptr<void> impl, | ||
| 109 | detail::speculative_state* spec, | ||
| 110 | buffer_param buffers, | ||
| 111 | std::stop_token const& token) noexcept | ||
| 112 | { | ||
| 113 | 17072x | h = handle; | |
| 114 | 17072x | ex = executor; | |
| 115 | 17072x | ec_out = ec; | |
| 116 | 17072x | bytes_out = bytes; | |
| 117 | 17072x | fd = file_descriptor; | |
| 118 | 17072x | sched_ = scheduler; | |
| 119 | 17072x | impl_ptr = std::move(impl); | |
| 120 | 17072x | spec_state = spec; | |
| 121 | 17072x | res = 0; | |
| 122 | 17072x | cqe_flags = 0; | |
| 123 | 17072x | iovec_count = static_cast<int>( | |
| 124 | 17072x | buffers.copy_to( | |
| 125 | 17072x | reinterpret_cast<capy::mutable_buffer*>(iovecs), | |
| 126 | io_uring_max_iov)); | ||
| 127 | 17072x | empty_buffer = (iovec_count == 0); | |
| 128 | 17072x | start(token); | |
| 129 | 17072x | } | |
| 130 | |||
| 131 | 205x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 132 | { | ||
| 133 | 205x | auto* self = static_cast<uring_read_op*>(base); | |
| 134 | // Single-buffer fast path: IORING_OP_RECV with a flat | ||
| 135 | // (buffer, length) skips the iovec-array indirection that | ||
| 136 | // IORING_OP_READV pays. For multi-iovec scatter reads, fall | ||
| 137 | // back to readv. | ||
| 138 | 205x | if (self->iovec_count == 1) | |
| 139 | { | ||
| 140 | 205x | ::io_uring_prep_recv( | |
| 141 | sqe, self->fd, | ||
| 142 | self->iovecs[0].iov_base, | ||
| 143 | self->iovecs[0].iov_len, | ||
| 144 | 0); | ||
| 145 | } | ||
| 146 | else | ||
| 147 | { | ||
| 148 | ✗ | ::io_uring_prep_readv( | |
| 149 | ✗ | sqe, self->fd, self->iovecs, self->iovec_count, 0); | |
| 150 | } | ||
| 151 | 205x | } | |
| 152 | |||
| 153 | 205x | static void do_cqe( | |
| 154 | io_uring_op* base, int res, unsigned flags, | ||
| 155 | op_queue& local) noexcept | ||
| 156 | { | ||
| 157 | 205x | auto* self = static_cast<uring_read_op*>(base); | |
| 158 | 205x | self->res = res; | |
| 159 | 205x | self->cqe_flags = flags; | |
| 160 | 205x | local.push(self); | |
| 161 | 205x | } | |
| 162 | |||
| 163 | 17072x | static void do_handler( | |
| 164 | void* owner, scheduler_op* base, | ||
| 165 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 166 | { | ||
| 167 | 17072x | auto* self = static_cast<uring_read_op*>(base); | |
| 168 | 17072x | self->stop_cb.reset(); | |
| 169 | |||
| 170 | 17072x | if (owner == nullptr) | |
| 171 | { | ||
| 172 | // Shutdown drain: break the impl_ptr cycle. The op storage | ||
| 173 | // is owned by the impl, which destructs once the cycle is | ||
| 174 | // broken (if this was the last ref). | ||
| 175 | ✗ | auto suicide = std::move(self->impl_ptr); | |
| 176 | ✗ | return; | |
| 177 | ✗ | } | |
| 178 | |||
| 179 | 17072x | uring_set_result(self, true, self->empty_buffer); | |
| 180 | |||
| 181 | 17072x | if (self->res > 0 && self->spec_state) | |
| 182 | { | ||
| 183 | // Kernel signalled readiness — restore speculation. | ||
| 184 | 16969x | self->spec_state->on_async_read_ready(); | |
| 185 | } | ||
| 186 | |||
| 187 | 17072x | if (self->bytes_out) | |
| 188 | 17072x | *self->bytes_out = | |
| 189 | 17072x | self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u; | |
| 190 | |||
| 191 | 17072x | self->cont_op.cont.h = self->h; | |
| 192 | 17072x | auto next = dispatch_coro(self->ex, self->cont_op.cont); | |
| 193 | 17072x | auto suicide = std::move(self->impl_ptr); | |
| 194 | 17072x | next.resume(); | |
| 195 | // suicide drops here; may destroy impl + self. | ||
| 196 | 17072x | } | |
| 197 | }; | ||
| 198 | |||
| 199 | /** Write via `IORING_OP_SEND` (exactly one buffer) or `IORING_OP_SENDMSG` (otherwise, scatter-gather), both with `MSG_NOSIGNAL`. | ||
| 200 | |||
| 201 | `MSG_NOSIGNAL` prevents `SIGPIPE` when the peer has closed the | ||
| 202 | connection; the error is surfaced as `EPIPE` instead. | ||
| 203 | */ | ||
| 204 | struct uring_write_op : io_uring_op | ||
| 205 | { | ||
| 206 | iovec iovecs[io_uring_max_iov]; | ||
| 207 | int iovec_count = 0; | ||
| 208 | int fd = -1; | ||
| 209 | msghdr msg{}; | ||
| 210 | detail::speculative_state* spec_state = nullptr; | ||
| 211 | |||
| 212 | 11936x | uring_write_op() noexcept | |
| 213 | 11936x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 214 | 11936x | {} | |
| 215 | |||
| 216 | /** Reset and initialize for a new submission. See uring_read_op::prepare. */ | ||
| 217 | 16868x | void prepare( | |
| 218 | std::coroutine_handle<> handle, | ||
| 219 | capy::executor_ref executor, | ||
| 220 | std::error_code* ec, | ||
| 221 | std::size_t* bytes, | ||
| 222 | int file_descriptor, | ||
| 223 | io_uring_scheduler* scheduler, | ||
| 224 | std::shared_ptr<void> impl, | ||
| 225 | detail::speculative_state* spec, | ||
| 226 | buffer_param buffers, | ||
| 227 | std::stop_token const& token) noexcept | ||
| 228 | { | ||
| 229 | 16868x | h = handle; | |
| 230 | 16868x | ex = executor; | |
| 231 | 16868x | ec_out = ec; | |
| 232 | 16868x | bytes_out = bytes; | |
| 233 | 16868x | fd = file_descriptor; | |
| 234 | 16868x | sched_ = scheduler; | |
| 235 | 16868x | impl_ptr = std::move(impl); | |
| 236 | 16868x | spec_state = spec; | |
| 237 | 16868x | res = 0; | |
| 238 | 16868x | cqe_flags = 0; | |
| 239 | 16868x | iovec_count = static_cast<int>( | |
| 240 | 16868x | buffers.copy_to( | |
| 241 | 16868x | reinterpret_cast<capy::mutable_buffer*>(iovecs), | |
| 242 | io_uring_max_iov)); | ||
| 243 | 16868x | empty_buffer = (iovec_count == 0); | |
| 244 | 16868x | if (!empty_buffer) | |
| 245 | { | ||
| 246 | 16868x | msg = {}; | |
| 247 | 16868x | msg.msg_iov = iovecs; | |
| 248 | 16868x | msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count); | |
| 249 | } | ||
| 250 | 16868x | start(token); | |
| 251 | 16868x | } | |
| 252 | |||
| 253 | ✗ | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 254 | { | ||
| 255 | ✗ | auto* self = static_cast<uring_write_op*>(base); | |
| 256 | // Single-buffer fast path: IORING_OP_SEND with MSG_NOSIGNAL | ||
| 257 | // skips the msghdr indirection that IORING_OP_SENDMSG pays. | ||
| 258 | // For multi-iovec scatter writes, fall back to sendmsg. | ||
| 259 | ✗ | if (self->iovec_count == 1) | |
| 260 | { | ||
| 261 | ✗ | ::io_uring_prep_send( | |
| 262 | sqe, self->fd, | ||
| 263 | ✗ | self->iovecs[0].iov_base, | |
| 264 | self->iovecs[0].iov_len, | ||
| 265 | MSG_NOSIGNAL); | ||
| 266 | } | ||
| 267 | else | ||
| 268 | { | ||
| 269 | ✗ | ::io_uring_prep_sendmsg( | |
| 270 | ✗ | sqe, self->fd, &self->msg, MSG_NOSIGNAL); | |
| 271 | } | ||
| 272 | ✗ | } | |
| 273 | |||
| 274 | ✗ | static void do_cqe( | |
| 275 | io_uring_op* base, int res, unsigned flags, | ||
| 276 | op_queue& local) noexcept | ||
| 277 | { | ||
| 278 | ✗ | auto* self = static_cast<uring_write_op*>(base); | |
| 279 | ✗ | self->res = res; | |
| 280 | ✗ | self->cqe_flags = flags; | |
| 281 | ✗ | local.push(self); | |
| 282 | ✗ | } | |
| 283 | |||
| 284 | 16868x | static void do_handler( | |
| 285 | void* owner, scheduler_op* base, | ||
| 286 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 287 | { | ||
| 288 | 16868x | auto* self = static_cast<uring_write_op*>(base); | |
| 289 | 16868x | self->stop_cb.reset(); | |
| 290 | |||
| 291 | 16868x | if (owner == nullptr) | |
| 292 | { | ||
| 293 | ✗ | auto suicide = std::move(self->impl_ptr); | |
| 294 | ✗ | return; | |
| 295 | ✗ | } | |
| 296 | |||
| 297 | 16868x | uring_set_result(self, false, self->empty_buffer); | |
| 298 | |||
| 299 | 16868x | if (self->res > 0 && self->spec_state) | |
| 300 | { | ||
| 301 | // Kernel signalled readiness — restore speculation. | ||
| 302 | 16868x | self->spec_state->on_async_write_ready(); | |
| 303 | } | ||
| 304 | |||
| 305 | 16868x | if (self->bytes_out) | |
| 306 | 16868x | *self->bytes_out = | |
| 307 | 16868x | self->res >= 0 ? static_cast<std::size_t>(self->res) : 0u; | |
| 308 | |||
| 309 | 16868x | self->cont_op.cont.h = self->h; | |
| 310 | 16868x | auto next = dispatch_coro(self->ex, self->cont_op.cont); | |
| 311 | 16868x | auto suicide = std::move(self->impl_ptr); | |
| 312 | 16868x | next.resume(); | |
| 313 | 16868x | } | |
| 314 | }; | ||
| 315 | |||
| 316 | /** Non-blocking connect via `IORING_OP_CONNECT`. | ||
| 317 | |||
| 318 | Negative `res` is the connect error; zero means success. | ||
| 319 | `remote_endpoint_out` is written only on success so a failed | ||
| 320 | connect does not corrupt the socket's cached remote endpoint. | ||
| 321 | */ | ||
| 322 | struct uring_connect_op : io_uring_op | ||
| 323 | { | ||
| 324 | sockaddr_storage addr{}; | ||
| 325 | socklen_t addrlen = 0; | ||
| 326 | int fd = -1; | ||
| 327 | endpoint target_endpoint{}; | ||
| 328 | endpoint* remote_endpoint_out = nullptr; | ||
| 329 | endpoint* local_endpoint_out = nullptr; | ||
| 330 | |||
| 331 | 11955x | uring_connect_op() noexcept | |
| 332 | 11955x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 333 | 11955x | {} | |
| 334 | |||
| 335 | /** Reset and initialize for a new submission. | ||
| 336 | |||
| 337 | The caller must fill `addr` and `addrlen` before calling this | ||
| 338 | (typically via `to_sockaddr(ep, family, conn_.addr)` which | ||
| 339 | returns the addrlen) — `to_sockaddr` is the family-aware | ||
| 340 | helper and requires the socket family which is known to the | ||
| 341 | caller, not the op. | ||
| 342 | */ | ||
| 343 | 3962x | void prepare( | |
| 344 | std::coroutine_handle<> handle, | ||
| 345 | capy::executor_ref executor, | ||
| 346 | std::error_code* ec, | ||
| 347 | int file_descriptor, | ||
| 348 | io_uring_scheduler* scheduler, | ||
| 349 | std::shared_ptr<void> impl, | ||
| 350 | endpoint target, | ||
| 351 | endpoint* remote_out, | ||
| 352 | endpoint* local_out, | ||
| 353 | std::stop_token const& token) noexcept | ||
| 354 | { | ||
| 355 | 3962x | h = handle; | |
| 356 | 3962x | ex = executor; | |
| 357 | 3962x | ec_out = ec; | |
| 358 | 3962x | bytes_out = nullptr; | |
| 359 | 3962x | fd = file_descriptor; | |
| 360 | 3962x | sched_ = scheduler; | |
| 361 | 3962x | impl_ptr = std::move(impl); | |
| 362 | 3962x | res = 0; | |
| 363 | 3962x | cqe_flags = 0; | |
| 364 | 3962x | target_endpoint = target; | |
| 365 | 3962x | remote_endpoint_out = remote_out; | |
| 366 | 3962x | local_endpoint_out = local_out; | |
| 367 | // addr / addrlen are pre-filled by the caller. | ||
| 368 | 3962x | start(token); | |
| 369 | 3962x | } | |
| 370 | |||
| 371 | 3962x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 372 | { | ||
| 373 | 3962x | auto* self = static_cast<uring_connect_op*>(base); | |
| 374 | 3962x | ::io_uring_prep_connect( | |
| 375 | sqe, self->fd, | ||
| 376 | 3962x | reinterpret_cast<sockaddr const*>(&self->addr), | |
| 377 | self->addrlen); | ||
| 378 | 3962x | } | |
| 379 | |||
| 380 | 3962x | static void do_cqe( | |
| 381 | io_uring_op* base, int res, unsigned flags, | ||
| 382 | op_queue& local) noexcept | ||
| 383 | { | ||
| 384 | 3962x | auto* self = static_cast<uring_connect_op*>(base); | |
| 385 | 3962x | self->res = res; | |
| 386 | 3962x | self->cqe_flags = flags; | |
| 387 | 3962x | local.push(self); | |
| 388 | 3962x | } | |
| 389 | |||
| 390 | 3962x | static void do_handler( | |
| 391 | void* owner, scheduler_op* base, | ||
| 392 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 393 | { | ||
| 394 | 3962x | auto* self = static_cast<uring_connect_op*>(base); | |
| 395 | 3962x | self->stop_cb.reset(); | |
| 396 | |||
| 397 | 3962x | if (owner == nullptr) | |
| 398 | { | ||
| 399 | ✗ | auto suicide = std::move(self->impl_ptr); | |
| 400 | ✗ | return; | |
| 401 | ✗ | } | |
| 402 | |||
| 403 | 3962x | uring_set_result(self, false, false); | |
| 404 | |||
| 405 | // Write endpoints only on success. | ||
| 406 | 3962x | if (self->res >= 0) | |
| 407 | { | ||
| 408 | 3954x | if (self->remote_endpoint_out) | |
| 409 | 3954x | *self->remote_endpoint_out = self->target_endpoint; | |
| 410 | 3954x | if (self->local_endpoint_out && self->fd >= 0) | |
| 411 | { | ||
| 412 | 3954x | sockaddr_storage local{}; | |
| 413 | 3954x | socklen_t len = sizeof(local); | |
| 414 | 3954x | if (::getsockname(self->fd, | |
| 415 | 3954x | reinterpret_cast<sockaddr*>(&local), &len) == 0) | |
| 416 | 3954x | *self->local_endpoint_out = sockaddr_to_endpoint(local); | |
| 417 | } | ||
| 418 | } | ||
| 419 | |||
| 420 | 3962x | self->cont_op.cont.h = self->h; | |
| 421 | 3962x | auto next = dispatch_coro(self->ex, self->cont_op.cont); | |
| 422 | 3962x | auto suicide = std::move(self->impl_ptr); | |
| 423 | 3962x | next.resume(); | |
| 424 | 3962x | } | |
| 425 | }; | ||
| 426 | |||
| 427 | /** Submit an `io_uring_op` whose `prep_func` is set. | ||
| 428 | |||
| 429 | Acquires the ring mutex, prepares the SQE, and (under the same | ||
| 430 | mutex) CAS-sets `submit_op_posted_`. The first submitter of a | ||
| 431 | batch wins the CAS and posts the scheduler's `submit_sqes_op`, | ||
| 432 | which later flushes all queued SQEs in a single | ||
| 433 | `io_uring_submit_and_get_events` call and drains any ready CQEs. | ||
| 434 | Subsequent submitters in the same batch piggyback — their SQEs | ||
| 435 | sit in the user-space SQ ring until that op dispatches. | ||
| 436 | |||
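| 437 | On SQ-ring exhaustion (after one flush retry), writes `EAGAIN` to | ||
| 438 | `*op->ec_out` and queues the op as completed so its handler | ||
| 439 | dispatches on the next `do_one` cycle. NOTE(review): `op->res` is not set on this path, so a handler that calls `uring_set_result` looks likely to overwrite that `EAGAIN` — confirm. | ||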
| 440 | |||
| 441 | @pre `op->prep_func != nullptr`. | ||
| 442 | |||
| 443 | @par Exception Safety | ||
| 444 | Nothrow. | ||
| 445 | */ | ||
| 446 | inline void | ||
| 447 | 4293x | io_uring_submit_op(io_uring_scheduler& sched, io_uring_op* op) noexcept | |
| 448 | { | ||
| 449 | 4293x | sched.lazy_init_ring(); | |
| 450 | |||
| 451 | 4293x | bool need_post = false; | |
| 452 | { | ||
| 453 | 4293x | typename io_uring_scheduler::lock_type ring_lock(sched.ring_mutex()); | |
| 454 | |||
| 455 | 4293x | ::io_uring_sqe* sqe = ::io_uring_get_sqe(sched.ring()); | |
| 456 | 4293x | if (!sqe) | |
| 457 | { | ||
| 458 | // SQ ring full — flush to kernel and retry once. | ||
| 459 | ✗ | ::io_uring_submit(sched.ring()); | |
| 460 | ✗ | sqe = ::io_uring_get_sqe(sched.ring()); | |
| 461 | } | ||
| 462 | |||
| 463 | 4293x | if (!sqe) | |
| 464 | { | ||
| 465 | // SQ stayed full after one flush — synchronous failure path. | ||
| 466 | // Surface EAGAIN and queue the op as completed so do_one | ||
| 467 | // dispatches the handler. The caller's work_started() already | ||
| 468 | // counted this op. (CAS path is not entered here.) | ||
| 469 | ✗ | if (op->ec_out) | |
| 470 | ✗ | *op->ec_out = make_err(EAGAIN); | |
| 471 | ✗ | typename io_uring_scheduler::lock_type lock(sched.dispatch_mutex()); | |
| 472 | ✗ | sched.push_completed_locked(op); | |
| 473 | ✗ | return; | |
| 474 | ✗ | } | |
| 475 | |||
| 476 | 4293x | op->prep_func(op, sqe); | |
| 477 | 4293x | ::io_uring_sqe_set_data(sqe, op); | |
| 478 | // Count this op against the in-flight gate in do_one: it | ||
| 479 | // expects exactly one F_MORE-less CQE per submitted SQE | ||
| 480 | // (multishot ops decrement only on the terminal CQE). | ||
| 481 | 4293x | sched.inflight_inc(); | |
| 482 | // Release pairs with the acquire in io_uring_op::request_cancel: | ||
| 483 | // a stop_token firing after we release the mutex will see | ||
| 484 | // sqe_set==true and submit a cancel-by-user_data SQE. | ||
| 485 | 4293x | op->sqe_set.store(true, std::memory_order_release); | |
| 486 | |||
| 487 | // First submitter in a batch wins the CAS and will post | ||
| 488 | // submit_sqes_op; others piggyback on the same flush. | ||
| 489 | 4293x | if (!sched.submit_op_posted_exchange(true)) | |
| 490 | 114x | need_post = true; | |
| 491 | 4293x | } | |
| 492 | |||
| 493 | 4293x | if (need_post) | |
| 494 | { | ||
| 495 | // Flush is deferred to submit_sqes_op; post() owns the wake. | ||
| 496 | 114x | sched.post(&sched.submit_op_ref()); | |
| 497 | } | ||
| 498 | } | ||
| 499 | |||
| 500 | /** Readiness wait via `IORING_OP_POLL_ADD`. | ||
| 501 | |||
| 502 | Used to implement the `wait()` virtual for socket and acceptor | ||
| 503 | implementations. The op submits a one-shot poll on `fd` for the | ||
| 504 | requested set of poll flags (POLLIN / POLLOUT / POLLPRI|POLLERR| | ||
| 505 | POLLHUP) and reports completion without transferring any data. | ||
| 506 | |||
| 507 | The CQE's `res` carries the actual revents, but we surface only | ||
| 508 | success/cancel/error on `*ec_out` — callers of `wait()` just need | ||
| 509 | a readiness signal, not the specific event mask. | ||
| 510 | */ | ||
| 511 | struct uring_wait_op : io_uring_op | ||
| 512 | { | ||
| 513 | int fd = -1; | ||
| 514 | int poll_flags = 0; | ||
| 515 | |||
| 516 | 12179x | uring_wait_op() noexcept | |
| 517 | 12179x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 518 | 12179x | {} | |
| 519 | |||
| 520 | /** Reset and initialize for a new submission. */ | ||
| 521 | 13x | void prepare( | |
| 522 | std::coroutine_handle<> handle, | ||
| 523 | capy::executor_ref executor, | ||
| 524 | std::error_code* ec, | ||
| 525 | int file_descriptor, | ||
| 526 | io_uring_scheduler* scheduler, | ||
| 527 | std::shared_ptr<void> impl, | ||
| 528 | int flags, | ||
| 529 | std::stop_token const& token) noexcept | ||
| 530 | { | ||
| 531 | 13x | h = handle; | |
| 532 | 13x | ex = executor; | |
| 533 | 13x | ec_out = ec; | |
| 534 | 13x | bytes_out = nullptr; | |
| 535 | 13x | fd = file_descriptor; | |
| 536 | 13x | sched_ = scheduler; | |
| 537 | 13x | impl_ptr = std::move(impl); | |
| 538 | 13x | poll_flags = flags; | |
| 539 | 13x | res = 0; | |
| 540 | 13x | cqe_flags = 0; | |
| 541 | 13x | start(token); | |
| 542 | 13x | } | |
| 543 | |||
| 544 | 13x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 545 | { | ||
| 546 | 13x | auto* self = static_cast<uring_wait_op*>(base); | |
| 547 | 13x | ::io_uring_prep_poll_add(sqe, self->fd, self->poll_flags); | |
| 548 | 13x | } | |
| 549 | |||
| 550 | 13x | static void do_cqe( | |
| 551 | io_uring_op* base, int res, unsigned flags, | ||
| 552 | op_queue& local) noexcept | ||
| 553 | { | ||
| 554 | 13x | auto* self = static_cast<uring_wait_op*>(base); | |
| 555 | 13x | self->res = res; | |
| 556 | 13x | self->cqe_flags = flags; | |
| 557 | 13x | local.push(self); | |
| 558 | 13x | } | |
| 559 | |||
| 560 | 13x | static void do_handler( | |
| 561 | void* owner, scheduler_op* base, | ||
| 562 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 563 | { | ||
| 564 | 13x | auto* self = static_cast<uring_wait_op*>(base); | |
| 565 | 13x | self->stop_cb.reset(); | |
| 566 | |||
| 567 | 13x | if (owner == nullptr) | |
| 568 | { | ||
| 569 | // Shutdown drain: break the impl_ptr cycle. | ||
| 570 | ✗ | auto suicide = std::move(self->impl_ptr); | |
| 571 | ✗ | return; | |
| 572 | ✗ | } | |
| 573 | |||
| 574 | 13x | if (self->ec_out) | |
| 575 | { | ||
| 576 | 13x | if (self->cancelled.load(std::memory_order_acquire)) | |
| 577 | ✗ | *self->ec_out = capy::error::canceled; | |
| 578 | 13x | else if (self->res < 0) | |
| 579 | 2x | *self->ec_out = make_err(-self->res); | |
| 580 | else | ||
| 581 | 11x | *self->ec_out = {}; | |
| 582 | } | ||
| 583 | |||
| 584 | 13x | self->cont_op.cont.h = self->h; | |
| 585 | 13x | auto next = dispatch_coro(self->ex, self->cont_op.cont); | |
| 586 | 13x | auto suicide = std::move(self->impl_ptr); | |
| 587 | 13x | next.resume(); | |
| 588 | 13x | } | |
| 589 | }; | ||
| 590 | |||
| 591 | /** Non-blocking connect for Unix domain sockets via `IORING_OP_CONNECT`. | ||
| 592 | |||
| 593 | Like `uring_connect_op` but stores `local_endpoint` for the target | ||
| 594 | and out-pointers, since `sockaddr_to_local_endpoint` returns | ||
| 595 | `local_endpoint`, not `endpoint`. | ||
| 596 | */ | ||
| 597 | struct uring_local_connect_op : io_uring_op | ||
| 598 | { | ||
| 599 | sockaddr_storage addr{}; | ||
| 600 | socklen_t addrlen = 0; | ||
| 601 | int fd = -1; | ||
| 602 | corosio::local_endpoint target_endpoint{}; | ||
| 603 | corosio::local_endpoint* remote_endpoint_out = nullptr; | ||
| 604 | corosio::local_endpoint* local_endpoint_out = nullptr; | ||
| 605 | |||
| 606 | 111x | uring_local_connect_op() noexcept | |
| 607 | 111x | : io_uring_op(&do_handler, &do_cqe, &do_prep) | |
| 608 | 111x | {} | |
| 609 | |||
| 610 | /** Reset and initialize for a new submission. | ||
| 611 | |||
| 612 | Caller pre-fills `addr` and `addrlen` (see uring_connect_op::prepare). | ||
| 613 | */ | ||
| 614 | 14x | void prepare( | |
| 615 | std::coroutine_handle<> handle, | ||
| 616 | capy::executor_ref executor, | ||
| 617 | std::error_code* ec, | ||
| 618 | int file_descriptor, | ||
| 619 | io_uring_scheduler* scheduler, | ||
| 620 | std::shared_ptr<void> impl, | ||
| 621 | corosio::local_endpoint target, | ||
| 622 | corosio::local_endpoint* remote_out, | ||
| 623 | corosio::local_endpoint* local_out, | ||
| 624 | std::stop_token const& token) noexcept | ||
| 625 | { | ||
| 626 | 14x | h = handle; | |
| 627 | 14x | ex = executor; | |
| 628 | 14x | ec_out = ec; | |
| 629 | 14x | bytes_out = nullptr; | |
| 630 | 14x | fd = file_descriptor; | |
| 631 | 14x | sched_ = scheduler; | |
| 632 | 14x | impl_ptr = std::move(impl); | |
| 633 | 14x | res = 0; | |
| 634 | 14x | cqe_flags = 0; | |
| 635 | 14x | target_endpoint = target; | |
| 636 | 14x | remote_endpoint_out = remote_out; | |
| 637 | 14x | local_endpoint_out = local_out; | |
| 638 | 14x | start(token); | |
| 639 | 14x | } | |
| 640 | |||
| 641 | 14x | static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept | |
| 642 | { | ||
| 643 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 644 | 14x | ::io_uring_prep_connect( | |
| 645 | sqe, self->fd, | ||
| 646 | 14x | reinterpret_cast<sockaddr const*>(&self->addr), | |
| 647 | self->addrlen); | ||
| 648 | 14x | } | |
| 649 | |||
| 650 | 14x | static void do_cqe( | |
| 651 | io_uring_op* base, int res, unsigned flags, | ||
| 652 | op_queue& local) noexcept | ||
| 653 | { | ||
| 654 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 655 | 14x | self->res = res; | |
| 656 | 14x | self->cqe_flags = flags; | |
| 657 | 14x | local.push(self); | |
| 658 | 14x | } | |
| 659 | |||
| 660 | 14x | static void do_handler( | |
| 661 | void* owner, scheduler_op* base, | ||
| 662 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 663 | { | ||
| 664 | 14x | auto* self = static_cast<uring_local_connect_op*>(base); | |
| 665 | 14x | self->stop_cb.reset(); | |
| 666 | |||
| 667 | 14x | if (owner == nullptr) | |
| 668 | { | ||
| 669 | ✗ | auto suicide = std::move(self->impl_ptr); | |
| 670 | ✗ | return; | |
| 671 | ✗ | } | |
| 672 | |||
| 673 | 14x | uring_set_result(self, false, false); | |
| 674 | |||
| 675 | // Write endpoints only on success. | ||
| 676 | 14x | if (self->res >= 0) | |
| 677 | { | ||
| 678 | 11x | if (self->remote_endpoint_out) | |
| 679 | 11x | *self->remote_endpoint_out = self->target_endpoint; | |
| 680 | 11x | if (self->local_endpoint_out && self->fd >= 0) | |
| 681 | { | ||
| 682 | 11x | sockaddr_storage local{}; | |
| 683 | 11x | socklen_t len = sizeof(local); | |
| 684 | 11x | if (::getsockname(self->fd, | |
| 685 | 11x | reinterpret_cast<sockaddr*>(&local), &len) == 0) | |
| 686 | 11x | *self->local_endpoint_out = | |
| 687 | 11x | sockaddr_to_local_endpoint(local, len); | |
| 688 | } | ||
| 689 | } | ||
| 690 | |||
| 691 | 14x | self->cont_op.cont.h = self->h; | |
| 692 | 14x | auto next = dispatch_coro(self->ex, self->cont_op.cont); | |
| 693 | 14x | auto suicide = std::move(self->impl_ptr); | |
| 694 | 14x | next.resume(); | |
| 695 | 14x | } | |
| 696 | }; | ||
| 697 | |||
| 698 | } // namespace boost::corosio::detail | ||
| 699 | |||
| 700 | #endif // BOOST_COROSIO_HAS_IO_URING | ||
| 701 | |||
| 702 | #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SOCKET_OPS_HPP | ||
| 703 |