include/boost/corosio/native/detail/io_uring/io_uring_multishot_acceptor.hpp
69.3% Lines (149/215)
78.6% List of functions (22/28)
Functions (28)
Function
Calls
Lines
Blocks
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_local_stream_service&)
:89
22x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_tcp_service&)
:89
91x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::~io_uring_multishot_acceptor_base()
:97
22x
85.0%
90.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::~io_uring_multishot_acceptor_base()
:97
91x
85.0%
90.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::local_endpoint() const
:135
1x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::local_endpoint() const
:135
80x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::is_open() const
:140
91x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::is_open() const
:140
4552x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel()
:145
1x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel()
:145
3x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::drain_waiters_only()
:156
17x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::drain_waiters_only()
:156
88x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::set_option(int, int, void const*, unsigned long)
:185
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::set_option(int, int, void const*, unsigned long)
:185
85x
83.3%
78.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::get_option(int, int, void*, unsigned long*) const
:197
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::get_option(int, int, void*, unsigned long*) const
:197
2x
87.5%
80.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::start_multishot()
:210
11x
83.3%
81.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::start_multishot()
:210
74x
83.3%
81.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**)
:238
9x
38.9%
46.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**)
:238
3947x
61.1%
66.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_node*)
:320
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_node*)
:320
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe(void*, int, int, bool)
:342
10x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe(void*, int, int, bool)
:342
3948x
100.0%
100.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe_impl(int, int, bool)
:350
10x
67.7%
61.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe_impl(int, int, bool)
:350
3948x
67.7%
61.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_canceller::operator()() const
:476
0
0.0%
0.0%
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_canceller::operator()() const
:476
0
0.0%
0.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_IO_URING | ||
| 16 | |||
| 17 | #include <liburing.h> | ||
| 18 | |||
| 19 | #include <boost/corosio/detail/intrusive.hpp> | ||
| 20 | #include <boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp> | ||
| 21 | #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp> | ||
| 22 | #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp> | ||
| 23 | #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp> | ||
| 24 | #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp> | ||
| 25 | #include <boost/corosio/native/detail/make_err.hpp> | ||
| 26 | #include <boost/corosio/io/io_object.hpp> | ||
| 27 | |||
| 28 | #include <atomic> | ||
| 29 | #include <coroutine> | ||
| 30 | #include <memory> | ||
| 31 | #include <mutex> | ||
| 32 | #include <optional> | ||
| 33 | #include <stop_token> | ||
| 34 | #include <system_error> | ||
| 35 | |||
| 36 | #include <netinet/in.h> | ||
| 37 | #include <sys/socket.h> | ||
| 38 | #include <unistd.h> | ||
| 39 | |||
| 40 | namespace boost::corosio::detail { | ||
| 41 | |||
| 42 | template<class Derived, class ImplBase, class Endpoint, class PeerService> | ||
| 43 | class io_uring_multishot_acceptor_base | ||
| 44 | : public ImplBase | ||
| 45 | , public std::enable_shared_from_this<Derived> | ||
| 46 | { | ||
| 47 | protected: | ||
| 48 | struct ready_fd_node : intrusive_list<ready_fd_node>::node | ||
| 49 | { | ||
| 50 | int fd = -1; | ||
| 51 | sockaddr_storage peer{}; | ||
| 52 | socklen_t peer_len = 0; | ||
| 53 | }; | ||
| 54 | |||
| 55 | struct waiter_node; | ||
| 56 | |||
| 57 | struct waiter_canceller | ||
| 58 | { | ||
| 59 | waiter_node* w; | ||
| 60 | void operator()() const noexcept; | ||
| 61 | }; | ||
| 62 | |||
| 63 | struct waiter_node : intrusive_list<waiter_node>::node | ||
| 64 | { | ||
| 65 | std::coroutine_handle<> h; | ||
| 66 | capy::executor_ref ex; | ||
| 67 | std::error_code* ec_out = nullptr; | ||
| 68 | io_object::implementation** impl_out = nullptr; | ||
| 69 | Derived* owner = nullptr; | ||
| 70 | std::atomic<bool> cancelled{false}; | ||
| 71 | std::optional<std::stop_callback<waiter_canceller>> stop_cb; | ||
| 72 | }; | ||
| 73 | |||
| 74 | int fd_ = -1; | ||
| 75 | io_uring_scheduler* sched_; | ||
| 76 | PeerService* peer_service_; | ||
| 77 | Endpoint local_endpoint_{}; | ||
| 78 | mutable std::mutex mutex_; | ||
| 79 | intrusive_list<ready_fd_node> ready_fds_; | ||
| 80 | intrusive_list<waiter_node> waiters_; | ||
| 81 | std::unique_ptr<uring_multi_accept_op> multi_op_; | ||
| 82 | bool closing_ = false; | ||
| 83 | |||
| 84 | private: | ||
| 85 | // CRTP ctor private + Derived friended so the base cannot be | ||
| 86 | // constructed except as a CRTP base of Derived | ||
| 87 | // (clang-tidy bugprone-crtp-constructor-accessibility). | ||
| 88 | friend Derived; | ||
| 89 | 113x | io_uring_multishot_acceptor_base( | |
| 90 | io_uring_scheduler& sched, PeerService& peer_svc) noexcept | ||
| 91 | 113x | : sched_(&sched) | |
| 92 | 113x | , peer_service_(&peer_svc) | |
| 93 | 113x | {} | |
| 94 | |||
| 95 | public: | ||
| 96 | |||
| 97 | 113x | ~io_uring_multishot_acceptor_base() override | |
| 98 | { | ||
| 99 | { | ||
| 100 | 113x | std::lock_guard lk(mutex_); | |
| 101 | 113x | closing_ = true; | |
| 102 | 113x | } | |
| 103 | 113x | if (fd_ >= 0) | |
| 104 | { | ||
| 105 | ✗ | sched_->submit_cancel_by_fd(fd_); | |
| 106 | ✗ | ::close(fd_); | |
| 107 | ✗ | fd_ = -1; | |
| 108 | } | ||
| 109 | |||
| 110 | // Drain parked accepted-connection fds unconditionally. These are | ||
| 111 | // distinct from the listener fd and can be present even when the | ||
| 112 | // service close() path already closed and cleared fd_ — that path | ||
| 113 | // does not touch ready_fds_, so the drain must run here. | ||
| 114 | 113x | intrusive_list<ready_fd_node> drained; | |
| 115 | { | ||
| 116 | 113x | std::lock_guard lk(mutex_); | |
| 117 | 115x | while (auto* r = ready_fds_.pop_front()) | |
| 118 | 2x | drained.push_back(r); | |
| 119 | 113x | } | |
| 120 | 115x | while (auto* r = drained.pop_front()) | |
| 121 | { | ||
| 122 | 2x | ::close(r->fd); | |
| 123 | 2x | delete r; | |
| 124 | } | ||
| 125 | |||
| 126 | // Break the multi_op_ → impl_ptr (shared_ptr to this object) cycle and | ||
| 127 | // drain pending CQEs so the multi_op_ unique_ptr can free the op safely. | ||
| 128 | 113x | if (multi_op_) | |
| 129 | { | ||
| 130 | 85x | multi_op_->impl_ptr.reset(); | |
| 131 | 85x | sched_->drain_cqes_for(multi_op_.get()); | |
| 132 | } | ||
| 133 | 226x | } | |
| 134 | |||
| 135 | 81x | Endpoint local_endpoint() const noexcept override | |
| 136 | { | ||
| 137 | 81x | return local_endpoint_; | |
| 138 | } | ||
| 139 | |||
| 140 | 4643x | bool is_open() const noexcept override | |
| 141 | { | ||
| 142 | 4643x | return fd_ >= 0; | |
| 143 | } | ||
| 144 | |||
| 145 | 4x | void cancel() noexcept override | |
| 146 | { | ||
| 147 | 4x | drain_waiters_only(); | |
| 148 | 4x | if (fd_ >= 0) | |
| 149 | 4x | sched_->submit_cancel_by_fd(fd_); | |
| 150 | 4x | } | |
| 151 | |||
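| 152 | /// Drain queued waiters with operation_aborted and set closing_, but do NOT | ||
| 153 | /// submit any kernel cancel for the fd. Used by cancel(), which submits the cancel-by-fd itself afterwards, and by service close() paths that have | ||
| 154 | /// already submitted (or are about to submit) it via `cancel_and_flush`. | ||
| 155 | /// NOTE(review): nothing in this class clears closing_ again — confirm that is intended for the cancel() path. | ||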
| 156 | 105x | void drain_waiters_only() noexcept | |
| 157 | { | ||
| 158 | 105x | intrusive_list<waiter_node> drained; | |
| 159 | { | ||
| 160 | 105x | std::lock_guard lk(mutex_); | |
| 161 | 105x | closing_ = true; | |
| 162 | // Drain under the lock — the kernel cancel may not produce | ||
| 163 | // a !more CQE before the fd is closed, so we can't rely on | ||
| 164 | // on_accept_cqe_impl to surface operation_aborted. | ||
| 165 | 109x | while (auto* w = waiters_.pop_front()) | |
| 166 | 4x | drained.push_back(w); | |
| 167 | 105x | } | |
| 168 | |||
| 169 | 109x | while (auto* w = drained.pop_front()) | |
| 170 | { | ||
| 171 | 4x | w->stop_cb.reset(); | |
| 172 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — noexcept cancel/close path: OOM => std::terminate is the intended behavior | ||
| 173 | 4x | auto* op = new uring_accept_op(); | |
| 174 | 4x | op->h = w->h; | |
| 175 | 4x | op->ex = w->ex; | |
| 176 | 4x | op->ec_out = w->ec_out; | |
| 177 | 4x | op->impl_out = w->impl_out; | |
| 178 | 4x | op->cancelled.store(true, std::memory_order_release); | |
| 179 | 4x | delete w; | |
| 180 | 4x | sched_->post(op); | |
| 181 | 4x | sched_->work_finished(); | |
| 182 | } | ||
| 183 | 105x | } | |
| 184 | |||
| 185 | 85x | std::error_code set_option( | |
| 186 | int level, int optname, | ||
| 187 | void const* data, std::size_t size) noexcept override | ||
| 188 | { | ||
| 189 | 85x | if (fd_ < 0) return make_err(EBADF); | |
| 190 | 85x | if (::setsockopt(fd_, level, optname, | |
| 191 | reinterpret_cast<char const*>(data), | ||
| 192 | 85x | static_cast<socklen_t>(size)) < 0) | |
| 193 | ✗ | return make_err(errno); | |
| 194 | 85x | return {}; | |
| 195 | } | ||
| 196 | |||
| 197 | 2x | std::error_code get_option( | |
| 198 | int level, int optname, | ||
| 199 | void* data, std::size_t* size) const noexcept override | ||
| 200 | { | ||
| 201 | 2x | if (fd_ < 0) return make_err(EBADF); | |
| 202 | 2x | socklen_t len = static_cast<socklen_t>(*size); | |
| 203 | 2x | if (::getsockopt(fd_, level, optname, | |
| 204 | 2x | reinterpret_cast<char*>(data), &len) < 0) | |
| 205 | ✗ | return make_err(errno); | |
| 206 | 2x | *size = static_cast<std::size_t>(len); | |
| 207 | 2x | return {}; | |
| 208 | } | ||
| 209 | |||
| 210 | 85x | void start_multishot() | |
| 211 | { | ||
| 212 | 85x | if (!multi_op_) | |
| 213 | { | ||
| 214 | 85x | multi_op_ = std::make_unique<uring_multi_accept_op>(); | |
| 215 | 85x | multi_op_->listen_fd = fd_; | |
| 216 | 85x | multi_op_->acceptor_impl = this; | |
| 217 | 85x | multi_op_->on_cqe = | |
| 218 | &io_uring_multishot_acceptor_base::on_accept_cqe; | ||
| 219 | 85x | multi_op_->impl_ptr = this->shared_from_this(); | |
| 220 | } | ||
| 221 | else | ||
| 222 | { | ||
| 223 | // Reuse the existing op (re-arm path). Reset peer scratch | ||
| 224 | // so the kernel writes into a clean slot. | ||
| 225 | ✗ | multi_op_->peer_storage = sockaddr_storage{}; | |
| 226 | ✗ | multi_op_->peer_len = sizeof(sockaddr_storage); | |
| 227 | } | ||
| 228 | |||
| 229 | 85x | auto* op = multi_op_.get(); | |
| 230 | 85x | io_uring_submit_op(*sched_, op); | |
| 231 | // Deliberately no work_started(): the multishot SQE is a persistent | ||
| 232 | // internal mechanism. User-visible work is tracked per-accept call. | ||
| 233 | 85x | } | |
| 234 | |||
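| 235 | /// Used by Derived::accept(). Tries a direct accept4() first and posts the result (or any error other than EAGAIN/EWOULDBLOCK); otherwise pulls a parked fd or queues a waiter. | ||
| 236 | /// Every case ends with the calling coroutine suspending; the caller returns `std::noop_coroutine()` unconditionally. | ||
| 237 | /// NOTE(review): stop_cb is emplaced while mutex_ is held and before w is linked into waiters_; if stop has already been requested, std::stop_callback invokes waiter_canceller inline and cancel_waiter() re-locks mutex_ — verify. | ||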
| 238 | 3956x | void dispatch_or_queue( | |
| 239 | std::coroutine_handle<> h, | ||
| 240 | capy::executor_ref ex, | ||
| 241 | std::stop_token const& token, | ||
| 242 | std::error_code* ec, | ||
| 243 | io_object::implementation** impl_out) | ||
| 244 | { | ||
| 245 | 3956x | sockaddr_storage peer_storage{}; | |
| 246 | 3956x | socklen_t peer_len = sizeof(peer_storage); | |
| 247 | 3956x | int accepted_fd = ::accept4(fd_, | |
| 248 | reinterpret_cast<sockaddr*>(&peer_storage), &peer_len, | ||
| 249 | SOCK_NONBLOCK | SOCK_CLOEXEC); | ||
| 250 | 3956x | if (accepted_fd >= 0) | |
| 251 | { | ||
| 252 | ✗ | auto* op = new uring_accept_op(); | |
| 253 | ✗ | op->h = h; | |
| 254 | ✗ | op->ex = ex; | |
| 255 | ✗ | op->ec_out = ec; | |
| 256 | ✗ | op->impl_out = impl_out; | |
| 257 | ✗ | op->peer_service = peer_service_; | |
| 258 | ✗ | op->adopt_fn = &Derived::adopt_thunk; | |
| 259 | ✗ | op->accepted_fd = accepted_fd; | |
| 260 | ✗ | op->peer_storage = peer_storage; | |
| 261 | ✗ | op->peer_len = peer_len; | |
| 262 | ✗ | sched_->post(op); | |
| 263 | 3952x | return; | |
| 264 | } | ||
| 265 | // accept4 returned <0 — only EAGAIN/EWOULDBLOCK should fall | ||
| 266 | // through to the parked/waiter path. Other errors (EBADF, etc.) | ||
| 267 | // surface through the existing scheduler-completion path so the | ||
| 268 | // user sees them via the op's ec_out. Build an op with `err` | ||
| 269 | // set so do_handler delivers make_err(err). | ||
| 270 | 3956x | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
| 271 | { | ||
| 272 | ✗ | int saved_errno = errno; | |
| 273 | ✗ | auto* op = new uring_accept_op(); | |
| 274 | ✗ | op->h = h; | |
| 275 | ✗ | op->ex = ex; | |
| 276 | ✗ | op->ec_out = ec; | |
| 277 | ✗ | op->impl_out = impl_out; | |
| 278 | ✗ | op->err = saved_errno; | |
| 279 | ✗ | sched_->post(op); | |
| 280 | ✗ | return; | |
| 281 | } | ||
| 282 | |||
| 283 | 3956x | uring_accept_op* ready_op = nullptr; | |
| 284 | { | ||
| 285 | 3956x | std::lock_guard lk(mutex_); | |
| 286 | 3956x | if (auto* r = ready_fds_.pop_front()) | |
| 287 | { | ||
| 288 | 4x | ready_op = new uring_accept_op(); | |
| 289 | 4x | ready_op->h = h; | |
| 290 | 4x | ready_op->ex = ex; | |
| 291 | 4x | ready_op->ec_out = ec; | |
| 292 | 4x | ready_op->impl_out = impl_out; | |
| 293 | 4x | ready_op->peer_service = peer_service_; | |
| 294 | 4x | ready_op->adopt_fn = &Derived::adopt_thunk; | |
| 295 | 4x | ready_op->accepted_fd = r->fd; | |
| 296 | 4x | ready_op->peer_storage = r->peer; | |
| 297 | 4x | ready_op->peer_len = r->peer_len; | |
| 298 | 4x | delete r; | |
| 299 | } | ||
| 300 | else | ||
| 301 | { | ||
| 302 | 3952x | auto* w = new waiter_node{}; | |
| 303 | 3952x | w->h = h; | |
| 304 | 3952x | w->ex = ex; | |
| 305 | 3952x | w->ec_out = ec; | |
| 306 | 3952x | w->impl_out = impl_out; | |
| 307 | 3952x | w->owner = static_cast<Derived*>(this); | |
| 308 | 3952x | if (token.stop_possible()) | |
| 309 | ✗ | w->stop_cb.emplace(token, waiter_canceller{w}); | |
| 310 | 3952x | sched_->work_started(); | |
| 311 | 3952x | waiters_.push_back(w); | |
| 312 | 3952x | return; | |
| 313 | } | ||
| 314 | 3956x | } | |
| 315 | // Post outside the lock — acceptor mutex_ must never be held | ||
| 316 | // while dispatch_mutex_ is acquired by sched_->post(). | ||
| 317 | 4x | sched_->post(ready_op); | |
| 318 | } | ||
| 319 | |||
| 320 | ✗ | void cancel_waiter(waiter_node* w) noexcept | |
| 321 | { | ||
| 322 | { | ||
| 323 | ✗ | std::lock_guard lk(mutex_); | |
| 324 | ✗ | if (closing_) return; // on_accept_cqe_impl will drain with closing_ set | |
| 325 | ✗ | waiters_.remove(w); | |
| 326 | ✗ | } | |
| 327 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — stop-token callback: noexcept, OOM => std::terminate is the intended behavior | ||
| 328 | ✗ | auto* op = new uring_accept_op(); | |
| 329 | ✗ | op->h = w->h; | |
| 330 | ✗ | op->ex = w->ex; | |
| 331 | ✗ | op->ec_out = w->ec_out; | |
| 332 | ✗ | op->impl_out = w->impl_out; | |
| 333 | ✗ | op->cancelled.store(true, std::memory_order_release); | |
| 334 | ✗ | delete w; | |
| 335 | // post() increments outstanding_work_ for the posted op, so the | ||
| 336 | // work_finished() below releases the work_started() taken when the waiter was queued. | ||
| 337 | ✗ | sched_->post(op); | |
| 338 | ✗ | sched_->work_finished(); // balance the work_started() from accept() | |
| 339 | } | ||
| 340 | |||
| 341 | private: | ||
| 342 | 3958x | static void on_accept_cqe( | |
| 343 | void* self_ptr, int new_fd, int err, bool more) noexcept | ||
| 344 | { | ||
| 345 | static_cast<Derived*>(self_ptr) | ||
| 346 | 3958x | ->on_accept_cqe_impl(new_fd, err, more); | |
| 347 | 3958x | } | |
| 348 | |||
| 349 | protected: | ||
| 350 | 3958x | void on_accept_cqe_impl(int new_fd, int err, bool more) noexcept | |
| 351 | { | ||
| 352 | 3958x | bool was_closing = false; | |
| 353 | 3958x | waiter_node* matched = nullptr; | |
| 354 | 3958x | intrusive_list<waiter_node> closing_waiters; | |
| 355 | { | ||
| 356 | 3958x | std::lock_guard lk(mutex_); | |
| 357 | 3958x | was_closing = closing_; | |
| 358 | 3958x | if (was_closing) | |
| 359 | { | ||
| 360 | 4x | if (new_fd >= 0) | |
| 361 | ✗ | ::close(new_fd); | |
| 362 | 4x | if (!more) | |
| 363 | { | ||
| 364 | // Collect waiters to drain after the lock is released. | ||
| 365 | 4x | while (auto* w = waiters_.pop_front()) | |
| 366 | ✗ | closing_waiters.push_back(w); | |
| 367 | } | ||
| 368 | } | ||
| 369 | 3954x | else if (!waiters_.empty()) | |
| 370 | { | ||
| 371 | // Claim the head waiter atomically. If the canceller | ||
| 372 | // already won the race (cancelled was already true), | ||
| 373 | // leave the waiter in the list for cancel_waiter to | ||
| 374 | // remove and dispatch with operation_aborted; park the | ||
| 375 | // new_fd in ready_fds_ so a later dispatch_or_queue() call consumes it. | ||
| 376 | 3948x | auto* head_w = waiters_.front(); | |
| 377 | 3948x | if (!head_w->cancelled.exchange( | |
| 378 | true, std::memory_order_acq_rel)) | ||
| 379 | { | ||
| 380 | 3948x | waiters_.pop_front(); | |
| 381 | 3948x | matched = head_w; | |
| 382 | } | ||
| 383 | ✗ | else if (new_fd >= 0) | |
| 384 | { | ||
| 385 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 386 | ✗ | auto* node = new ready_fd_node{}; | |
| 387 | ✗ | node->fd = new_fd; | |
| 388 | ✗ | node->peer = multi_op_->peer_storage; | |
| 389 | ✗ | node->peer_len = multi_op_->peer_len; | |
| 390 | ✗ | ready_fds_.push_back(node); | |
| 391 | } | ||
| 392 | } | ||
| 393 | 6x | else if (new_fd >= 0) | |
| 394 | { | ||
| 395 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 396 | 6x | auto* node = new ready_fd_node{}; | |
| 397 | 6x | node->fd = new_fd; | |
| 398 | 6x | node->peer = multi_op_->peer_storage; | |
| 399 | 6x | node->peer_len = multi_op_->peer_len; | |
| 400 | 6x | ready_fds_.push_back(node); | |
| 401 | } | ||
| 402 | 3958x | } | |
| 403 | |||
| 404 | 3958x | if (matched) | |
| 405 | { | ||
| 406 | 3948x | matched->stop_cb.reset(); | |
| 407 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior | ||
| 408 | 3948x | auto* op = new uring_accept_op(); | |
| 409 | 3948x | op->h = matched->h; | |
| 410 | 3948x | op->ex = matched->ex; | |
| 411 | 3948x | op->ec_out = matched->ec_out; | |
| 412 | 3948x | op->impl_out = matched->impl_out; | |
| 413 | 3948x | op->peer_service = peer_service_; | |
| 414 | 3948x | op->adopt_fn = &Derived::adopt_thunk; | |
| 415 | 3948x | if (err) | |
| 416 | { | ||
| 417 | ✗ | op->err = err; | |
| 418 | } | ||
| 419 | 3948x | else if (new_fd >= 0) | |
| 420 | { | ||
| 421 | 3948x | op->accepted_fd = new_fd; | |
| 422 | 3948x | op->peer_storage = multi_op_->peer_storage; | |
| 423 | 3948x | op->peer_len = multi_op_->peer_len; | |
| 424 | } | ||
| 425 | 3948x | delete matched; | |
| 426 | 3948x | sched_->post(op); | |
| 427 | 3948x | sched_->work_finished(); // balance waiter's work_started | |
| 428 | } | ||
| 429 | |||
| 430 | 3958x | while (auto* w = closing_waiters.pop_front()) | |
| 431 | { | ||
| 432 | ✗ | w->stop_cb.reset(); | |
| 433 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler shutdown path: noexcept, OOM => std::terminate is the intended behavior | ||
| 434 | ✗ | auto* op = new uring_accept_op(); | |
| 435 | ✗ | op->h = w->h; | |
| 436 | ✗ | op->ex = w->ex; | |
| 437 | ✗ | op->ec_out = w->ec_out; | |
| 438 | ✗ | op->impl_out = w->impl_out; | |
| 439 | ✗ | op->cancelled.store(true, std::memory_order_release); | |
| 440 | ✗ | delete w; | |
| 441 | ✗ | sched_->post(op); | |
| 442 | ✗ | sched_->work_finished(); // balance waiter's work_started | |
| 443 | } | ||
| 444 | |||
| 445 | 3958x | if (!more && !was_closing) | |
| 446 | { | ||
| 447 | // Re-arm: kernel terminated multishot non-fatally. | ||
| 448 | struct rearm_op final : scheduler_op | ||
| 449 | { | ||
| 450 | std::shared_ptr<Derived> self_; | ||
| 451 | explicit rearm_op(std::shared_ptr<Derived> s) noexcept | ||
| 452 | : self_(std::move(s)) {} | ||
| 453 | |||
| 454 | void operator()() override | ||
| 455 | { | ||
| 456 | auto self = std::move(self_); | ||
| 457 | delete this; | ||
| 458 | { | ||
| 459 | std::lock_guard lk(self->mutex_); | ||
| 460 | if (self->closing_) | ||
| 461 | return; | ||
| 462 | } | ||
| 463 | self->start_multishot(); | ||
| 464 | } | ||
| 465 | |||
| 466 | void destroy() override { delete this; } | ||
| 467 | }; | ||
| 468 | // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler re-arm: noexcept, OOM => std::terminate is the intended behavior | ||
| 469 | ✗ | sched_->post(new rearm_op(this->shared_from_this())); | |
| 470 | } | ||
| 471 | 3958x | } | |
| 472 | }; | ||
| 473 | |||
| 474 | template<class Derived, class ImplBase, class Endpoint, class PeerService> | ||
| 475 | inline void | ||
| 476 | ✗ | io_uring_multishot_acceptor_base<Derived, ImplBase, Endpoint, PeerService> | |
| 477 | ::waiter_canceller::operator()() const noexcept | ||
| 478 | { | ||
| 479 | ✗ | if (w->cancelled.exchange(true, std::memory_order_acq_rel)) | |
| 480 | ✗ | return; | |
| 481 | ✗ | w->owner->cancel_waiter(w); | |
| 482 | } | ||
| 483 | |||
| 484 | } // namespace boost::corosio::detail | ||
| 485 | |||
| 486 | #endif // BOOST_COROSIO_HAS_IO_URING | ||
| 487 | |||
| 488 | #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP | ||
| 489 |