include/boost/corosio/native/detail/io_uring/io_uring_multishot_acceptor.hpp
79.7% Lines (200/251)
85.7% List of functions (24/28)
Functions (28)
Function
Calls
Lines
Blocks
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_local_stream_service&)
:93
27x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_tcp_service&)
:93
121x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::~io_uring_multishot_acceptor_base()
:101
27x
85.0%
90.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::~io_uring_multishot_acceptor_base()
:101
121x
85.0%
90.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::local_endpoint() const
:139
1x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::local_endpoint() const
:139
97x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::is_open() const
:144
114x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::is_open() const
:144
5161x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel()
:149
1x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel()
:149
3x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::drain_waiters_only()
:160
22x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::drain_waiters_only()
:160
118x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::set_option(int, int, void const*, unsigned long)
:189
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::set_option(int, int, void const*, unsigned long)
:189
114x
83.3%
78.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::get_option(int, int, void*, unsigned long*) const
:201
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::get_option(int, int, void*, unsigned long*) const
:201
2x
87.5%
80.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::start_multishot()
:214
15x
83.3%
81.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::start_multishot()
:214
101x
100.0%
95.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**)
:242
13x
71.6%
72.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**)
:242
4382x
71.6%
72.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_node*)
:382
2x
100.0%
95.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_node*)
:382
12x
100.0%
95.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe(void*, int, int, bool)
:407
11x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe(void*, int, int, bool)
:407
4370x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe_impl(int, int, bool)
:415
11x
67.7%
61.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe_impl(int, int, bool)
:415
4370x
67.7%
61.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_canceller::operator()() const
:541
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_canceller::operator()() const
:541
0
0.0%
0.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_IO_URING | ||
| 16 | |||
| 17 | #include <liburing.h> | ||
| 18 | |||
| 19 | #include <boost/corosio/detail/intrusive.hpp> | ||
| 20 | #include <boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp> | ||
| 21 | #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp> | ||
| 22 | #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp> | ||
| 23 | #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp> | ||
| 24 | #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp> | ||
| 25 | #include <boost/corosio/native/detail/make_err.hpp> | ||
| 26 | #include <boost/corosio/io/io_object.hpp> | ||
| 27 | |||
| 28 | #include <atomic> | ||
| 29 | #include <coroutine> | ||
| 30 | #include <memory> | ||
| 31 | #include <mutex> | ||
| 32 | #include <optional> | ||
| 33 | #include <stop_token> | ||
| 34 | #include <system_error> | ||
| 35 | |||
| 36 | #include <netinet/in.h> | ||
| 37 | #include <sys/socket.h> | ||
| 38 | #include <unistd.h> | ||
| 39 | |||
| 40 | namespace boost::corosio::detail { | ||
| 41 | |||
| 42 | template<class Derived, class ImplBase, class Endpoint, class PeerService> | ||
| 43 | class io_uring_multishot_acceptor_base | ||
| 44 | : public ImplBase | ||
| 45 | , public std::enable_shared_from_this<Derived> | ||
| 46 | { | ||
| 47 | protected: | ||
| 48 | struct ready_fd_node : intrusive_list<ready_fd_node>::node | ||
| 49 | { | ||
| 50 | int fd = -1; | ||
| 51 | sockaddr_storage peer{}; | ||
| 52 | socklen_t peer_len = 0; | ||
| 53 | }; | ||
| 54 | |||
| 55 | struct waiter_node; | ||
| 56 | |||
| 57 | struct waiter_canceller | ||
| 58 | { | ||
| 59 | waiter_node* w; | ||
| 60 | void operator()() const noexcept; | ||
| 61 | }; | ||
| 62 | |||
| 63 | struct waiter_node : intrusive_list<waiter_node>::node | ||
| 64 | { | ||
| 65 | std::coroutine_handle<> h; | ||
| 66 | capy::executor_ref ex; | ||
| 67 | std::error_code* ec_out = nullptr; | ||
| 68 | io_object::implementation** impl_out = nullptr; | ||
| 69 | Derived* owner = nullptr; | ||
| 70 | std::atomic<bool> cancelled{false}; | ||
| 71 | /// True once linked into `waiters_` (guarded by `mutex_`). | ||
| 72 | /// The stop callback is armed before the node is queued, so | ||
| 73 | /// cancel_waiter must not unlink a node it never queued. | ||
| 74 | bool queued = false; | ||
| 75 | std::optional<std::stop_callback<waiter_canceller>> stop_cb; | ||
| 76 | }; | ||
| 77 | |||
| 78 | int fd_ = -1; | ||
| 79 | io_uring_scheduler* sched_; | ||
| 80 | PeerService* peer_service_; | ||
| 81 | Endpoint local_endpoint_{}; | ||
| 82 | mutable std::mutex mutex_; | ||
| 83 | intrusive_list<ready_fd_node> ready_fds_; | ||
| 84 | intrusive_list<waiter_node> waiters_; | ||
| 85 | std::unique_ptr<uring_multi_accept_op> multi_op_; | ||
| 86 | bool closing_ = false; | ||
| 87 | |||
| 88 | private: | ||
| 89 | // CRTP ctor private + Derived friended so the base cannot be | ||
| 90 | // constructed except as a CRTP base of Derived | ||
| 91 | // (clang-tidy bugprone-crtp-constructor-accessibility). | ||
| 92 | friend Derived; | ||
| 93 | 148x | io_uring_multishot_acceptor_base( | |
| 94 | io_uring_scheduler& sched, PeerService& peer_svc) noexcept | ||
| 95 | 148x | : sched_(&sched) | |
| 96 | 148x | , peer_service_(&peer_svc) | |
| 97 | 148x | {} | |
| 98 | |||
| 99 | public: | ||
| 100 | |||
| 101 | 148x | ~io_uring_multishot_acceptor_base() override | |
| 102 | { | ||
| 103 | { | ||
| 104 | 148x | std::lock_guard lk(mutex_); | |
| 105 | 148x | closing_ = true; | |
| 106 | 148x | } | |
| 107 | 148x | if (fd_ >= 0) | |
| 108 | { | ||
| 109 | ✗ | sched_->submit_cancel_by_fd(fd_); | |
| 110 | ✗ | ::close(fd_); | |
| 111 | ✗ | fd_ = -1; | |
| 112 | } | ||
| 113 | |||
| 114 | // Drain parked accepted-connection fds unconditionally. These are | ||
| 115 | // distinct from the listener fd and can be present even when the | ||
| 116 | // service close() path already closed and cleared fd_ — that path | ||
| 117 | // does not touch ready_fds_, so the drain must run here. | ||
| 118 | 148x | intrusive_list<ready_fd_node> drained; | |
| 119 | { | ||
| 120 | 148x | std::lock_guard lk(mutex_); | |
| 121 | 150x | while (auto* r = ready_fds_.pop_front()) | |
| 122 | 2x | drained.push_back(r); | |
| 123 | 148x | } | |
| 124 | 150x | while (auto* r = drained.pop_front()) | |
| 125 | { | ||
| 126 | 2x | ::close(r->fd); | |
| 127 | 2x | delete r; | |
| 128 | } | ||
| 129 | |||
| 130 | // Break the multi_op_ → impl_ptr (shared_ptr<this>) cycle and | ||
| 131 | // drain pending CQEs so unique_ptr<multi_op_> can free safely. | ||
| 132 | 148x | if (multi_op_) | |
| 133 | { | ||
| 134 | 115x | multi_op_->impl_ptr.reset(); | |
| 135 | 115x | sched_->drain_cqes_for(multi_op_.get()); | |
| 136 | } | ||
| 137 | 296x | } | |
| 138 | |||
| 139 | 98x | Endpoint local_endpoint() const noexcept override | |
| 140 | { | ||
| 141 | 98x | return local_endpoint_; | |
| 142 | } | ||
| 143 | |||
| 144 | 5275x | bool is_open() const noexcept override | |
| 145 | { | ||
| 146 | 5275x | return fd_ >= 0; | |
| 147 | } | ||
| 148 | |||
| 149 | 4x | void cancel() noexcept override | |
| 150 | { | ||
| 151 | 4x | drain_waiters_only(); | |
| 152 | 4x | if (fd_ >= 0) | |
| 153 | 4x | sched_->submit_cancel_by_fd(fd_); | |
| 154 | 4x | } | |
| 155 | |||
| 156 | /// Drain queued waiters with operation_aborted but do NOT submit | ||
| 157 | /// any kernel cancel for the fd. Used by service close() paths | ||
| 158 | /// that have already submitted (or are about to submit) the | ||
| 159 | /// cancel-by-fd themselves via `cancel_and_flush`. | ||
| 160 | 140x | void drain_waiters_only() noexcept | |
| 161 | { | ||
| 162 | 140x | intrusive_list<waiter_node> drained; | |
| 163 | { | ||
| 164 | 140x | std::lock_guard lk(mutex_); | |
| 165 | 140x | closing_ = true; | |
| 166 | // Drain under the lock — the kernel cancel may not produce | ||
| 167 | // a !more CQE before the fd is closed, so we can't rely on | ||
| 168 | // on_accept_cqe_impl to surface operation_aborted. | ||
| 169 | 144x | while (auto* w = waiters_.pop_front()) | |
| 170 | 4x | drained.push_back(w); | |
| 171 | 140x | } | |
| 172 | |||
| 173 | 144x | while (auto* w = drained.pop_front()) | |
| 174 | { | ||
| 175 | 4x | w->stop_cb.reset(); | |
| 176 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — noexcept destructor path: OOM => std::terminate is the intended behavior | ||
| 177 | 4x | auto* op = new uring_accept_op(); | |
| 178 | 4x | op->h = w->h; | |
| 179 | 4x | op->ex = w->ex; | |
| 180 | 4x | op->ec_out = w->ec_out; | |
| 181 | 4x | op->impl_out = w->impl_out; | |
| 182 | 4x | op->cancelled.store(true, std::memory_order_release); | |
| 183 | 4x | delete w; | |
| 184 | 4x | sched_->post(op); | |
| 185 | 4x | sched_->work_finished(); | |
| 186 | } | ||
| 187 | 140x | } | |
| 188 | |||
| 189 | 114x | std::error_code set_option( | |
| 190 | int level, int optname, | ||
| 191 | void const* data, std::size_t size) noexcept override | ||
| 192 | { | ||
| 193 | 114x | if (fd_ < 0) return make_err(EBADF); | |
| 194 | 114x | if (::setsockopt(fd_, level, optname, | |
| 195 | reinterpret_cast<char const*>(data), | ||
| 196 | 114x | static_cast<socklen_t>(size)) < 0) | |
| 197 | ✗ | return make_err(errno); | |
| 198 | 114x | return {}; | |
| 199 | } | ||
| 200 | |||
| 201 | 2x | std::error_code get_option( | |
| 202 | int level, int optname, | ||
| 203 | void* data, std::size_t* size) const noexcept override | ||
| 204 | { | ||
| 205 | 2x | if (fd_ < 0) return make_err(EBADF); | |
| 206 | 2x | socklen_t len = static_cast<socklen_t>(*size); | |
| 207 | 2x | if (::getsockopt(fd_, level, optname, | |
| 208 | 2x | reinterpret_cast<char*>(data), &len) < 0) | |
| 209 | ✗ | return make_err(errno); | |
| 210 | 2x | *size = static_cast<std::size_t>(len); | |
| 211 | 2x | return {}; | |
| 212 | } | ||
| 213 | |||
| 214 | 116x | void start_multishot() | |
| 215 | { | ||
| 216 | 116x | if (!multi_op_) | |
| 217 | { | ||
| 218 | 115x | multi_op_ = std::make_unique<uring_multi_accept_op>(); | |
| 219 | 115x | multi_op_->listen_fd = fd_; | |
| 220 | 115x | multi_op_->acceptor_impl = this; | |
| 221 | 115x | multi_op_->on_cqe = | |
| 222 | &io_uring_multishot_acceptor_base::on_accept_cqe; | ||
| 223 | 115x | multi_op_->impl_ptr = this->shared_from_this(); | |
| 224 | } | ||
| 225 | else | ||
| 226 | { | ||
| 227 | // Reuse the existing op (re-arm path). Reset peer scratch | ||
| 228 | // so the kernel writes into a clean slot. | ||
| 229 | 1x | multi_op_->peer_storage = sockaddr_storage{}; | |
| 230 | 1x | multi_op_->peer_len = sizeof(sockaddr_storage); | |
| 231 | } | ||
| 232 | |||
| 233 | 116x | auto* op = multi_op_.get(); | |
| 234 | 116x | io_uring_submit_op(*sched_, op); | |
| 235 | // Deliberately no work_started(): the multishot SQE is a persistent | ||
| 236 | // internal mechanism. User-visible work is tracked per-accept call. | ||
| 237 | 116x | } | |
| 238 | |||
| 239 | /// Pull a parked fd or queue a waiter — used by Derived::accept(). | ||
| 240 | /// Either case ends with the calling coroutine suspending; the | ||
| 241 | /// caller returns `std::noop_coroutine()` unconditionally. | ||
| 242 | 4395x | void dispatch_or_queue( | |
| 243 | std::coroutine_handle<> h, | ||
| 244 | capy::executor_ref ex, | ||
| 245 | std::stop_token const& token, | ||
| 246 | std::error_code* ec, | ||
| 247 | io_object::implementation** impl_out) | ||
| 248 | { | ||
| 249 | 4395x | sockaddr_storage peer_storage{}; | |
| 250 | 4395x | socklen_t peer_len = sizeof(peer_storage); | |
| 251 | 4395x | int accepted_fd = ::accept4(fd_, | |
| 252 | reinterpret_cast<sockaddr*>(&peer_storage), &peer_len, | ||
| 253 | SOCK_NONBLOCK | SOCK_CLOEXEC); | ||
| 254 | 4395x | if (accepted_fd >= 0) | |
| 255 | { | ||
| 256 | ✗ | auto* op = new uring_accept_op(); | |
| 257 | ✗ | op->h = h; | |
| 258 | ✗ | op->ex = ex; | |
| 259 | ✗ | op->ec_out = ec; | |
| 260 | ✗ | op->impl_out = impl_out; | |
| 261 | ✗ | op->peer_service = peer_service_; | |
| 262 | ✗ | op->adopt_fn = &Derived::adopt_thunk; | |
| 263 | ✗ | op->accepted_fd = accepted_fd; | |
| 264 | ✗ | op->peer_storage = peer_storage; | |
| 265 | ✗ | op->peer_len = peer_len; | |
| 266 | ✗ | sched_->post(op); | |
| 267 | 4395x | return; | |
| 268 | } | ||
| 269 | // accept4 returned <0 — only EAGAIN/EWOULDBLOCK should fall | ||
| 270 | // through to the parked/waiter path. Other errors (EBADF, etc.) | ||
| 271 | // surface through the existing scheduler-completion path so the | ||
| 272 | // user sees them via the op's ec_out. Build an op with `err` | ||
| 273 | // set so do_handler delivers make_err(err). | ||
| 274 | 4395x | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
| 275 | { | ||
| 276 | 2x | int saved_errno = errno; | |
| 277 | 2x | auto* op = new uring_accept_op(); | |
| 278 | 2x | op->h = h; | |
| 279 | 2x | op->ex = ex; | |
| 280 | 2x | op->ec_out = ec; | |
| 281 | 2x | op->impl_out = impl_out; | |
| 282 | 2x | op->err = saved_errno; | |
| 283 | 2x | sched_->post(op); | |
| 284 | 2x | return; | |
| 285 | } | ||
| 286 | |||
| 287 | 4393x | uring_accept_op* ready_op = nullptr; | |
| 288 | { | ||
| 289 | 4393x | std::lock_guard lk(mutex_); | |
| 290 | 4393x | if (auto* r = ready_fds_.pop_front()) | |
| 291 | { | ||
| 292 | 6x | ready_op = new uring_accept_op(); | |
| 293 | 6x | ready_op->h = h; | |
| 294 | 6x | ready_op->ex = ex; | |
| 295 | 6x | ready_op->ec_out = ec; | |
| 296 | 6x | ready_op->impl_out = impl_out; | |
| 297 | 6x | ready_op->peer_service = peer_service_; | |
| 298 | 6x | ready_op->adopt_fn = &Derived::adopt_thunk; | |
| 299 | 6x | ready_op->accepted_fd = r->fd; | |
| 300 | 6x | ready_op->peer_storage = r->peer; | |
| 301 | 6x | ready_op->peer_len = r->peer_len; | |
| 302 | 6x | delete r; | |
| 303 | } | ||
| 304 | 4393x | } | |
| 305 | 4393x | if (ready_op) | |
| 306 | { | ||
| 307 | // Post outside the lock — acceptor mutex_ must never be | ||
| 308 | // held while dispatch_mutex_ is acquired by sched_->post(). | ||
| 309 | 6x | sched_->post(ready_op); | |
| 310 | 6x | return; | |
| 311 | } | ||
| 312 | |||
| 313 | 4387x | auto* w = new waiter_node{}; | |
| 314 | 4387x | w->h = h; | |
| 315 | 4387x | w->ex = ex; | |
| 316 | 4387x | w->ec_out = ec; | |
| 317 | 4387x | w->impl_out = impl_out; | |
| 318 | 4387x | w->owner = static_cast<Derived*>(this); | |
| 319 | |||
| 320 | // Arm the stop callback before the node is visible in | ||
| 321 | // `waiters_` and outside `mutex_`: an already-stopped token | ||
| 322 | // invokes the canceller synchronously from emplace, and | ||
| 323 | // cancel_waiter takes `mutex_` (self-deadlock if held). | ||
| 324 | // Arming pre-queue also keeps the CQE handler from claiming | ||
| 325 | // and deleting a node whose callback is not yet constructed. | ||
| 326 | 4387x | if (token.stop_possible()) | |
| 327 | 25x | w->stop_cb.emplace(token, waiter_canceller{w}); | |
| 328 | |||
| 329 | 4387x | bool was_cancelled = false; | |
| 330 | { | ||
| 331 | 4387x | std::lock_guard lk(mutex_); | |
| 332 | 4387x | if (w->cancelled.load(std::memory_order_acquire)) | |
| 333 | { | ||
| 334 | // Canceller already fired (pre-stopped token); it saw | ||
| 335 | // queued == false and left completion to us. | ||
| 336 | 2x | was_cancelled = true; | |
| 337 | } | ||
| 338 | 4385x | else if (auto* r = ready_fds_.pop_front()) | |
| 339 | { | ||
| 340 | // A connection arrived while the callback was armed; | ||
| 341 | // prefer it over parking the waiter behind it. | ||
| 342 | ✗ | ready_op = new uring_accept_op(); | |
| 343 | ✗ | ready_op->h = h; | |
| 344 | ✗ | ready_op->ex = ex; | |
| 345 | ✗ | ready_op->ec_out = ec; | |
| 346 | ✗ | ready_op->impl_out = impl_out; | |
| 347 | ✗ | ready_op->peer_service = peer_service_; | |
| 348 | ✗ | ready_op->adopt_fn = &Derived::adopt_thunk; | |
| 349 | ✗ | ready_op->accepted_fd = r->fd; | |
| 350 | ✗ | ready_op->peer_storage = r->peer; | |
| 351 | ✗ | ready_op->peer_len = r->peer_len; | |
| 352 | ✗ | delete r; | |
| 353 | } | ||
| 354 | else | ||
| 355 | { | ||
| 356 | 4385x | w->queued = true; | |
| 357 | 4385x | sched_->work_started(); | |
| 358 | 4385x | waiters_.push_back(w); | |
| 359 | 4385x | return; | |
| 360 | } | ||
| 361 | 4387x | } | |
| 362 | |||
| 363 | 2x | if (was_cancelled) | |
| 364 | { | ||
| 365 | 2x | auto* op = new uring_accept_op(); | |
| 366 | 2x | op->h = w->h; | |
| 367 | 2x | op->ex = w->ex; | |
| 368 | 2x | op->ec_out = w->ec_out; | |
| 369 | 2x | op->impl_out = w->impl_out; | |
| 370 | 2x | op->cancelled.store(true, std::memory_order_release); | |
| 371 | 2x | w->stop_cb.reset(); | |
| 372 | 2x | delete w; | |
| 373 | 2x | sched_->post(op); | |
| 374 | 2x | return; | |
| 375 | } | ||
| 376 | |||
| 377 | ✗ | w->stop_cb.reset(); | |
| 378 | ✗ | delete w; | |
| 379 | ✗ | sched_->post(ready_op); | |
| 380 | } | ||
| 381 | |||
| 382 | 14x | void cancel_waiter(waiter_node* w) noexcept | |
| 383 | { | ||
| 384 | { | ||
| 385 | 14x | std::lock_guard lk(mutex_); | |
| 386 | 14x | if (closing_) return; // on_accept_cqe_impl will drain with closing_ set | |
| 387 | 14x | if (!w->queued) | |
| 388 | 2x | return; // not in waiters_ yet; dispatch_or_queue | |
| 389 | // observes `cancelled` and completes the op | ||
| 390 | 12x | waiters_.remove(w); | |
| 391 | 14x | } | |
| 392 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — stop-token callback: noexcept, OOM => std::terminate is the intended behavior | ||
| 393 | 12x | auto* op = new uring_accept_op(); | |
| 394 | 12x | op->h = w->h; | |
| 395 | 12x | op->ex = w->ex; | |
| 396 | 12x | op->ec_out = w->ec_out; | |
| 397 | 12x | op->impl_out = w->impl_out; | |
| 398 | 12x | op->cancelled.store(true, std::memory_order_release); | |
| 399 | 12x | delete w; | |
| 400 | // post() increments outstanding_work_; balances the work_started() | ||
| 401 | // from accept() when the waiter was queued. | ||
| 402 | 12x | sched_->post(op); | |
| 403 | 12x | sched_->work_finished(); // balance the work_started() from accept() | |
| 404 | } | ||
| 405 | |||
| 406 | private: | ||
| 407 | 4381x | static void on_accept_cqe( | |
| 408 | void* self_ptr, int new_fd, int err, bool more) noexcept | ||
| 409 | { | ||
| 410 | static_cast<Derived*>(self_ptr) | ||
| 411 | 4381x | ->on_accept_cqe_impl(new_fd, err, more); | |
| 412 | 4381x | } | |
| 413 | |||
| 414 | protected: | ||
| 415 | 4381x | void on_accept_cqe_impl(int new_fd, int err, bool more) noexcept | |
| 416 | { | ||
| 417 | 4381x | bool was_closing = false; | |
| 418 | 4381x | waiter_node* matched = nullptr; | |
| 419 | 4381x | intrusive_list<waiter_node> closing_waiters; | |
| 420 | { | ||
| 421 | 4381x | std::lock_guard lk(mutex_); | |
| 422 | 4381x | was_closing = closing_; | |
| 423 | 4381x | if (was_closing) | |
| 424 | { | ||
| 425 | 4x | if (new_fd >= 0) | |
| 426 | ✗ | ::close(new_fd); | |
| 427 | 4x | if (!more) | |
| 428 | { | ||
| 429 | // Collect waiters to drain after the lock is released. | ||
| 430 | 4x | while (auto* w = waiters_.pop_front()) | |
| 431 | ✗ | closing_waiters.push_back(w); | |
| 432 | } | ||
| 433 | } | ||
| 434 | 4377x | else if (!waiters_.empty()) | |
| 435 | { | ||
| 436 | // Claim the head waiter atomically. If the canceller | ||
| 437 | // already won the race (cancelled was already true), | ||
| 438 | // leave the waiter in the list for cancel_waiter to | ||
| 439 | // remove and dispatch with operation_aborted; park the | ||
| 440 | // new_fd so the next waiter consumes it. | ||
| 441 | 4369x | auto* head_w = waiters_.front(); | |
| 442 | 4369x | if (!head_w->cancelled.exchange( | |
| 443 | true, std::memory_order_acq_rel)) | ||
| 444 | { | ||
| 445 | 4369x | waiters_.pop_front(); | |
| 446 | 4369x | matched = head_w; | |
| 447 | } | ||
| 448 | ✗ | else if (new_fd >= 0) | |
| 449 | { | ||
| 450 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 451 | ✗ | auto* node = new ready_fd_node{}; | |
| 452 | ✗ | node->fd = new_fd; | |
| 453 | ✗ | node->peer = multi_op_->peer_storage; | |
| 454 | ✗ | node->peer_len = multi_op_->peer_len; | |
| 455 | ✗ | ready_fds_.push_back(node); | |
| 456 | } | ||
| 457 | } | ||
| 458 | 8x | else if (new_fd >= 0) | |
| 459 | { | ||
| 460 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 461 | 8x | auto* node = new ready_fd_node{}; | |
| 462 | 8x | node->fd = new_fd; | |
| 463 | 8x | node->peer = multi_op_->peer_storage; | |
| 464 | 8x | node->peer_len = multi_op_->peer_len; | |
| 465 | 8x | ready_fds_.push_back(node); | |
| 466 | } | ||
| 467 | 4381x | } | |
| 468 | |||
| 469 | 4381x | if (matched) | |
| 470 | { | ||
| 471 | 4369x | matched->stop_cb.reset(); | |
| 472 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 473 | 4369x | auto* op = new uring_accept_op(); | |
| 474 | 4369x | op->h = matched->h; | |
| 475 | 4369x | op->ex = matched->ex; | |
| 476 | 4369x | op->ec_out = matched->ec_out; | |
| 477 | 4369x | op->impl_out = matched->impl_out; | |
| 478 | 4369x | op->peer_service = peer_service_; | |
| 479 | 4369x | op->adopt_fn = &Derived::adopt_thunk; | |
| 480 | 4369x | if (err) | |
| 481 | { | ||
| 482 | ✗ | op->err = err; | |
| 483 | } | ||
| 484 | 4369x | else if (new_fd >= 0) | |
| 485 | { | ||
| 486 | 4369x | op->accepted_fd = new_fd; | |
| 487 | 4369x | op->peer_storage = multi_op_->peer_storage; | |
| 488 | 4369x | op->peer_len = multi_op_->peer_len; | |
| 489 | } | ||
| 490 | 4369x | delete matched; | |
| 491 | 4369x | sched_->post(op); | |
| 492 | 4369x | sched_->work_finished(); // balance waiter's work_started | |
| 493 | } | ||
| 494 | |||
| 495 | 4381x | while (auto* w = closing_waiters.pop_front()) | |
| 496 | { | ||
| 497 | ✗ | w->stop_cb.reset(); | |
| 498 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler shutdown path: noexcept, OOM => std::terminate is the intended behavior | ||
| 499 | ✗ | auto* op = new uring_accept_op(); | |
| 500 | ✗ | op->h = w->h; | |
| 501 | ✗ | op->ex = w->ex; | |
| 502 | ✗ | op->ec_out = w->ec_out; | |
| 503 | ✗ | op->impl_out = w->impl_out; | |
| 504 | ✗ | op->cancelled.store(true, std::memory_order_release); | |
| 505 | ✗ | delete w; | |
| 506 | ✗ | sched_->post(op); | |
| 507 | ✗ | sched_->work_finished(); // balance waiter's work_started | |
| 508 | } | ||
| 509 | |||
| 510 | 4381x | if (!more && !was_closing) | |
| 511 | { | ||
| 512 | // Re-arm: kernel terminated multishot non-fatally. | ||
| 513 | struct rearm_op final : scheduler_op | ||
| 514 | { | ||
| 515 | std::shared_ptr<Derived> self_; | ||
| 516 | explicit rearm_op(std::shared_ptr<Derived> s) noexcept | ||
| 517 | : self_(std::move(s)) {} | ||
| 518 | |||
| 519 | void operator()() override | ||
| 520 | { | ||
| 521 | auto self = std::move(self_); | ||
| 522 | delete this; | ||
| 523 | { | ||
| 524 | std::lock_guard lk(self->mutex_); | ||
| 525 | if (self->closing_) | ||
| 526 | return; | ||
| 527 | } | ||
| 528 | self->start_multishot(); | ||
| 529 | } | ||
| 530 | |||
| 531 | void destroy() override { delete this; } | ||
| 532 | }; | ||
| 533 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler re-arm: noexcept, OOM => std::terminate is the intended behavior | ||
| 534 | ✗ | sched_->post(new rearm_op(this->shared_from_this())); | |
| 535 | } | ||
| 536 | 4381x | } | |
| 537 | }; | ||
| 538 | |||
| 539 | template<class Derived, class ImplBase, class Endpoint, class PeerService> | ||
| 540 | inline void | ||
| 541 | 14x | io_uring_multishot_acceptor_base<Derived, ImplBase, Endpoint, PeerService> | |
| 542 | ::waiter_canceller::operator()() const noexcept | ||
| 543 | { | ||
| 544 | 14x | if (w->cancelled.exchange(true, std::memory_order_acq_rel)) | |
| 545 | ✗ | return; | |
| 546 | 14x | w->owner->cancel_waiter(w); | |
| 547 | } | ||
| 548 | |||
| 549 | } // namespace boost::corosio::detail | ||
| 550 | |||
| 551 | #endif // BOOST_COROSIO_HAS_IO_URING | ||
| 552 | |||
| 553 | #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 554 |