include/boost/corosio/native/detail/io_uring/io_uring_multishot_acceptor.hpp

69.3% Lines (149/215) 78.6% List of functions (22/28)
io_uring_multishot_acceptor.hpp
f(x) Functions (28)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_local_stream_service&) :89 22x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::io_uring_multishot_acceptor_base(boost::corosio::detail::io_uring_scheduler&, boost::corosio::detail::io_uring_tcp_service&) :89 91x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::~io_uring_multishot_acceptor_base() :97 22x 85.0% 90.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::~io_uring_multishot_acceptor_base() :97 91x 85.0% 90.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::local_endpoint() const :135 1x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::local_endpoint() const :135 80x 100.0% 100.0% 
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::is_open() const :140 91x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::is_open() const :140 4552x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel() :145 1x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel() :145 3x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::drain_waiters_only() :156 17x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::drain_waiters_only() :156 88x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::set_option(int, int, void const*, unsigned long) :185 0 0.0% 0.0% 
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::set_option(int, int, void const*, unsigned long) :185 85x 83.3% 78.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::get_option(int, int, void*, unsigned long*) const :197 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::get_option(int, int, void*, unsigned long*) const :197 2x 87.5% 80.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::start_multishot() :210 11x 83.3% 81.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::start_multishot() :210 74x 83.3% 81.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**) :238 9x 38.9% 46.0% 
boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::dispatch_or_queue(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token const&, std::error_code*, boost::corosio::io_object::implementation**) :238 3947x 61.1% 66.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_node*) :320 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::cancel_waiter(boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_node*) :320 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe(void*, int, int, bool) :342 10x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, 
boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe(void*, int, int, bool) :342 3948x 100.0% 100.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::on_accept_cqe_impl(int, int, bool) :350 10x 67.7% 61.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::on_accept_cqe_impl(int, int, bool) :350 3948x 67.7% 61.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_local_stream_acceptor, boost::corosio::local_stream_acceptor::implementation, boost::corosio::local_endpoint, boost::corosio::detail::io_uring_local_stream_service>::waiter_canceller::operator()() const :476 0 0.0% 0.0% boost::corosio::detail::io_uring_multishot_acceptor_base<boost::corosio::detail::io_uring_tcp_acceptor, boost::corosio::tcp_acceptor::implementation, boost::corosio::endpoint, boost::corosio::detail::io_uring_tcp_service>::waiter_canceller::operator()() const :476 0 0.0% 0.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/corosio/detail/intrusive.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp>
21 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
22 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
23 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
24 #include <boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/io/io_object.hpp>
27
28 #include <atomic>
29 #include <coroutine>
30 #include <memory>
31 #include <mutex>
32 #include <optional>
33 #include <stop_token>
34 #include <system_error>
35
36 #include <netinet/in.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
42 template<class Derived, class ImplBase, class Endpoint, class PeerService>
43 class io_uring_multishot_acceptor_base
44 : public ImplBase
45 , public std::enable_shared_from_this<Derived>
46 {
47 protected:
/// Parked connection: an accepted fd plus the peer address captured for it.
/// on_accept_cqe_impl() appends one to `ready_fds_` when no waiter claims
/// the fd; dispatch_or_queue() consumes it, and the destructor closes any
/// that are left.
48 struct ready_fd_node : intrusive_list<ready_fd_node>::node
49 {
50 int fd = -1;
51 sockaddr_storage peer{};
52 socklen_t peer_len = 0;
53 };
54
55 struct waiter_node;
56
/// Functor installed as the waiter's std::stop_callback. Its operator()
/// (defined after the class) exchanges `w->cancelled` to true and, only if
/// it was the first to do so, calls `w->owner->cancel_waiter(w)`.
57 struct waiter_canceller
58 {
59 waiter_node* w;
60 void operator()() const noexcept;
61 };
62
/// One suspended accept() call queued on `waiters_`.
/// `h`, `ex`, `ec_out` and `impl_out` are copied verbatim into the
/// uring_accept_op that eventually completes the call.
63 struct waiter_node : intrusive_list<waiter_node>::node
64 {
65 std::coroutine_handle<> h;
66 capy::executor_ref ex;
67 std::error_code* ec_out = nullptr;
68 io_object::implementation** impl_out = nullptr;
// Acceptor the canceller calls back into.
69 Derived* owner = nullptr;
// Claim flag: both the stop callback and the CQE handler exchange it to
// true; whichever observes the previous value `false` owns the waiter.
70 std::atomic<bool> cancelled{false};
// Engaged only when the caller's stop_token reports stop_possible().
71 std::optional<std::stop_callback<waiter_canceller>> stop_cb;
72 };
73
// Listening socket; a negative value means "not open" (see is_open()).
74 int fd_ = -1;
// Both pointers are set once by the constructor and never reassigned here.
75 io_uring_scheduler* sched_;
76 PeerService* peer_service_;
// Value returned by local_endpoint(); not written anywhere in this class.
77 Endpoint local_endpoint_{};
// Every access to ready_fds_, waiters_ and closing_ in this class happens
// with mutex_ held.
78 mutable std::mutex mutex_;
79 intrusive_list<ready_fd_node> ready_fds_;
80 intrusive_list<waiter_node> waiters_;
// Created lazily by start_multishot() and reused on re-arm.
81 std::unique_ptr<uring_multi_accept_op> multi_op_;
// Set to true by the destructor and by drain_waiters_only(); nothing in
// this class resets it to false.
82 bool closing_ = false;
83
84 private:
85 // CRTP ctor private + Derived friended so the base cannot be
86 // constructed except as a CRTP base of Derived
87 // (clang-tidy bugprone-crtp-constructor-accessibility).
88 friend Derived;
89 113x io_uring_multishot_acceptor_base(
90 io_uring_scheduler& sched, PeerService& peer_svc) noexcept
91 113x : sched_(&sched)
92 113x , peer_service_(&peer_svc)
93 113x {}
94
95 public:
96
97 113x ~io_uring_multishot_acceptor_base() override
98 {
99 {
100 113x std::lock_guard lk(mutex_);
101 113x closing_ = true;
102 113x }
103 113x if (fd_ >= 0)
104 {
105 sched_->submit_cancel_by_fd(fd_);
106 ::close(fd_);
107 fd_ = -1;
108 }
109
110 // Drain parked accepted-connection fds unconditionally. These are
111 // distinct from the listener fd and can be present even when the
112 // service close() path already closed and cleared fd_ — that path
113 // does not touch ready_fds_, so the drain must run here.
114 113x intrusive_list<ready_fd_node> drained;
115 {
116 113x std::lock_guard lk(mutex_);
117 115x while (auto* r = ready_fds_.pop_front())
118 2x drained.push_back(r);
119 113x }
120 115x while (auto* r = drained.pop_front())
121 {
122 2x ::close(r->fd);
123 2x delete r;
124 }
125
126 // Break the multi_op_ → impl_ptr (shared_ptr<this>) cycle and
127 // drain pending CQEs so unique_ptr<multi_op_> can free safely.
128 113x if (multi_op_)
129 {
130 85x multi_op_->impl_ptr.reset();
131 85x sched_->drain_cqes_for(multi_op_.get());
132 }
133 226x }
134
135 81x Endpoint local_endpoint() const noexcept override
136 {
137 81x return local_endpoint_;
138 }
139
140 4643x bool is_open() const noexcept override
141 {
142 4643x return fd_ >= 0;
143 }
144
145 4x void cancel() noexcept override
146 {
147 4x drain_waiters_only();
148 4x if (fd_ >= 0)
149 4x sched_->submit_cancel_by_fd(fd_);
150 4x }
151
152 /// Drain queued waiters with operation_aborted but do NOT submit
153 /// any kernel cancel for the fd. Used by service close() paths
154 /// that have already submitted (or are about to submit) the
155 /// cancel-by-fd themselves via `cancel_and_flush`.
156 105x void drain_waiters_only() noexcept
157 {
158 105x intrusive_list<waiter_node> drained;
159 {
160 105x std::lock_guard lk(mutex_);
161 105x closing_ = true;
162 // Drain under the lock — the kernel cancel may not produce
163 // a !more CQE before the fd is closed, so we can't rely on
164 // on_accept_cqe_impl to surface operation_aborted.
165 109x while (auto* w = waiters_.pop_front())
166 4x drained.push_back(w);
167 105x }
168
169 109x while (auto* w = drained.pop_front())
170 {
171 4x w->stop_cb.reset();
172 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — noexcept destructor path: OOM => std::terminate is the intended behavior
173 4x auto* op = new uring_accept_op();
174 4x op->h = w->h;
175 4x op->ex = w->ex;
176 4x op->ec_out = w->ec_out;
177 4x op->impl_out = w->impl_out;
178 4x op->cancelled.store(true, std::memory_order_release);
179 4x delete w;
180 4x sched_->post(op);
181 4x sched_->work_finished();
182 }
183 105x }
184
185 85x std::error_code set_option(
186 int level, int optname,
187 void const* data, std::size_t size) noexcept override
188 {
189 85x if (fd_ < 0) return make_err(EBADF);
190 85x if (::setsockopt(fd_, level, optname,
191 reinterpret_cast<char const*>(data),
192 85x static_cast<socklen_t>(size)) < 0)
193 return make_err(errno);
194 85x return {};
195 }
196
197 2x std::error_code get_option(
198 int level, int optname,
199 void* data, std::size_t* size) const noexcept override
200 {
201 2x if (fd_ < 0) return make_err(EBADF);
202 2x socklen_t len = static_cast<socklen_t>(*size);
203 2x if (::getsockopt(fd_, level, optname,
204 2x reinterpret_cast<char*>(data), &len) < 0)
205 return make_err(errno);
206 2x *size = static_cast<std::size_t>(len);
207 2x return {};
208 }
209
210 85x void start_multishot()
211 {
212 85x if (!multi_op_)
213 {
214 85x multi_op_ = std::make_unique<uring_multi_accept_op>();
215 85x multi_op_->listen_fd = fd_;
216 85x multi_op_->acceptor_impl = this;
217 85x multi_op_->on_cqe =
218 &io_uring_multishot_acceptor_base::on_accept_cqe;
219 85x multi_op_->impl_ptr = this->shared_from_this();
220 }
221 else
222 {
223 // Reuse the existing op (re-arm path). Reset peer scratch
224 // so the kernel writes into a clean slot.
225 multi_op_->peer_storage = sockaddr_storage{};
226 multi_op_->peer_len = sizeof(sockaddr_storage);
227 }
228
229 85x auto* op = multi_op_.get();
230 85x io_uring_submit_op(*sched_, op);
231 // Deliberately no work_started(): the multishot SQE is a persistent
232 // internal mechanism. User-visible work is tracked per-accept call.
233 85x }
234
235 /// Pull a parked fd or queue a waiter — used by Derived::accept().
236 /// Either case ends with the calling coroutine suspending; the
237 /// caller returns `std::noop_coroutine()` unconditionally.
238 3956x void dispatch_or_queue(
239 std::coroutine_handle<> h,
240 capy::executor_ref ex,
241 std::stop_token const& token,
242 std::error_code* ec,
243 io_object::implementation** impl_out)
244 {
245 3956x sockaddr_storage peer_storage{};
246 3956x socklen_t peer_len = sizeof(peer_storage);
247 3956x int accepted_fd = ::accept4(fd_,
248 reinterpret_cast<sockaddr*>(&peer_storage), &peer_len,
249 SOCK_NONBLOCK | SOCK_CLOEXEC);
250 3956x if (accepted_fd >= 0)
251 {
252 auto* op = new uring_accept_op();
253 op->h = h;
254 op->ex = ex;
255 op->ec_out = ec;
256 op->impl_out = impl_out;
257 op->peer_service = peer_service_;
258 op->adopt_fn = &Derived::adopt_thunk;
259 op->accepted_fd = accepted_fd;
260 op->peer_storage = peer_storage;
261 op->peer_len = peer_len;
262 sched_->post(op);
263 3952x return;
264 }
265 // accept4 returned <0 — only EAGAIN/EWOULDBLOCK should fall
266 // through to the parked/waiter path. Other errors (EBADF, etc.)
267 // surface through the existing scheduler-completion path so the
268 // user sees them via the op's ec_out. Build an op with `err`
269 // set so do_handler delivers make_err(err).
270 3956x if (errno != EAGAIN && errno != EWOULDBLOCK)
271 {
272 int saved_errno = errno;
273 auto* op = new uring_accept_op();
274 op->h = h;
275 op->ex = ex;
276 op->ec_out = ec;
277 op->impl_out = impl_out;
278 op->err = saved_errno;
279 sched_->post(op);
280 return;
281 }
282
283 3956x uring_accept_op* ready_op = nullptr;
284 {
285 3956x std::lock_guard lk(mutex_);
286 3956x if (auto* r = ready_fds_.pop_front())
287 {
288 4x ready_op = new uring_accept_op();
289 4x ready_op->h = h;
290 4x ready_op->ex = ex;
291 4x ready_op->ec_out = ec;
292 4x ready_op->impl_out = impl_out;
293 4x ready_op->peer_service = peer_service_;
294 4x ready_op->adopt_fn = &Derived::adopt_thunk;
295 4x ready_op->accepted_fd = r->fd;
296 4x ready_op->peer_storage = r->peer;
297 4x ready_op->peer_len = r->peer_len;
298 4x delete r;
299 }
300 else
301 {
302 3952x auto* w = new waiter_node{};
303 3952x w->h = h;
304 3952x w->ex = ex;
305 3952x w->ec_out = ec;
306 3952x w->impl_out = impl_out;
307 3952x w->owner = static_cast<Derived*>(this);
308 3952x if (token.stop_possible())
309 w->stop_cb.emplace(token, waiter_canceller{w});
310 3952x sched_->work_started();
311 3952x waiters_.push_back(w);
312 3952x return;
313 }
314 3956x }
315 // Post outside the lock — acceptor mutex_ must never be held
316 // while dispatch_mutex_ is acquired by sched_->post().
317 4x sched_->post(ready_op);
318 }
319
320 void cancel_waiter(waiter_node* w) noexcept
321 {
322 {
323 std::lock_guard lk(mutex_);
324 if (closing_) return; // on_accept_cqe_impl will drain with closing_ set
325 waiters_.remove(w);
326 }
327 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — stop-token callback: noexcept, OOM => std::terminate is the intended behavior
328 auto* op = new uring_accept_op();
329 op->h = w->h;
330 op->ex = w->ex;
331 op->ec_out = w->ec_out;
332 op->impl_out = w->impl_out;
333 op->cancelled.store(true, std::memory_order_release);
334 delete w;
335 // post() increments outstanding_work_; balances the work_started()
336 // from accept() when the waiter was queued.
337 sched_->post(op);
338 sched_->work_finished(); // balance the work_started() from accept()
339 }
340
341 private:
342 3958x static void on_accept_cqe(
343 void* self_ptr, int new_fd, int err, bool more) noexcept
344 {
345 static_cast<Derived*>(self_ptr)
346 3958x ->on_accept_cqe_impl(new_fd, err, more);
347 3958x }
348
349 protected:
350 3958x void on_accept_cqe_impl(int new_fd, int err, bool more) noexcept
351 {
352 3958x bool was_closing = false;
353 3958x waiter_node* matched = nullptr;
354 3958x intrusive_list<waiter_node> closing_waiters;
355 {
356 3958x std::lock_guard lk(mutex_);
357 3958x was_closing = closing_;
358 3958x if (was_closing)
359 {
360 4x if (new_fd >= 0)
361 ::close(new_fd);
362 4x if (!more)
363 {
364 // Collect waiters to drain after the lock is released.
365 4x while (auto* w = waiters_.pop_front())
366 closing_waiters.push_back(w);
367 }
368 }
369 3954x else if (!waiters_.empty())
370 {
371 // Claim the head waiter atomically. If the canceller
372 // already won the race (cancelled was already true),
373 // leave the waiter in the list for cancel_waiter to
374 // remove and dispatch with operation_aborted; park the
375 // new_fd so the next waiter consumes it.
376 3948x auto* head_w = waiters_.front();
377 3948x if (!head_w->cancelled.exchange(
378 true, std::memory_order_acq_rel))
379 {
380 3948x waiters_.pop_front();
381 3948x matched = head_w;
382 }
383 else if (new_fd >= 0)
384 {
385 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
386 auto* node = new ready_fd_node{};
387 node->fd = new_fd;
388 node->peer = multi_op_->peer_storage;
389 node->peer_len = multi_op_->peer_len;
390 ready_fds_.push_back(node);
391 }
392 }
393 6x else if (new_fd >= 0)
394 {
395 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
396 6x auto* node = new ready_fd_node{};
397 6x node->fd = new_fd;
398 6x node->peer = multi_op_->peer_storage;
399 6x node->peer_len = multi_op_->peer_len;
400 6x ready_fds_.push_back(node);
401 }
402 3958x }
403
404 3958x if (matched)
405 {
406 3948x matched->stop_cb.reset();
407 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler: noexcept, OOM => std::terminate is the intended behavior
408 3948x auto* op = new uring_accept_op();
409 3948x op->h = matched->h;
410 3948x op->ex = matched->ex;
411 3948x op->ec_out = matched->ec_out;
412 3948x op->impl_out = matched->impl_out;
413 3948x op->peer_service = peer_service_;
414 3948x op->adopt_fn = &Derived::adopt_thunk;
415 3948x if (err)
416 {
417 op->err = err;
418 }
419 3948x else if (new_fd >= 0)
420 {
421 3948x op->accepted_fd = new_fd;
422 3948x op->peer_storage = multi_op_->peer_storage;
423 3948x op->peer_len = multi_op_->peer_len;
424 }
425 3948x delete matched;
426 3948x sched_->post(op);
427 3948x sched_->work_finished(); // balance waiter's work_started
428 }
429
430 3958x while (auto* w = closing_waiters.pop_front())
431 {
432 w->stop_cb.reset();
433 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler shutdown path: noexcept, OOM => std::terminate is the intended behavior
434 auto* op = new uring_accept_op();
435 op->h = w->h;
436 op->ex = w->ex;
437 op->ec_out = w->ec_out;
438 op->impl_out = w->impl_out;
439 op->cancelled.store(true, std::memory_order_release);
440 delete w;
441 sched_->post(op);
442 sched_->work_finished(); // balance waiter's work_started
443 }
444
445 3958x if (!more && !was_closing)
446 {
447 // Re-arm: kernel terminated multishot non-fatally.
448 struct rearm_op final : scheduler_op
449 {
450 std::shared_ptr<Derived> self_;
451 explicit rearm_op(std::shared_ptr<Derived> s) noexcept
452 : self_(std::move(s)) {}
453
454 void operator()() override
455 {
456 auto self = std::move(self_);
457 delete this;
458 {
459 std::lock_guard lk(self->mutex_);
460 if (self->closing_)
461 return;
462 }
463 self->start_multishot();
464 }
465
466 void destroy() override { delete this; }
467 };
468 // NOLINTNEXTLINE(bugprone-unhandled-exception-at-new) — CQE handler re-arm: noexcept, OOM => std::terminate is the intended behavior
469 sched_->post(new rearm_op(this->shared_from_this()));
470 }
471 3958x }
472 };
473
474 template<class Derived, class ImplBase, class Endpoint, class PeerService>
475 inline void
476 io_uring_multishot_acceptor_base<Derived, ImplBase, Endpoint, PeerService>
477 ::waiter_canceller::operator()() const noexcept
478 {
479 if (w->cancelled.exchange(true, std::memory_order_acq_rel))
480 return;
481 w->owner->cancel_waiter(w);
482 }
483
484 } // namespace boost::corosio::detail
485
486 #endif // BOOST_COROSIO_HAS_IO_URING
487
488 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_MULTISHOT_ACCEPTOR_HPP
489