include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp

85.2% Lines (390/458) 97.9% List of functions (47/48)
io_uring_scheduler.hpp
f(x) Functions (48)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_scheduler::ring() :101 3671x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::dispatch_mutex() const :108 16604x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::ring_mutex() const :111 3671x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_op_posted_exchange(bool) const :133 3671x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_op_ref() const :141 182x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::inflight_inc() const :150 10509x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::push_completed_locked(boost::corosio::detail::scheduler_op*) const :229 16604x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::configure_single_threaded(bool) :235 2x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::configure_sqpoll(bool, unsigned int, int) :260 0 0.0% 0.0% boost::corosio::detail::io_uring_scheduler::is_single_threaded() const :269 1x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_sqes_op::submit_sqes_op() :329 551x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int) :363 551x 100.0% 74.0% boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const :374 3317x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::~io_uring_scheduler() :387 1102x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const :398 26682x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const::{lambda()#1}::operator()() const :400 416x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring_unlocked() const :406 416x 50.0% 43.0% boost::corosio::detail::io_uring_scheduler::shutdown() :511 551x 100.0% 87.0% boost::corosio::detail::io_uring_scheduler::stop() :539 372x 100.0% 90.0% boost::corosio::detail::io_uring_scheduler::stopped() const :560 36x 
100.0% 100.0% boost::corosio::detail::io_uring_scheduler::restart() :566 71x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::work_started() :572 27430x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::work_finished() :578 39507x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::interrupt_reactor() const :585 13455x 75.0% 67.0% boost::corosio::detail::io_uring_scheduler::drain_wakeup_eventfd() const :612 12833x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const :628 2057x 100.0% 93.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :633 2057x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :635 2049x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :643 8x 94.4% 100.0% boost::corosio::detail::io_uring_scheduler::post(boost::corosio::detail::scheduler_op*) const :668 10051x 100.0% 100.0% boost::corosio::detail::io_uring_run_guard::io_uring_run_guard(boost::corosio::detail::io_uring_scheduler const*) :713 352x 100.0% 100.0% boost::corosio::detail::io_uring_run_guard::~io_uring_run_guard() :721 352x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::running_in_this_thread() const :728 878x 100.0% 86.0% boost::corosio::detail::io_uring_scheduler::reset_inline_budget() const :739 52246x 100.0% 83.0% boost::corosio::detail::io_uring_scheduler::try_consume_inline_budget() const :752 282456x 87.5% 78.0% boost::corosio::detail::io_uring_scheduler::run() :770 346x 100.0% 75.0% boost::corosio::detail::io_uring_scheduler::run_one() :800 4x 75.0% 67.0% boost::corosio::detail::io_uring_scheduler::wait_one(long) :813 26x 100.0% 73.0% boost::corosio::detail::io_uring_scheduler::poll() :826 14x 100.0% 77.0% 
boost::corosio::detail::io_uring_scheduler::poll_one() :845 4x 100.0% 73.0% boost::corosio::detail::io_uring_scheduler::do_one(long) :858 32570x 82.6% 67.0% boost::corosio::detail::io_uring_scheduler::process_completions() :1070 33644x 78.8% 78.0% boost::corosio::detail::io_uring_scheduler::submit_sqes_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :1149 17x 30.0% 38.0% boost::corosio::detail::io_uring_scheduler::submit_cancel_by_user_data(boost::corosio::detail::io_uring_op*) :1167 109x 78.6% 75.0% boost::corosio::detail::io_uring_scheduler::submit_cancel_by_fd(int) :1192 80x 78.6% 75.0% boost::corosio::detail::io_uring_op::on_cancel() :1212 115x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::cancel_and_flush(int) :1223 6534x 85.7% 80.0% boost::corosio::detail::io_uring_scheduler::drain_cqes_for(boost::corosio::detail::io_uring_op*) :1246 115x 97.0% 89.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 // Include before any project headers open a namespace — prevents the
18 // boost::corosio::io_uring tag variable from shadowing struct ::io_uring.
19 #include <liburing.h>
20
21 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
22 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
23 #include <boost/corosio/detail/config.hpp>
24 #include <boost/corosio/detail/except.hpp>
25 #include <boost/corosio/detail/scheduler.hpp>
26 #include <boost/corosio/detail/scheduler_op.hpp>
27 #include <boost/corosio/detail/timer_service.hpp>
28 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
29 #include <boost/corosio/native/detail/make_err.hpp>
30 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
31 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
32 #include <boost/capy/ex/execution_context.hpp>
33
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
40
41 #include <errno.h>
42 #include <poll.h>
43 #include <sys/eventfd.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 // Forward-declared so the out-of-line inline definitions below the class
49 // can reference the frame stack without a circular dependency.
50 struct io_uring_scheduler_frame;
51 extern thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_;
52
53 /** io_uring scheduler — proactor model on Linux 6.x+.
54
55 Owns one io_uring per io_context. Lazy batched submit;
56 cross-thread post wakes a registered eventfd via multishot
57 POLL_ADD.
58
59 @par Thread Safety
60 All public member functions are thread-safe.
61 */
62 class BOOST_COROSIO_DECL io_uring_scheduler final
63 : public scheduler
64 , public capy::execution_context::service
65 {
66 public:
67 using key_type = scheduler;
68 using mutex_type = conditionally_enabled_mutex;
69 using lock_type = mutex_type::scoped_lock;
70 using event_type = conditionally_enabled_event;
71
72 io_uring_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
73 ~io_uring_scheduler() override;
74 io_uring_scheduler(io_uring_scheduler const&) = delete;
75 io_uring_scheduler& operator=(io_uring_scheduler const&) = delete;
76
77 void shutdown() override;
78
79 // scheduler virtuals — definitions in Task 6
80 void post(std::coroutine_handle<>) const override;
81 void post(scheduler_op*) const override;
82
83 bool running_in_this_thread() const noexcept override;
84 void stop() override;
85 bool stopped() const noexcept override;
86 void restart() override;
87 std::size_t run() override;
88 std::size_t run_one() override;
89 std::size_t wait_one(long usec) override;
90 std::size_t poll() override;
91 std::size_t poll_one() override;
92 void work_started() noexcept override;
93 void work_finished() noexcept override;
94
95 /** Return the underlying liburing ring.
96
97 Triggers lazy ring initialisation on first call. Used by
98 socket op submission helpers (e.g. `io_uring_submit_op`) and
99 any other code path that needs a live ring pointer.
100 */
101 3671x struct ::io_uring* ring() noexcept
102 {
103 3671x lazy_init_ring();
104 3671x return &ring_;
105 }
106
107 /// Return the dispatch mutex (protects completed_ops_ / cond_).
108 16604x mutex_type& dispatch_mutex() const noexcept { return dispatch_mutex_; }
109
110 /// Return the ring mutex (serialises userspace SQ/CQ access).
111 3671x mutex_type& ring_mutex() const noexcept { return ring_mutex_; }
112
113 /** Reset the calling thread's inline-budget for this scheduler.
114
115 Called at the top of each dispatched op in `do_one` so each
116 op handler gets a fresh budget for inline speculative
117 completions. Walks the frame stack; no-op if this scheduler
118 isn't on the stack (i.e. called from a non-run thread).
119 */
120 void reset_inline_budget() const noexcept;
121
122 /** Consume one unit of inline budget if available.
123
124 @return `true` if budget was available and consumed; `false`
125 if the budget is exhausted or this scheduler is not on
126 the calling thread's run stack.
127 */
128 bool try_consume_inline_budget() const noexcept;
129
130 /// Exchange the submit-batch posted flag. Returns the prior value.
131 /// Caller MUST hold ring_mutex_ — the flag is plain bool, not atomic,
132 /// and the mutex provides the read-modify-write atomicity.
133 3671x bool submit_op_posted_exchange(bool desired) const noexcept
134 {
135 3671x bool prev = submit_op_posted_;
136 3671x submit_op_posted_ = desired;
137 3671x return prev;
138 }
139
140 /// Return a reference to the mutable embedded submit_sqes_op.
141 182x scheduler_op& submit_op_ref() const noexcept
142 {
143 182x return submit_op_;
144 }
145
146 /// Increment the io_uring in-flight counter. Callers prep an SQE
147 /// whose CQE will require IORING_ENTER_GETEVENTS to surface under
148 /// DEFER_TASKRUN. Excluded: the wakeup-eventfd multishot SQE, whose
149 /// progress doesn't depend on userspace getevents.
150 10509x void inflight_inc() const noexcept
151 {
152 10509x io_uring_inflight_.fetch_add(1, std::memory_order_release);
153 10509x }
154
155 /// Initialize the io_uring ring on first access. Idempotent.
156 void lazy_init_ring() const;
157
158 /// Wake the leader if it's blocked in `submit_and_wait_timeout`.
159 /// Best-effort: the wakeup is suppressed if the leader has already
160 /// been signalled and not yet acked.
161 void interrupt_reactor() const noexcept;
162
163 /** Submit `IORING_OP_ASYNC_CANCEL` targeting an in-flight op by its
164 user_data pointer.
165
166 The kernel delivers `-ECANCELED` on the target's CQE if it was
167 still in flight; the op's completion handler then reports
168 `operation_aborted`. Best-effort: if the SQ is full after one
169 flush attempt the function returns without cancelling (the op
170 will complete normally on its own).
171
172 @param target The in-flight op to cancel.
173 */
174 void submit_cancel_by_user_data(io_uring_op* target) noexcept;
175
176 /** Submit `IORING_OP_ASYNC_CANCEL` with `IORING_ASYNC_CANCEL_FD`
177 to cancel every in-flight op on the given fd in one SQE.
178
179 Best-effort: if the SQ is full after one flush attempt the
180 function returns without cancelling.
181
182 @param fd The file descriptor whose in-flight ops should be
183 cancelled.
184 */
185 void submit_cancel_by_fd(int fd) noexcept;
186
187 /** Submit `IORING_OP_ASYNC_CANCEL` for `fd` and immediately flush
188 the submission ring to the kernel.
189
190 Must be called while `fd` is still open so the kernel can
191 resolve the file from the fd number before it is closed and
192 potentially recycled.
193
194 Best-effort: if the SQ is full the function still flushes any
195 earlier pending SQEs to the kernel.
196
197 @param fd The file descriptor whose in-flight ops should be
198 cancelled.
199 */
200 void cancel_and_flush(int fd) noexcept;
201
202 /** Drain pending CQEs for a specific op's `user_data`.
203
204 Submits an ASYNC_CANCEL by user_data to short-circuit any
205 in-flight op holding `target`, then iterates the CQ ring and
206 consumes every CQE matching `target` so its memory can be
207 freed safely. Used by member-owned ops (e.g.
208 `uring_multi_accept_op`) whose destructor cannot tolerate
209 outstanding CQEs.
210
211 @par Thread Safety
212 Safe to call from any thread. Internally takes `ring_mutex_`
213 to serialise against the run-loop leader; calls
214 `interrupt_reactor()` first so the leader returns from its
215 kernel wait promptly.
216
217 @param target The op pointer used as user_data on the SQE.
218 */
219 void drain_cqes_for(io_uring_op* target) noexcept;
220
221 /** Queue an already-counted op while the caller holds dispatch_mutex_.
222
223 Does NOT increment `outstanding_work_`. Use for synchronous
224 completion paths (e.g. SQE backpressure) where the caller called
225 `work_started()` and already holds the dispatch lock.
226
227 @pre `dispatch_mutex_` must be locked by the calling thread.
228 */
229 16604x void push_completed_locked(scheduler_op* op) const noexcept
230 {
231 16604x completed_ops_.push(op);
232 16604x }
233
234 /// Single-threaded mode toggle (matches reactor_scheduler API).
235 2x void configure_single_threaded(bool v) noexcept override
236 {
237 2x single_threaded_ = v;
238 2x dispatch_mutex_.set_enabled(!v);
239 2x ring_mutex_.set_enabled(!v);
240 2x cond_.set_enabled(!v);
241 2x }
242
243 /** Configure SQPOLL parameters.
244
245 Must be called before the first run/poll/post — the values
246 are cached and read by `lazy_init_ring_unlocked` when the
247 ring is first constructed. No-op if `enable` is false (the
248 default).
249
250 @note When combined with single-threaded mode,
251 IORING_SETUP_DEFER_TASKRUN is suppressed — the kernel
252 rejects that combination. SINGLE_ISSUER still applies.
253
254 @param enable Set IORING_SETUP_SQPOLL on ring init.
255 @param idle_ms sq_thread_idle in milliseconds; 0 = kernel
256 default (1ms).
257 @param cpu Pin the polling thread to this CPU; -1 to
258 not pin.
259 */
260 void configure_sqpoll(
261 bool enable, unsigned idle_ms, int cpu) noexcept
262 {
263 enable_sqpoll_ = enable;
264 sq_thread_idle_ms_ = idle_ms;
265 sq_thread_cpu_ = cpu;
266 }
267
268 /// Return true if single-threaded (lockless) mode is active.
269 1x bool is_single_threaded() const noexcept override { return single_threaded_; }
270
271 private:
272 // ring_ + wakeup_eventfd_ are mutable so lazy_init_ring() (called
273 // from const contexts like post()) can populate them on first use.
274 mutable struct ::io_uring ring_{};
275 mutable int wakeup_eventfd_ = -1;
276 timer_service* timer_svc_ = nullptr;
277
278 // dispatch_mutex_ protects completed_ops_, cond_, task_running_.
279 // ring_mutex_ protects every userspace touch of ring_ (SQ tail,
280 // CQ head): get_sqe / submit / submit_and_wait_timeout /
281 // for_each_cqe / cq_advance.
282 //
283 // process_completions runs under ring_mutex_ and briefly takes
284 // dispatch_mutex_ to splice into completed_ops_. The locks are
285 // never held simultaneously for the full duration of any other
286 // path's critical section, so no deadlock.
287 mutable mutex_type dispatch_mutex_{true};
288 mutable mutex_type ring_mutex_{true};
289 mutable event_type cond_{true};
290 mutable op_queue completed_ops_;
291 // outstanding_work_ and io_uring_inflight_ are both atomic
292 // counters updated at high frequency on different paths:
293 // - outstanding_work_ : every work_started / work_finished call,
294 // including timers, posts, and SQE submits.
295 // - io_uring_inflight_ : only SQE submit + non-F_MORE CQE consume.
296 // Under multi-thread workloads the threads tend to update these
297 // from different code paths; placing them on the same cache line
298 // would cause false sharing and unnecessary cache-line ping-pong.
299 // Hold each on its own line.
300 alignas(64) mutable std::atomic<std::int64_t> outstanding_work_{0};
301 // Count of io_uring SQEs in flight whose completion requires user-
302 // space to enter the kernel via IORING_ENTER_GETEVENTS for task
303 // work to progress under IORING_SETUP_DEFER_TASKRUN. Excludes the
304 // wakeup-eventfd multishot poll (registered in lazy_init_ring), and
305 // is updated by io_uring_submit_op and by process_completions on
306 // each non-F_MORE, non-eventfd CQE. Used by do_one to skip the
307 // ring pump when there is no io_uring work pending.
308 alignas(64) mutable std::atomic<std::int64_t> io_uring_inflight_{0};
309 std::atomic<bool> stopped_{false};
310 // Leader-follower flag: true while a thread is blocked in
311 // io_uring_submit_and_wait_timeout. Protected by dispatch_mutex_.
312 mutable bool task_running_ = false;
313 bool single_threaded_ = false;
314 bool enable_sqpoll_ = false;
315 unsigned sq_thread_idle_ms_ = 0;
316 int sq_thread_cpu_ = -1;
317
318 int cancel_sentinel_ = 0;
319 mutable std::atomic<bool> wakeup_armed_{false};
320
321 /// Flushes the SQ ring and drains CQEs in one mutex-held pass.
322 /// One instance covers a whole batch; subsequent SQEs in the same
323 /// batch skip the post, amortising syscall cost across the batch.
324 /// Mirrors Asio's `submit_sqes_op` (`io_uring_service.ipp:730-742`).
325 struct submit_sqes_op final : scheduler_op
326 {
327 io_uring_scheduler* sched_ = nullptr;
328
329 551x submit_sqes_op() noexcept : scheduler_op(&do_handler) {}
330
331 static void do_handler(
332 void* owner, scheduler_op* base,
333 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept;
334 };
335
336 /// True between the first submitter of a batch posting `submit_op_`
337 /// and the dispatched op clearing the flag inside its handler. Read
338 /// and written only while holding `ring_mutex_`.
339 mutable bool submit_op_posted_ = false;
340
341 /// Single embedded `submit_sqes_op` instance, owned by the scheduler.
342 mutable submit_sqes_op submit_op_;
343
344 // drain_cqes_for tuning. The bound exists to avoid stalling a
345 // destructor if the kernel never returns a cancel completion (best-
346 // effort drain); 8 rounds * 1ms == 8ms worst case.
347 static constexpr int drain_cqes_max_rounds = 8;
348 static constexpr unsigned long drain_cqes_kick_ns = 1'000'000;
349
350 // ring_inited_ goes true once on first run/poll/submit. The init is
351 // deferred from the constructor so configure_single_threaded(true)
352 // can take effect before io_uring_queue_init_params chooses flags.
353 mutable std::once_flag ring_init_once_;
354 mutable bool ring_inited_ = false;
355
356 std::size_t do_one(long timeout_us);
357 void process_completions();
358 void drain_wakeup_eventfd() const noexcept;
359 void lazy_init_ring_unlocked() const;
360 };
361
362 inline
363 551x io_uring_scheduler::io_uring_scheduler(
364 551x capy::execution_context& ctx, int /*concurrency_hint*/)
365 {
366 // sched_ cannot be set in the member initialiser — `this` is not
367 // available there.
368 551x submit_op_.sched_ = this;
369
370 // Wire timer service. on_earliest_changed wakes the run loop so it
371 // recomputes its wait timeout.
372 551x timer_svc_ = &get_timer_service(ctx, *this);
373 551x timer_svc_->set_on_earliest_changed(
374 3868x timer_service::callback(this, [](void* p) {
375 3317x static_cast<io_uring_scheduler*>(p)->interrupt_reactor();
376 3317x }));
377
378 551x get_resolver_service(ctx, *this);
379 551x get_signal_service(ctx, *this);
380
381 // Ring init is deferred to lazy_init_ring() so configure_single_-
382 // threaded(true), which the io_context applies after construction,
383 // can take effect before io_uring_queue_init_params chooses flags.
384 551x }
385
386 inline
387 1102x io_uring_scheduler::~io_uring_scheduler()
388 {
389 551x if (ring_inited_)
390 {
391 416x if (wakeup_eventfd_ >= 0)
392 416x ::close(wakeup_eventfd_);
393 416x ::io_uring_queue_exit(&ring_);
394 }
395 1102x }
396
397 inline void
398 26682x io_uring_scheduler::lazy_init_ring() const
399 {
400 26682x std::call_once(ring_init_once_, [this] {
401 416x lazy_init_ring_unlocked();
402 416x });
403 26682x }
404
405 inline void
406 416x io_uring_scheduler::lazy_init_ring_unlocked() const
407 {
408 416x io_uring_params params{};
409 416x if (single_threaded_)
410 {
411 // SINGLE_ISSUER promises the kernel one submitter thread,
412 // letting it skip internal SQ locking. DEFER_TASKRUN tells
413 // it to batch task_work delivery at io_uring_enter(GETEVENTS)
414 // boundaries instead of interrupting the run thread via
415 // TWA_SIGNAL — eliminates cache pollution from mid-flight
416 // task_work and gives a meaningful single-threaded
417 // throughput uplift.
418 //
419 // Plan 3 disabled DEFER_TASKRUN defensively over a misread
420 // of the GETEVENTS contract. Plan 4a re-enabled it: liburing's
421 // io_uring_submit_and_wait_timeout always sets
422 // IORING_ENTER_GETEVENTS when wait_nr > 0, regardless of
423 // ts. Our run loop's only kernel-wait call passes wait_nr=1.
424 // Submit-only paths (cancel_and_flush, etc.) leave their
425 // CQEs queued until the leader's next GETEVENTS-bearing
426 // wait — benign.
427 //
428 // Multi-thread mode never sets these flags: SINGLE_ISSUER
429 // would be unsafe with multiple submitter threads.
430 //
431 // DEFER_TASKRUN is suppressed when SQPOLL is also enabled
432 // — the kernel rejects that combination with -EINVAL. The
433 // SQPOLL polling thread already delivers completions
434 // without TWA_SIGNAL interruption, so DEFER_TASKRUN's
435 // benefit is moot in that mode.
436 1x params.flags = IORING_SETUP_SINGLE_ISSUER;
437 1x if (!enable_sqpoll_)
438 1x params.flags |= IORING_SETUP_DEFER_TASKRUN;
439 }
440
441 416x if (enable_sqpoll_)
442 {
443 // SQPOLL forks a kernel thread that busy-polls the SQ ring;
444 // submission becomes a userspace-only memory store. Combines
445 // with SINGLE_ISSUER (the kernel accepts that pair) but NOT
446 // with DEFER_TASKRUN (kernel returns -EINVAL); the
447 // single_threaded_ branch above suppresses DEFER_TASKRUN
448 // when SQPOLL is also set. Idle timeout 0 means kernel
449 // default (1ms); we only forward when explicitly set so
450 // the kernel default is preserved.
451 params.flags |= IORING_SETUP_SQPOLL;
452 if (sq_thread_idle_ms_ != 0)
453 params.sq_thread_idle = sq_thread_idle_ms_;
454 if (sq_thread_cpu_ >= 0)
455 {
456 params.flags |= IORING_SETUP_SQ_AFF;
457 params.sq_thread_cpu = static_cast<__u32>(sq_thread_cpu_);
458 }
459 }
460
461 416x int rc = ::io_uring_queue_init_params(256, &ring_, &params);
462 416x if (rc < 0)
463 detail::throw_system_error(
464 make_err(-rc), "io_uring_queue_init_params");
465
466 416x wakeup_eventfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
467 416x if (wakeup_eventfd_ < 0)
468 {
469 int errn = errno;
470 ::io_uring_queue_exit(&ring_);
471 detail::throw_system_error(make_err(errn), "eventfd");
472 }
473
474 // Register a one-shot poll on the wake eventfd. user_data nullptr
475 // is the sentinel recognized by process_completions, which calls
476 // drain_wakeup_eventfd() to consume the eventfd byte AND re-arm
477 // the poll. Plan 5a switched away from IORING_POLL_MULTISHOT
478 // because multishot ops can silently terminate (e.g. under CQ
479 // pressure), and we don't observe the termination — leaving the
480 // wake mechanism dead and the leader stuck in kernel wait. One-
481 // shot rearm-on-fire is fail-fast: every wake event is paired
482 // with an explicit rearm, so a missed rearm would manifest
483 // immediately as the next wake being lost (test-visible).
484 416x ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
485 416x if (!sqe)
486 {
487 ::close(wakeup_eventfd_);
488 ::io_uring_queue_exit(&ring_);
489 detail::throw_system_error(
490 make_err(ENOSPC), "io_uring_get_sqe (wakeup)");
491 }
492 // Multishot poll: fires a CQE on each eventfd POLLIN without
493 // consuming the SQE. Avoids the re-arm hazard of one-shot poll
494 // (where drain_wakeup_eventfd's get_sqe could return null on a
495 // full SQ, leaving no SQE to detect future wakes).
496 416x ::io_uring_prep_poll_multishot(sqe, wakeup_eventfd_, POLLIN);
497 416x ::io_uring_sqe_set_data(sqe, nullptr);
498 416x int submit_rc = ::io_uring_submit(&ring_);
499 416x if (submit_rc < 0)
500 {
501 ::close(wakeup_eventfd_);
502 ::io_uring_queue_exit(&ring_);
503 detail::throw_system_error(
504 make_err(-submit_rc), "io_uring_submit (wakeup)");
505 }
506
507 416x ring_inited_ = true;
508 416x }
509
510 inline void
511 551x io_uring_scheduler::shutdown()
512 {
513 551x stopped_.store(true, std::memory_order_release);
514
515 // Drain posted ops, calling destroy() on each so embedded handles
516 // (coroutine frames, error_code outputs) get torn down rather
517 // than leaked. Mirrors reactor_scheduler::shutdown_drain.
518 //
519 // Service shutdown order (driven by capy::execution_context):
520 // each socket/acceptor service::shutdown() submits a cancel SQE
521 // for every live impl. The CQEs that result either land in
522 // completed_ops_ (drained here as op->destroy()) or stay in the
523 // kernel ring; ~scheduler's io_uring_queue_exit cleans the
524 // latter up at process teardown. Self-referential impl_ptr
525 // cycles (e.g. multishot acceptor's multi_op_->impl_ptr) are
526 // broken explicitly inside each service before the scheduler
527 // shutdown runs.
528 551x lock_type lock(dispatch_mutex_);
529 582x while (auto* op = completed_ops_.pop())
530 {
531 31x lock.unlock();
532 31x op->destroy();
533 31x lock.lock();
534 31x }
535 551x cond_.notify_all();
536 551x }
537
538 inline void
539 372x io_uring_scheduler::stop()
540 {
541 372x stopped_.store(true, std::memory_order_release);
542 {
543 372x lock_type lock(dispatch_mutex_);
544 372x cond_.notify_all();
545 372x }
546 // Force-wake unconditionally — bypass interrupt_reactor's CAS
547 // coalescing. A dropped wake here leaves the leader blocked
548 // forever in submit_and_wait_timeout (no further CQE will
549 // arrive after stop()). With multishot poll on wakeup_eventfd_,
550 // this write reliably produces a CQE.
551 372x if (ring_inited_)
552 {
553 370x std::uint64_t v = 1;
554 [[maybe_unused]] auto r =
555 370x ::write(wakeup_eventfd_, &v, sizeof(v));
556 }
557 372x }
558
559 inline bool
560 36x io_uring_scheduler::stopped() const noexcept
561 {
562 36x return stopped_.load(std::memory_order_acquire);
563 }
564
565 inline void
566 71x io_uring_scheduler::restart()
567 {
568 71x stopped_.store(false, std::memory_order_release);
569 71x }
570
571 inline void
572 27430x io_uring_scheduler::work_started() noexcept
573 {
574 27430x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
575 27430x }
576
577 inline void
578 39507x io_uring_scheduler::work_finished() noexcept
579 {
580 79014x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
581 326x stop();
582 39507x }
583
584 inline void
585 13455x io_uring_scheduler::interrupt_reactor() const noexcept
586 {
587 // Skip if the ring hasn't been initialised yet — there's no leader
588 // to wake and no eventfd to write.
589 13455x if (!ring_inited_)
590 return;
591
592 // Single-thread: the user's coroutines run on the leader thread,
593 // so when interrupt_reactor is called from user code the leader
594 // is not in kernel wait — there is nothing to wake.
595 13455x if (single_threaded_)
596 return;
597
598 // Multi-thread: write the eventfd unconditionally. CAS-coalescing
599 // is unsafe here because the leader's Phase 2 in do_one waits
600 // indefinitely for a CQE; a dropped wake leaves the leader
601 // blocked forever when there is no other CQE-producing activity.
602 // Multishot poll on wakeup_eventfd_ delivers a CQE for every
603 // write, so multiple writes in flight produce multiple CQEs
604 // (drained together by drain_wakeup_eventfd's single read of
605 // the eventfd counter).
606 13455x std::uint64_t v = 1;
607 13455x [[maybe_unused]] auto r = ::write(wakeup_eventfd_, &v, sizeof(v));
608 13455x wakeup_armed_.store(true, std::memory_order_release);
609 }
610
611 inline void
612 12833x io_uring_scheduler::drain_wakeup_eventfd() const noexcept
613 {
614 std::uint64_t v;
615 12833x [[maybe_unused]] auto r = ::read(wakeup_eventfd_, &v, sizeof(v));
616
617 // Multishot poll never needs re-arming. The poll-add was queued
618 // once at lazy_init_ring with IORING_POLL_ADD_MULTI; each eventfd
619 // POLLIN produces a CQE without consuming the SQE.
620 //
621 // Release pairs with the acquire side of interrupt_reactor's CAS:
622 // a posting thread that observes wakeup_armed_ == false from this
623 // store will see the eventfd already drained by the leader.
624 12833x wakeup_armed_.store(false, std::memory_order_release);
625 12833x }
626
627 inline void
628 2057x io_uring_scheduler::post(std::coroutine_handle<> h) const
629 {
630 struct post_handler final : scheduler_op
631 {
632 std::coroutine_handle<> h_;
633 2057x explicit post_handler(std::coroutine_handle<> h) noexcept : h_(h) {}
634
635 2049x void operator()() override
636 {
637 2049x auto saved = h_;
638 2049x delete this;
639 std::atomic_thread_fence(std::memory_order_acquire);
640 2049x saved.resume();
641 2049x }
642
643 8x void destroy() override
644 {
645 8x auto saved = h_;
646 8x delete this;
647 8x if (saved)
648 8x saved.destroy();
649 8x }
650 };
651
652 2057x auto* op = new post_handler(h);
653 2057x lazy_init_ring();
654 2057x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
655 bool wake_leader;
656 {
657 2057x lock_type lock(dispatch_mutex_);
658 2057x completed_ops_.push(op);
659 2057x wake_leader = task_running_;
660 2057x if (!wake_leader)
661 2057x cond_.notify_one();
662 2057x }
663 2057x if (wake_leader)
664 interrupt_reactor();
665 2057x }
666
667 inline void
668 10051x io_uring_scheduler::post(scheduler_op* op) const
669 {
670 10051x lazy_init_ring();
671 10051x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
672 bool wake_leader;
673 {
674 10051x lock_type lock(dispatch_mutex_);
675 10051x completed_ops_.push(op);
676 10051x wake_leader = task_running_;
677 10051x if (!wake_leader)
678 6751x cond_.notify_one();
679 10051x }
680 10051x if (wake_leader)
681 3300x interrupt_reactor();
682 10051x }
683
684 // Thread-local stack of frames for io_uring schedulers being run on the
685 // current thread. Holds the running-scheduler pointer (for
686 // running_in_this_thread reporting) and the inline completion budget
687 // used by the speculative non-blocking I/O path (plan 5j). Nesting
688 // stacks frames via prev_ so each scheduler gets its own budget.
689 struct io_uring_scheduler_frame
690 {
691 io_uring_scheduler const* sched;
692 io_uring_scheduler_frame* prev;
693 int inline_budget;
694 int inline_budget_max;
695 };
696
697 inline thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_ = nullptr;
698
// Inline completion budget bounds.
//
// A new run frame (io_uring_run_guard) starts with
// io_uring_inline_budget_initial; the value 2 is documented as matching
// the reactor scheduler's initial budget. reset_inline_budget() refills a
// frame to its inline_budget_max, which the guard sets to
// io_uring_inline_budget_max. After the first reset, the effective
// per-op budget is therefore 16, not 2. There is no adaptive ramp between
// the two values.
inline constexpr int io_uring_inline_budget_initial{2};
inline constexpr int io_uring_inline_budget_max{16};
704
705 /// RAII guard: pushes a frame onto the thread's running-scheduler stack
706 /// on construction, restores the previous on destruction. Used by
707 /// run/run_one/wait_one/poll/poll_one to mark the running thread and
708 /// hold a fresh inline budget for speculative completions.
709 struct io_uring_run_guard
710 {
711 io_uring_scheduler_frame frame_;
712
713 352x explicit io_uring_run_guard(io_uring_scheduler const* self) noexcept
714 352x : frame_{self, tl_running_scheduler_frame_,
715 io_uring_inline_budget_initial,
716 io_uring_inline_budget_max}
717 {
718 352x tl_running_scheduler_frame_ = &frame_;
719 352x }
720
721 352x ~io_uring_run_guard() noexcept
722 {
723 352x tl_running_scheduler_frame_ = frame_.prev;
724 352x }
725 };
726
727 inline bool
728 878x io_uring_scheduler::running_in_this_thread() const noexcept
729 {
730 878x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
731 {
732 258x if (f->sched == this)
733 258x return true;
734 }
735 620x return false;
736 }
737
738 inline void
739 52246x io_uring_scheduler::reset_inline_budget() const noexcept
740 {
741 52246x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
742 {
743 52246x if (f->sched == this)
744 {
745 52246x f->inline_budget = f->inline_budget_max;
746 52246x return;
747 }
748 }
749 }
750
751 inline bool
752 282456x io_uring_scheduler::try_consume_inline_budget() const noexcept
753 {
754 282456x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
755 {
756 282456x if (f->sched == this)
757 {
758 282456x if (f->inline_budget > 0)
759 {
760 265862x --f->inline_budget;
761 265862x return true;
762 }
763 16594x return false;
764 }
765 }
766 return false;
767 }
768
769 inline std::size_t
770 346x io_uring_scheduler::run()
771 {
772 346x lazy_init_ring();
773 692x if (outstanding_work_.load(std::memory_order_acquire) == 0)
774 {
775 27x stop();
776 27x return 0;
777 }
778
779 319x io_uring_run_guard guard(this);
780 319x std::size_t n = 0;
781 for (;;)
782 {
783 32516x std::size_t r = do_one(-1);
784 32516x if (r)
785 {
786 32197x if (n != (std::numeric_limits<std::size_t>::max)())
787 32197x ++n;
788 32197x continue;
789 }
790 641x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
791 3x stopped_.load(std::memory_order_acquire))
792 319x break;
793 // do_one returned 0 but work still outstanding (e.g. timer
794 // expiry dispatched async work). Continue.
795 32197x }
796 319x return n;
797 319x }
798
799 inline std::size_t
800 4x io_uring_scheduler::run_one()
801 {
802 4x lazy_init_ring();
803 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
804 {
805 stop();
806 return 0;
807 }
808 4x io_uring_run_guard guard(this);
809 4x return do_one(-1);
810 4x }
811
812 inline std::size_t
813 26x io_uring_scheduler::wait_one(long usec)
814 {
815 26x lazy_init_ring();
816 52x if (outstanding_work_.load(std::memory_order_acquire) == 0)
817 {
818 10x stop();
819 10x return 0;
820 }
821 16x io_uring_run_guard guard(this);
822 16x return do_one(usec);
823 16x }
824
825 inline std::size_t
826 14x io_uring_scheduler::poll()
827 {
828 14x lazy_init_ring();
829 28x if (outstanding_work_.load(std::memory_order_acquire) == 0)
830 {
831 3x stop();
832 3x return 0;
833 }
834 11x io_uring_run_guard guard(this);
835 11x std::size_t n = 0;
836 32x while (do_one(0))
837 {
838 21x if (n != (std::numeric_limits<std::size_t>::max)())
839 21x ++n;
840 }
841 11x return n;
842 11x }
843
844 inline std::size_t
845 4x io_uring_scheduler::poll_one()
846 {
847 4x lazy_init_ring();
848 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
849 {
850 2x stop();
851 2x return 0;
852 }
853 2x io_uring_run_guard guard(this);
854 2x return do_one(0);
855 2x }
856
857 inline std::size_t
858 32570x io_uring_scheduler::do_one(long timeout_us)
859 {
860 // Leader-follower: only one thread at a time may call
861 // io_uring_submit_and_wait_timeout on a shared ring (liburing's
862 // userspace head/tail bookkeeping is not thread-safe). Other
863 // threads either dispatch ready ops from completed_ops_ or wait
864 // on cond_ until the leader returns from the kernel.
865 32570x if (stopped_.load(std::memory_order_acquire))
866 317x return 0;
867
868 // submit_sqes_op only pumps the ring once per SQE batch. If the user
869 // keeps a non-empty completed_ops_ (e.g. timer with 0ns expiry as a
870 // yield primitive), the leader-phase kernel pass below never runs
871 // and CQEs accumulate in the ring forever — sub_request's read CQE
872 // never gets drained and the bench spins. submit_and_get_events
873 // (not plain submit) is required because IORING_SETUP_DEFER_TASKRUN
874 // gates task work on IORING_ENTER_GETEVENTS.
875 //
876 // Gate the kernel pump on there being io_uring-specific work. The
877 // check is performed under ring_mutex_ so a concurrent cross-thread
878 // submitter cannot prep an SQE that we then race past — both this
879 // path and io_uring_submit_op acquire ring_mutex_ before touching
880 // the ring. When all three sources are empty (no io_uring ops in
881 // flight needing DEFER_TASKRUN GETEVENTS, no userspace-pending
882 // SQEs, no kernel-ready CQEs) a kernel entry would have no work —
883 // saves ~8 pp of cycles on the no-I/O microbenchmark
884 // (io_context:single_threaded). We deliberately do NOT include
885 // outstanding_work_ here, because that counter mixes coroutine
886 // posts (in completed_ops_) with io_uring work — IOCTX has many
887 // coroutine posts and no io_uring work, and the kernel pump there
888 // is pure overhead.
889 32253x if (ring_inited_)
890 {
891 32253x lock_type ring_lock(ring_mutex_);
892 32253x if (io_uring_inflight_.load(std::memory_order_acquire) != 0
893 2088x || ::io_uring_sq_ready(&ring_) != 0
894 34341x || ::io_uring_cq_ready(&ring_) != 0)
895 {
896 30321x ::io_uring_submit_and_get_events(&ring_);
897 30321x process_completions();
898 }
899 32253x }
900
901 // Drain expired timers eagerly, for the same reason the kernel CQE
902 // pump runs unconditionally above: when completed_ops_ stays non-
903 // empty (e.g. continuous loopback I/O whose CQEs land in the top-
904 // of-do_one process_completions call), the leader-wait branch
905 // below — the only other place process_expired() runs — is never
906 // reached. Without this, stopper-timer-based shutdowns (and any
907 // other timer dependent on a busy I/O loop yielding) deadlock.
908 //
909 // empty() is a single relaxed-acquire atomic load on
910 // timer_service::cached_nearest_ns_ (lock-free, no clock_gettime).
911 // Skipping process_expired() when no timer is registered avoids the
912 // mutex + clock_gettime hot-path cost that dominates IOCTX cycles
913 // (~25 pp on io_context:single_threaded). When a timer IS
914 // registered the call runs exactly as before, preserving the
915 // deadlock fix this guard was originally written to address.
916 32253x if (!timer_svc_->empty())
917 29413x timer_svc_->process_expired();
918
919 32253x lock_type lock(dispatch_mutex_);
920 for (;;)
921 {
922 35565x if (stopped_.load(std::memory_order_acquire))
923 3x return 0;
924
925 35562x if (auto* op = completed_ops_.pop())
926 {
927 // Hand off any remaining queued work to a follower so we
928 // dispatch in parallel.
929 32236x if (!completed_ops_.empty())
930 15813x cond_.notify_one();
931 32236x lock.unlock();
932 // Speculative follow-ups in the handler share this budget.
933 32236x reset_inline_budget();
934 32236x (*op)();
935 32236x work_finished();
936 32236x return 1;
937 }
938
939 6652x if (outstanding_work_.load(std::memory_order_acquire) == 0)
940 3x return 0;
941
942 3323x if (task_running_)
943 {
944 // Another thread holds leadership; either return (poll)
945 // or wait for it to deliver work / release leadership.
946 if (timeout_us == 0)
947 return 0;
948 if (timeout_us < 0)
949 cond_.wait(lock);
950 else
951 {
952 cond_.wait_for(
953 lock, std::chrono::microseconds(timeout_us));
954 // wait_one honoured its timeout; if nothing arrived,
955 // return rather than re-arm.
956 if (completed_ops_.empty() &&
957 !stopped_.load(std::memory_order_acquire))
958 return 0;
959 }
960 continue;
961 }
962
963 // Become the leader: run the kernel poll. We drop the lock
964 // for the blocking wait, then take it back to release
965 // leadership and wake any follower that should pick up new
966 // work.
967 3323x __kernel_timespec ts{};
968 3323x __kernel_timespec* ts_ptr = nullptr;
969 3323x auto next_expiry = timer_svc_->nearest_expiry();
970 3323x auto now = std::chrono::steady_clock::now();
971
972 3323x if (timeout_us == 0)
973 {
974 7x ts.tv_sec = 0;
975 7x ts.tv_nsec = 0;
976 7x ts_ptr = &ts;
977 }
978 3316x else if (next_expiry != timer_service::time_point::max())
979 {
980 auto delta_ns =
981 3295x std::chrono::duration_cast<std::chrono::nanoseconds>(
982 3295x next_expiry - now)
983 3295x .count();
984 3295x if (delta_ns < 0) delta_ns = 0;
985 3295x ts.tv_sec = delta_ns / 1'000'000'000;
986 3295x ts.tv_nsec = delta_ns % 1'000'000'000;
987 3295x ts_ptr = &ts;
988 }
989 21x else if (timeout_us > 0)
990 {
991 4x ts.tv_sec = timeout_us / 1'000'000;
992 4x ts.tv_nsec = (timeout_us % 1'000'000) * 1000;
993 4x ts_ptr = &ts;
994 }
995 else
996 {
997 // run() with no pending timers: cap the kernel wait at 1s
998 // so the leader periodically re-checks state. Defense in
999 // depth against a lost wakeup (e.g. multishot poll on the
1000 // wakeup eventfd terminates and the re-arm SQE doesn't
1001 // reach the kernel in time). Worst case: one extra
1002 // wake-up per io_context per second when truly idle.
1003 17x ts.tv_sec = 1;
1004 17x ts.tv_nsec = 0;
1005 17x ts_ptr = &ts;
1006 }
1007
1008 3323x task_running_ = true;
1009 3323x lock.unlock();
1010
1011 // Three-phase kernel wait, matching Boost.Asio's
1012 // io_uring_service::run pattern. ring_mutex_ is held briefly
1013 // to push pending SQEs and to drain CQEs, but NOT during
1014 // the blocking io_uring_wait_cqe_timeout. Cross-thread
1015 // submitters (io_uring_submit_op, cancel paths) can take
1016 // ring_mutex_ during the wait and prep new SQEs without
1017 // blocking on the leader; their wake eventfd write fires the
1018 // multishot poll and returns the leader from wait_cqe_timeout
1019 // promptly.
1020 //
1021 // Phase 1 — submit any pending SQEs to the kernel.
1022 {
1023 3323x lock_type ring_lock(ring_mutex_);
1024 3323x ::io_uring_submit(&ring_);
1025 3323x }
1026
1027 // Phase 2 — wait for at least one CQE without holding the
1028 // mutex. Multi-thread `io_uring_enter` is permitted without
1029 // SINGLE_ISSUER. wait_cqe_timeout only peeks the CQ ring;
1030 // head advancement happens under the mutex in
1031 // process_completions below.
1032 3323x ::io_uring_cqe* cqe = nullptr;
1033 3323x int rc = ::io_uring_wait_cqe_timeout(&ring_, &cqe, ts_ptr);
1034
1035 // Phase 3 — drain CQEs under the mutex.
1036 {
1037 3323x lock_type ring_lock(ring_mutex_);
1038 3323x if (rc == 0 || rc == -ETIME || rc == -EINTR)
1039 3323x process_completions();
1040 3323x }
1041
1042 3323x if (rc < 0 && rc != -ETIME && rc != -EINTR)
1043 {
1044 // Restore state before propagating so followers don't
1045 // deadlock waiting for a leader that never returns.
1046 lock.lock();
1047 task_running_ = false;
1048 cond_.notify_all();
1049 detail::throw_system_error(
1050 make_err(-rc), "io_uring_wait_cqe_timeout");
1051 }
1052
1053 3323x if (!timer_svc_->empty())
1054 3302x timer_svc_->process_expired();
1055
1056 3323x lock.lock();
1057 3323x task_running_ = false;
1058 3323x cond_.notify_all();
1059
1060 // For poll() / wait_one() we honour the timeout: one kernel
1061 // pass is the contract. If still nothing dispatchable, exit.
1062 // For run() (timeout < 0) keep looping until work arrives or
1063 // someone calls stop().
1064 3323x if (timeout_us >= 0 && completed_ops_.empty())
1065 11x return 0;
1066 3312x }
1067 32253x }
1068
1069 inline void
1070 33644x io_uring_scheduler::process_completions()
1071 {
1072 unsigned head;
1073 ::io_uring_cqe* cqe;
1074 33644x unsigned consumed = 0;
1075
1076 // Collect completed I/O ops locally; splice into completed_ops_
1077 // after the loop so do_one dispatches them one at a time.
1078 33644x op_queue local_ops;
1079
1080 33644x std::int64_t inflight_dec = 0;
1081 59306x io_uring_for_each_cqe(&ring_, head, cqe)
1082 {
1083 25662x void* ud = io_uring_cqe_get_data(cqe);
1084 25662x if (ud == nullptr)
1085 {
1086 // Wakeup eventfd CQE: drain the eventfd byte. Not counted
1087 // by io_uring_inflight_; we never incremented for the
1088 // wakeup multishot SQE (its progress doesn't depend on
1089 // userspace getevents).
1090 12833x drain_wakeup_eventfd();
1091 // If multishot terminated (kernel dropped under memory
1092 // pressure or similar), re-arm. Each CQE except the last
1093 // sets IORING_CQE_F_MORE.
1094 12833x if ((cqe->flags & IORING_CQE_F_MORE) == 0)
1095 {
1096 ::io_uring_sqe* re = ::io_uring_get_sqe(&ring_);
1097 if (!re)
1098 {
1099 ::io_uring_submit(&ring_);
1100 re = ::io_uring_get_sqe(&ring_);
1101 }
1102 if (re)
1103 {
1104 ::io_uring_prep_poll_multishot(
1105 re, wakeup_eventfd_, POLLIN);
1106 ::io_uring_sqe_set_data(re, nullptr);
1107 }
1108 }
1109 }
1110 12829x else if (ud == &cancel_sentinel_)
1111 {
1112 // CQE for an ASYNC_CANCEL op — ignore; the actual op's
1113 // CQE arrives separately and is dispatched via cqe_func.
1114 // Cancels are one-shot, no F_MORE, decrement inflight.
1115 6201x ++inflight_dec;
1116 }
1117 else
1118 {
1119 6628x auto* iop = static_cast<io_uring_op*>(ud);
1120 6628x (*iop->cqe_func)(iop, cqe->res, cqe->flags, local_ops);
1121 // Decrement inflight on the terminal CQE only — multishot
1122 // ops (acceptor) hold the SQE alive across F_MORE CQEs and
1123 // free it only when F_MORE is cleared.
1124 6628x if ((cqe->flags & IORING_CQE_F_MORE) == 0)
1125 3559x ++inflight_dec;
1126 }
1127 25662x ++consumed;
1128 }
1129 33644x if (inflight_dec)
1130 9535x io_uring_inflight_.fetch_sub(
1131 inflight_dec, std::memory_order_acq_rel);
1132
1133 33644x if (consumed)
1134 12964x io_uring_cq_advance(&ring_, consumed);
1135
1136 // Caller holds ring_mutex_. Take dispatch_mutex_ briefly to
1137 // splice locally-collected ops onto the global queue (lock order
1138 // ring_mutex_ -> dispatch_mutex_).
1139 33644x if (!local_ops.empty())
1140 {
1141 3539x lock_type lock(dispatch_mutex_);
1142 3539x completed_ops_.splice(local_ops);
1143 // Wake any follower waiting on cond_; it'll pop and dispatch.
1144 3539x cond_.notify_one();
1145 3539x }
1146 33644x }
1147
1148 inline void
1149 17x io_uring_scheduler::submit_sqes_op::do_handler(
1150 void* owner, scheduler_op* base,
1151 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
1152 {
1153 17x if (owner == nullptr)
1154 17x return; // shutdown drain — nothing to do; SQE storage is
1155 // kernel-mapped and discarded by io_uring_queue_exit.
1156
1157 auto* self = static_cast<submit_sqes_op*>(base);
1158 auto* sched = self->sched_;
1159
1160 io_uring_scheduler::lock_type ring_lock(sched->ring_mutex_);
1161 sched->submit_op_posted_ = false;
1162 ::io_uring_submit_and_get_events(&sched->ring_);
1163 sched->process_completions();
1164 }
1165
1166 inline void
1167 109x io_uring_scheduler::submit_cancel_by_user_data(io_uring_op* target) noexcept
1168 {
1169 109x lazy_init_ring();
1170 // Wake the leader (if any) so its submit_and_wait_timeout returns
1171 // and releases ring_mutex_; otherwise we'd block here until the
1172 // next CQE arrives organically. Cancellation is best-effort if
1173 // the SQ stays full after one flush — the op completes on its
1174 // own and reports cancelled via the in-flight `cancelled` flag.
1175 109x interrupt_reactor();
1176 109x lock_type lock(ring_mutex_);
1177 109x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1178 109x if (!sqe)
1179 {
1180 io_uring_submit(&ring_);
1181 sqe = io_uring_get_sqe(&ring_);
1182 }
1183 109x if (!sqe)
1184 return;
1185
1186 109x io_uring_prep_cancel(sqe, target, 0);
1187 109x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1188 109x inflight_inc();
1189 109x }
1190
1191 inline void
1192 80x io_uring_scheduler::submit_cancel_by_fd(int fd) noexcept
1193 {
1194 80x lazy_init_ring();
1195 80x interrupt_reactor();
1196 80x lock_type lock(ring_mutex_);
1197 80x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1198 80x if (!sqe)
1199 {
1200 io_uring_submit(&ring_);
1201 sqe = io_uring_get_sqe(&ring_);
1202 }
1203 80x if (!sqe)
1204 return;
1205
1206 80x io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL);
1207 80x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1208 80x inflight_inc();
1209 80x }
1210
1211 inline void
1212 115x io_uring_op::on_cancel() noexcept
1213 {
1214 115x request_cancel(); // coro_op: records the cancellation (sets the flag)
1215 // Skip the cancel SQE if we never linked an SQE to this op — the
1216 // bypass path in the caller will see cancelled=true and complete
1217 // synchronously without a kernel round-trip.
1218 115x if (sched_ && sqe_set.load(std::memory_order_acquire))
1219 109x sched_->submit_cancel_by_user_data(this);
1220 115x }
1221
1222 inline void
1223 6534x io_uring_scheduler::cancel_and_flush(int fd) noexcept
1224 {
1225 6534x lazy_init_ring();
1226 6534x interrupt_reactor();
1227 6534x lock_type lock(ring_mutex_);
1228 6534x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1229 6534x if (!sqe)
1230 {
1231 io_uring_submit(&ring_);
1232 sqe = io_uring_get_sqe(&ring_);
1233 }
1234 6534x if (sqe)
1235 {
1236 6534x io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL);
1237 6534x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1238 6534x inflight_inc();
1239 }
1240 // Flush while fd is still open so the kernel resolves the file
1241 // from the fd number before the caller closes and recycles it.
1242 6534x io_uring_submit(&ring_);
1243 6534x }
1244
1245 inline void
1246 115x io_uring_scheduler::drain_cqes_for(io_uring_op* target) noexcept
1247 {
1248 115x lazy_init_ring();
1249 // Submit a cancel by user_data so the kernel returns CQEs for
1250 // the target promptly, then iterate the CQ ring and consume
1251 // every CQE that matches `target`. ring_mutex_ serializes against
1252 // the leader's kernel wait and any concurrent cancel path; the
1253 // interrupt_reactor() ensures the leader returns promptly so we
1254 // can take the mutex.
1255 115x interrupt_reactor();
1256 {
1257 115x lock_type lock(ring_mutex_);
1258 115x if (auto* sqe = io_uring_get_sqe(&ring_))
1259 {
1260 115x io_uring_prep_cancel(sqe, target, 0);
1261 115x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1262 115x inflight_inc();
1263 }
1264 115x io_uring_submit(&ring_);
1265 115x }
1266
1267 // Loop a few rounds: cancel SQE submission, then drain CQEs.
1268 // Bounded loop avoids stalls if the kernel never returns a
1269 // cancel completion — best-effort.
1270 119x for (int rounds = 0; rounds < drain_cqes_max_rounds; ++rounds)
1271 {
1272 119x lock_type lock(ring_mutex_);
1273
1274 unsigned head;
1275 ::io_uring_cqe* cqe;
1276 119x unsigned consumed = 0;
1277 119x bool saw_target = false;
1278
1279 909x io_uring_for_each_cqe(&ring_, head, cqe)
1280 {
1281 790x void* ud = io_uring_cqe_get_data(cqe);
1282 790x if (ud == target)
1283 {
1284 112x saw_target = true;
1285 // Don't dispatch — caller is destructing target;
1286 // just consume so the CQE doesn't dangle.
1287 }
1288 // Other CQEs are intentionally NOT dispatched here. They
1289 // may belong to ops freed by sibling teardowns (other
1290 // acceptors / sockets), and dispatching would UAF. The
1291 // next normal run-loop iteration will handle them; the
1292 // io_context's destructor sequence runs services'
1293 // shutdowns before ~scheduler so any still-live ops get
1294 // a chance to drain through their own paths first.
1295 790x ++consumed;
1296 }
1297 119x if (consumed)
1298 {
1299 115x io_uring_cq_advance(&ring_, consumed);
1300 115x if (saw_target)
1301 111x break;
1302 4x continue;
1303 }
1304
1305 // Nothing in the CQ — kick the kernel briefly. Hold
1306 // ring_mutex_ across the wait so we don't race with the
1307 // run-loop leader.
1308 4x __kernel_timespec ts{
1309 0, static_cast<long long>(drain_cqes_kick_ns)};
1310 4x ::io_uring_cqe* one = nullptr;
1311 4x int rc = ::io_uring_submit_and_wait_timeout(
1312 &ring_, &one, 1, &ts, nullptr);
1313 4x if (rc < 0 && rc != -ETIME && rc != -EINTR)
1314 break;
1315 4x if (rc == -ETIME)
1316 4x break;
1317 119x }
1318 115x }
1319
1320 } // namespace boost::corosio::detail
1321
1322 #endif // BOOST_COROSIO_HAS_IO_URING
1323
1324 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
1325