include/boost/corosio/native/detail/io_uring/io_uring_multishot_acceptor.hpp

79.7% Lines (200/251) 85.7% List of functions (24/28)
io_uring_multishot_acceptor.hpp
f(x) Functions (28)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_local_stream_service&) :93 27x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_tcp_service&) :93 121x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::~io_uring_multishot_acceptor_base() :101 27x 85.0% 90.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::~io_uring_multishot_acceptor_base() :101 121x 85.0% 90.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::local_endpoint() const :139 1x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::local_endpoint() const :139 97x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::is_open() const :144 114x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::is_open() const :144 5161x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel() :149 1x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel() :149 3x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::drain_waiters_only() :160 22x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::drain_waiters_only() :160 118x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::set_option(int, int, void const*, unsigned long) :189 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::set_option(int, int, void const*, unsigned long) :189 114x 83.3% 78.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::get_option(int, int, void*, unsigned long*) const :201 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::get_option(int, int, void*, unsigned long*) const :201 2x 87.5% 80.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::start_multishot() :214 15x 83.3% 81.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::start_multishot() :214 101x 100.0% 95.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**) :242 13x 71.6% 72.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**) :242 4382x 71.6% 72.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_node*) :382 2x 100.0% 95.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_node*) :382 12x 100.0% 95.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe(void*, int, int, bool) :407 11x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe(void*, int, int, bool) :407 4370x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe_impl(int, int, bool) :415 11x 67.7% 61.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe_impl(int, int, bool) :415 4370x 67.7% 61.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_canceller::operator()() const :541 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_canceller::operator()() const :541 0 0.0% 0.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/corosio/detail/intrusive.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp>
21 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
22 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
23 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
24 #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/io/io_object.hpp>
27
28 #include <atomic>
29 #include <coroutine>
30 #include <memory>
31 #include <mutex>
32 #include <optional>
33 #include <stop_token>
34 #include <system_error>
35
36 #include <netinet/in.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
42 template<class Derived, class ImplBase, class Endpoint, class PeerService>
43 class io_uring_multishot_acceptor_base
44 : public ImplBase
45 , public std::enable_shared_from_this<Derived>
46 {
47 protected:
48 struct ready_fd_node : intrusive_list<ready_fd_node>::node
49 {
50 int fd = -1;
51 sockaddr_storage peer{};
52 socklen_t peer_len = 0;
53 };
54
55 struct waiter_node;
56
57 struct waiter_canceller
58 {
59 waiter_node* w;
60 void operator()() const noexcept;
61 };
62
63 struct waiter_node : intrusive_list<waiter_node>::node
64 {
65 std::coroutine_handle<> h;
66 capy::executor_ref ex;
67 std::error_code* ec_out = nullptr;
68 io_object::implementation** impl_out = nullptr;
69 Derived* owner = nullptr;
70 std::atomic<bool> cancelled{false};
71 /// True once linked into `waiters_` (guarded by `mutex_`).
72 /// The stop callback is armed before the node is queued, so
73 /// cancel_waiter must not unlink a node it never queued.
74 bool queued = false;
75 std::optional<std::stop_callback<waiter_canceller>> stop_cb;
76 };
77
78 int fd_ = -1;
79 io_uring_scheduler* sched_;
80 PeerService* peer_service_;
81 Endpoint local_endpoint_{};
82 mutable std::mutex mutex_;
83 intrusive_list<ready_fd_node> ready_fds_;
84 intrusive_list<waiter_node> waiters_;
85 std::unique_ptr<uring_multi_accept_op> multi_op_;
86 bool closing_ = false;
87
88 private:
89 // CRTP ctor private + Derived friended so the base cannot be
90 // constructed except as a CRTP base of Derived
91 // (clang-tidy bugprone-crtp-constructor-accessibility).
92 friend Derived;
93 148x io_uring_multishot_acceptor_base(
94 io_uring_scheduler& sched, PeerService& peer_svc) noexcept
95 148x : sched_(&sched)
96 148x , peer_service_(&peer_svc)
97 148x {}
98
99 public:
100
101 148x ~io_uring_multishot_acceptor_base() override
102 {
103 {
104 148x std::lock_guard lk(mutex_);
105 148x closing_ = true;
106 148x }
107 148x if (fd_ >= 0)
108 {
109 sched_->submit_cancel_by_fd(fd_);
110 ::close(fd_);
111 fd_ = -1;
112 }
113
114 // Drain parked accepted-connection fds unconditionally. These are
115 // distinct from the listener fd and can be present even when the
116 // service close() path already closed and cleared fd_ — that path
117 // does not touch ready_fds_, so the drain must run here.
118 148x intrusive_list<ready_fd_node> drained;
119 {
120 148x std::lock_guard lk(mutex_);
121 150x while (auto* r = ready_fds_.pop_front())
122 2x drained.push_back(r);
123 148x }
124 150x while (auto* r = drained.pop_front())
125 {
126 2x ::close(r->fd);
127 2x delete r;
128 }
129
130 // Break the multi_op_ → impl_ptr (shared_ptr<this>) cycle and
131 // drain pending CQEs so unique_ptr<multi_op_> can free safely.
132 148x if (multi_op_)
133 {
134 115x multi_op_->impl_ptr.reset();
135 115x sched_->drain_cqes_for(multi_op_.get());
136 }
137 296x }
138
139 98x Endpoint local_endpoint() const noexcept override
140 {
141 98x return local_endpoint_;
142 }
143
144 5275x bool is_open() const noexcept override
145 {
146 5275x return fd_ >= 0;
147 }
148
149 4x void cancel() noexcept override
150 {
151 4x drain_waiters_only();
152 4x if (fd_ >= 0)
153 4x sched_->submit_cancel_by_fd(fd_);
154 4x }
155
156 /// Drain queued waiters with operation_aborted but do NOT submit
157 /// any kernel cancel for the fd. Used by service close() paths
158 /// that have already submitted (or are about to submit) the
159 /// cancel-by-fd themselves via `cancel_and_flush`.
160 140x void drain_waiters_only() noexcept
161 {
162 140x intrusive_list<waiter_node> drained;
163 {
164 140x std::lock_guard lk(mutex_);
165 140x closing_ = true;
166 // Drain under the lock — the kernel cancel may not produce
167 // a !more CQE before the fd is closed, so we can't rely on
168 // on_accept_cqe_impl to surface operation_aborted.
169 144x while (auto* w = waiters_.pop_front())
170 4x drained.push_back(w);
171 140x }
172
173 144x while (auto* w = drained.pop_front())
174 {
175 4x w->stop_cb.reset();
176 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — noexcept destructor path: OOM => std::terminate is the intended behavior
177 4x auto* op = new uring_accept_op();
178 4x op->h = w->h;
179 4x op->ex = w->ex;
180 4x op->ec_out = w->ec_out;
181 4x op->impl_out = w->impl_out;
182 4x op->cancelled.store(true, std::memory_order_release);
183 4x delete w;
184 4x sched_->post(op);
185 4x sched_->work_finished();
186 }
187 140x }
188
189 114x std::error_code set_option(
190 int level, int optname,
191 void const* data, std::size_t size) noexcept override
192 {
193 114x if (fd_ < 0) return make_err(EBADF);
194 114x if (::setsockopt(fd_, level, optname,
195 reinterpret_cast<char const*>(data),
196 114x static_cast<socklen_t>(size)) < 0)
197 return make_err(errno);
198 114x return {};
199 }
200
201 2x std::error_code get_option(
202 int level, int optname,
203 void* data, std::size_t* size) const noexcept override
204 {
205 2x if (fd_ < 0) return make_err(EBADF);
206 2x socklen_t len = static_cast<socklen_t>(*size);
207 2x if (::getsockopt(fd_, level, optname,
208 2x reinterpret_cast<char*>(data), &len) < 0)
209 return make_err(errno);
210 2x *size = static_cast<std::size_t>(len);
211 2x return {};
212 }
213
214 116x void start_multishot()
215 {
216 116x if (!multi_op_)
217 {
218 115x multi_op_ = std::make_unique<uring_multi_accept_op>();
219 115x multi_op_->listen_fd = fd_;
220 115x multi_op_->acceptor_impl = this;
221 115x multi_op_->on_cqe =
222 &io_uring_multishot_acceptor_base::on_accept_cqe;
223 115x multi_op_->impl_ptr = this->shared_from_this();
224 }
225 else
226 {
227 // Reuse the existing op (re-arm path). Reset peer scratch
228 // so the kernel writes into a clean slot.
229 1x multi_op_->peer_storage = sockaddr_storage{};
230 1x multi_op_->peer_len = sizeof(sockaddr_storage);
231 }
232
233 116x auto* op = multi_op_.get();
234 116x io_uring_submit_op(*sched_, op);
235 // Deliberately no work_started(): the multishot SQE is a persistent
236 // internal mechanism. User-visible work is tracked per-accept call.
237 116x }
238
239 /// Pull a parked fd or queue a waiter — used by Derived::accept().
240 /// Either case ends with the calling coroutine suspending; the
241 /// caller returns `std::noop_coroutine()` unconditionally.
242 4395x void dispatch_or_queue(
243 std::coroutine_handle<> h,
244 capy::executor_ref ex,
245 std::stop_token const& token,
246 std::error_code* ec,
247 io_object::implementation** impl_out)
248 {
249 4395x sockaddr_storage peer_storage{};
250 4395x socklen_t peer_len = sizeof(peer_storage);
251 4395x int accepted_fd = ::accept4(fd_,
252 reinterpret_cast<sockaddr*>(&peer_storage), &peer_len,
253 SOCK_NONBLOCK | SOCK_CLOEXEC);
254 4395x if (accepted_fd >= 0)
255 {
256 auto* op = new uring_accept_op();
257 op->h = h;
258 op->ex = ex;
259 op->ec_out = ec;
260 op->impl_out = impl_out;
261 op->peer_service = peer_service_;
262 op->adopt_fn = &Derived::adopt_thunk;
263 op->accepted_fd = accepted_fd;
264 op->peer_storage = peer_storage;
265 op->peer_len = peer_len;
266 sched_->post(op);
267 4395x return;
268 }
269 // accept4 returned <0 — only EAGAIN/EWOULDBLOCK should fall
270 // through to the parked/waiter path. Other errors (EBADF, etc.)
271 // surface through the existing scheduler-completion path so the
272 // user sees them via the op's ec_out. Build an op with `err`
273 // set so do_handler delivers make_err(err).
274 4395x if (errno != EAGAIN && errno != EWOULDBLOCK)
275 {
276 2x int saved_errno = errno;
277 2x auto* op = new uring_accept_op();
278 2x op->h = h;
279 2x op->ex = ex;
280 2x op->ec_out = ec;
281 2x op->impl_out = impl_out;
282 2x op->err = saved_errno;
283 2x sched_->post(op);
284 2x return;
285 }
286
287 4393x uring_accept_op* ready_op = nullptr;
288 {
289 4393x std::lock_guard lk(mutex_);
290 4393x if (auto* r = ready_fds_.pop_front())
291 {
292 6x ready_op = new uring_accept_op();
293 6x ready_op->h = h;
294 6x ready_op->ex = ex;
295 6x ready_op->ec_out = ec;
296 6x ready_op->impl_out = impl_out;
297 6x ready_op->peer_service = peer_service_;
298 6x ready_op->adopt_fn = &Derived::adopt_thunk;
299 6x ready_op->accepted_fd = r->fd;
300 6x ready_op->peer_storage = r->peer;
301 6x ready_op->peer_len = r->peer_len;
302 6x delete r;
303 }
304 4393x }
305 4393x if (ready_op)
306 {
307 // Post outside the lock — acceptor mutex_ must never be
308 // held while dispatch_mutex_ is acquired by sched_->post().
309 6x sched_->post(ready_op);
310 6x return;
311 }
312
313 4387x auto* w = new waiter_node{};
314 4387x w->h = h;
315 4387x w->ex = ex;
316 4387x w->ec_out = ec;
317 4387x w->impl_out = impl_out;
318 4387x w->owner = static_cast<Derived*>(this);
319
320 // Arm the stop callback before the node is visible in
321 // `waiters_` and outside `mutex_`: an already-stopped token
322 // invokes the canceller synchronously from emplace, and
323 // cancel_waiter takes `mutex_` (self-deadlock if held).
324 // Arming pre-queue also keeps the CQE handler from claiming
325 // and deleting a node whose callback is not yet constructed.
326 4387x if (token.stop_possible())
327 25x w->stop_cb.emplace(token, waiter_canceller{w});
328
329 4387x bool was_cancelled = false;
330 {
331 4387x std::lock_guard lk(mutex_);
332 4387x if (w->cancelled.load(std::memory_order_acquire))
333 {
334 // Canceller already fired (pre-stopped token); it saw
335 // queued == false and left completion to us.
336 2x was_cancelled = true;
337 }
338 4385x else if (auto* r = ready_fds_.pop_front())
339 {
340 // A connection arrived while the callback was armed;
341 // prefer it over parking the waiter behind it.
342 ready_op = new uring_accept_op();
343 ready_op->h = h;
344 ready_op->ex = ex;
345 ready_op->ec_out = ec;
346 ready_op->impl_out = impl_out;
347 ready_op->peer_service = peer_service_;
348 ready_op->adopt_fn = &Derived::adopt_thunk;
349 ready_op->accepted_fd = r->fd;
350 ready_op->peer_storage = r->peer;
351 ready_op->peer_len = r->peer_len;
352 delete r;
353 }
354 else
355 {
356 4385x w->queued = true;
357 4385x sched_->work_started();
358 4385x waiters_.push_back(w);
359 4385x return;
360 }
361 4387x }
362
363 2x if (was_cancelled)
364 {
365 2x auto* op = new uring_accept_op();
366 2x op->h = w->h;
367 2x op->ex = w->ex;
368 2x op->ec_out = w->ec_out;
369 2x op->impl_out = w->impl_out;
370 2x op->cancelled.store(true, std::memory_order_release);
371 2x w->stop_cb.reset();
372 2x delete w;
373 2x sched_->post(op);
374 2x return;
375 }
376
377 w->stop_cb.reset();
378 delete w;
379 sched_->post(ready_op);
380 }
381
382 14x void cancel_waiter(waiter_node* w) noexcept
383 {
384 {
385 14x std::lock_guard lk(mutex_);
386 14x if (closing_) return; // on_accept_cqe_impl will drain with closing_ set
387 14x if (!w->queued)
388 2x return; // not in waiters_ yet; dispatch_or_queue
389 // observes `cancelled` and completes the op
390 12x waiters_.remove(w);
391 14x }
392 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — stop-token callback: noexcept, OOM => std::terminate is the intended behavior
393 12x auto* op = new uring_accept_op();
394 12x op->h = w->h;
395 12x op->ex = w->ex;
396 12x op->ec_out = w->ec_out;
397 12x op->impl_out = w->impl_out;
398 12x op->cancelled.store(true, std::memory_order_release);
399 12x delete w;
400 // post() increments outstanding_work_; balances the work_started()
401 // from accept() when the waiter was queued.
402 12x sched_->post(op);
403 12x sched_->work_finished(); // balance the work_started() from accept()
404 }
405
406 private:
407 4381x static void on_accept_cqe(
408 void* self_ptr, int new_fd, int err, bool more) noexcept
409 {
410 static_cast<Derived*>(self_ptr)
411 4381x ->on_accept_cqe_impl(new_fd, err, more);
412 4381x }
413
414 protected:
415 4381x void on_accept_cqe_impl(int new_fd, int err, bool more) noexcept
416 {
417 4381x bool was_closing = false;
418 4381x waiter_node* matched = nullptr;
419 4381x intrusive_list<waiter_node> closing_waiters;
420 {
421 4381x std::lock_guard lk(mutex_);
422 4381x was_closing = closing_;
423 4381x if (was_closing)
424 {
425 4x if (new_fd >= 0)
426 ::close(new_fd);
427 4x if (!more)
428 {
429 // Collect waiters to drain after the lock is released.
430 4x while (auto* w = waiters_.pop_front())
431 closing_waiters.push_back(w);
432 }
433 }
434 4377x else if (!waiters_.empty())
435 {
436 // Claim the head waiter atomically. If the canceller
437 // already won the race (cancelled was already true),
438 // leave the waiter in the list for cancel_waiter to
439 // remove and dispatch with operation_aborted; park the
440 // new_fd so the next waiter consumes it.
441 4369x auto* head_w = waiters_.front();
442 4369x if (!head_w->cancelled.exchange(
443 true, std::memory_order_acq_rel))
444 {
445 4369x waiters_.pop_front();
446 4369x matched = head_w;
447 }
448 else if (new_fd >= 0)
449 {
450 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
451 auto* node = new ready_fd_node{};
452 node->fd = new_fd;
453 node->peer = multi_op_->peer_storage;
454 node->peer_len = multi_op_->peer_len;
455 ready_fds_.push_back(node);
456 }
457 }
458 8x else if (new_fd >= 0)
459 {
460 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
461 8x auto* node = new ready_fd_node{};
462 8x node->fd = new_fd;
463 8x node->peer = multi_op_->peer_storage;
464 8x node->peer_len = multi_op_->peer_len;
465 8x ready_fds_.push_back(node);
466 }
467 4381x }
468
469 4381x if (matched)
470 {
471 4369x matched->stop_cb.reset();
472 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
473 4369x auto* op = new uring_accept_op();
474 4369x op->h = matched->h;
475 4369x op->ex = matched->ex;
476 4369x op->ec_out = matched->ec_out;
477 4369x op->impl_out = matched->impl_out;
478 4369x op->peer_service = peer_service_;
479 4369x op->adopt_fn = &Derived::adopt_thunk;
480 4369x if (err)
481 {
482 op->err = err;
483 }
484 4369x else if (new_fd >= 0)
485 {
486 4369x op->accepted_fd = new_fd;
487 4369x op->peer_storage = multi_op_->peer_storage;
488 4369x op->peer_len = multi_op_->peer_len;
489 }
490 4369x delete matched;
491 4369x sched_->post(op);
492 4369x sched_->work_finished(); // balance waiter's work_started
493 }
494
495 4381x while (auto* w = closing_waiters.pop_front())
496 {
497 w->stop_cb.reset();
498 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler shutdown path: noexcept, OOM => std::terminate is the intended behavior
499 auto* op = new uring_accept_op();
500 op->h = w->h;
501 op->ex = w->ex;
502 op->ec_out = w->ec_out;
503 op->impl_out = w->impl_out;
504 op->cancelled.store(true, std::memory_order_release);
505 delete w;
506 sched_->post(op);
507 sched_->work_finished(); // balance waiter's work_started
508 }
509
510 4381x if (!more && !was_closing)
511 {
512 // Re-arm: kernel terminated multishot non-fatally.
513 struct rearm_op final : scheduler_op
514 {
515 std::shared_ptr<Derived> self_;
516 explicit rearm_op(std::shared_ptr<Derived> s) noexcept
517 : self_(std::move(s)) {}
518
519 void operator()() override
520 {
521 auto self = std::move(self_);
522 delete this;
523 {
524 std::lock_guard lk(self->mutex_);
525 if (self->closing_)
526 return;
527 }
528 self->start_multishot();
529 }
530
531 void destroy() override { delete this; }
532 };
533 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler re-arm: noexcept, OOM => std::terminate is the intended behavior
534 sched_->post(new rearm_op(this->shared_from_this()));
535 }
536 4381x }
537 };
538
539 template<class Derived, class ImplBase, class Endpoint, class PeerService>
540 inline void
541 14x io_uring_multishot_acceptor_base<Derived, ImplBase, Endpoint, PeerService>
542 ::waiter_canceller::operator()() const noexcept
543 {
544 14x if (w->cancelled.exchange(true, std::memory_order_acq_rel))
545 return;
546 14x w->owner->cancel_waiter(w);
547 }
548
549 } // namespace boost::corosio::detail
550
551 #endif // BOOST_COROSIO_HAS_IO_URING
552
553 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
554