include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp

80.6% Lines (369/458) 93.8% List of functions (45/48)
io_uring_scheduler.hpp
f(x) Functions (48)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_scheduler::ring() :101 4293x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::dispatch_mutex() const :108 33735x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::ring_mutex() const :111 4293x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_op_posted_exchange(bool) const :133 4293x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_op_ref() const :141 114x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::inflight_inc() const :150 12733x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::push_completed_locked(boost::corosio::detail::scheduler_op*) const :229 33735x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::configure_single_threaded(bool) :235 1x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::configure_sqpoll(bool, unsigned int, int) :260 0 0.0% 0.0% boost::corosio::detail::io_uring_scheduler::is_single_threaded() const :269 1x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::submit_sqes_op::submit_sqes_op() :329 389x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int) :363 389x 100.0% 74.0% boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const :374 4197x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::~io_uring_scheduler() :387 778x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const :398 30250x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const::{lambda()#1}::operator()() const :400 281x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::lazy_init_ring_unlocked() const :406 281x 42.1% 38.0% boost::corosio::detail::io_uring_scheduler::shutdown() :511 389x 100.0% 87.0% boost::corosio::detail::io_uring_scheduler::stop() :539 234x 100.0% 90.0% boost::corosio::detail::io_uring_scheduler::stopped() const :560 12x 
100.0% 100.0% boost::corosio::detail::io_uring_scheduler::restart() :566 58x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::work_started() :572 46714x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::work_finished() :578 59683x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::interrupt_reactor() const :585 16817x 75.0% 67.0% boost::corosio::detail::io_uring_scheduler::drain_wakeup_eventfd() const :612 16351x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const :628 378x 100.0% 93.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :633 378x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :635 375x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :643 3x 94.4% 100.0% boost::corosio::detail::io_uring_scheduler::post(boost::corosio::detail::scheduler_op*) const :668 12608x 100.0% 100.0% boost::corosio::detail::io_uring_run_guard::io_uring_run_guard(boost::corosio::detail::io_uring_scheduler const*) :713 233x 100.0% 100.0% boost::corosio::detail::io_uring_run_guard::~io_uring_run_guard() :721 233x 100.0% 100.0% boost::corosio::detail::io_uring_scheduler::running_in_this_thread() const :728 584x 100.0% 86.0% boost::corosio::detail::io_uring_scheduler::reset_inline_budget() const :739 50912x 100.0% 83.0% boost::corosio::detail::io_uring_scheduler::try_consume_inline_budget() const :752 573746x 87.5% 78.0% boost::corosio::detail::io_uring_scheduler::run() :770 221x 88.9% 72.0% boost::corosio::detail::io_uring_scheduler::run_one() :800 0 0.0% 0.0% boost::corosio::detail::io_uring_scheduler::wait_one(long) :813 5x 100.0% 73.0% boost::corosio::detail::io_uring_scheduler::poll() :826 12x 100.0% 77.0% 
boost::corosio::detail::io_uring_scheduler::poll_one() :845 0 0.0% 0.0% boost::corosio::detail::io_uring_scheduler::do_one(long) :858 51144x 80.2% 64.0% boost::corosio::detail::io_uring_scheduler::process_completions() :1070 54810x 78.8% 78.0% boost::corosio::detail::io_uring_scheduler::submit_sqes_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :1149 10x 30.0% 38.0% boost::corosio::detail::io_uring_scheduler::submit_cancel_by_user_data(boost::corosio::detail::io_uring_op*) :1167 101x 78.6% 75.0% boost::corosio::detail::io_uring_scheduler::submit_cancel_by_fd(int) :1192 78x 78.6% 75.0% boost::corosio::detail::io_uring_op::request_cancel() :1212 101x 100.0% 89.0% boost::corosio::detail::io_uring_scheduler::cancel_and_flush(int) :1223 8176x 85.7% 80.0% boost::corosio::detail::io_uring_scheduler::drain_cqes_for(boost::corosio::detail::io_uring_op*) :1246 85x 97.0% 89.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 // Include before any project headers open a namespace — prevents the
18 // boost::corosio::io_uring tag variable from shadowing struct ::io_uring.
19 #include <liburing.h>
20
21 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
22 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
23 #include <boost/corosio/detail/config.hpp>
24 #include <boost/corosio/detail/except.hpp>
25 #include <boost/corosio/detail/scheduler.hpp>
26 #include <boost/corosio/detail/scheduler_op.hpp>
27 #include <boost/corosio/detail/timer_service.hpp>
28 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
29 #include <boost/corosio/native/detail/make_err.hpp>
30 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
31 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
32 #include <boost/capy/ex/execution_context.hpp>
33
34 #include <atomic>
35 #include <chrono>
36 #include <coroutine>
37 #include <cstddef>
38 #include <cstdint>
39 #include <limits>
40
41 #include <errno.h>
42 #include <poll.h>
43 #include <sys/eventfd.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 // Forward-declared so the out-of-line inline definitions below the class
49 // can reference the frame stack without a circular dependency.
50 struct io_uring_scheduler_frame;
51 extern thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_;
52
53 /** io_uring scheduler — proactor model on Linux 6.x+.
54
55 Owns one io_uring per io_context. Lazy batched submit;
56 cross-thread post wakes a registered eventfd via multishot
57 POLL_ADD.
58
59 @par Thread Safety
60 All public member functions are thread-safe.
61 */
62 class BOOST_COROSIO_DECL io_uring_scheduler final
63 : public scheduler
64 , public capy::execution_context::service
65 {
66 public:
67 using key_type = scheduler;
68 using mutex_type = conditionally_enabled_mutex;
69 using lock_type = mutex_type::scoped_lock;
70 using event_type = conditionally_enabled_event;
71
72 io_uring_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
73 ~io_uring_scheduler() override;
74 io_uring_scheduler(io_uring_scheduler const&) = delete;
75 io_uring_scheduler& operator=(io_uring_scheduler const&) = delete;
76
77 void shutdown() override;
78
79 // scheduler virtuals — definitions in Task 6
80 void post(std::coroutine_handle<>) const override;
81 void post(scheduler_op*) const override;
82
83 bool running_in_this_thread() const noexcept override;
84 void stop() override;
85 bool stopped() const noexcept override;
86 void restart() override;
87 std::size_t run() override;
88 std::size_t run_one() override;
89 std::size_t wait_one(long usec) override;
90 std::size_t poll() override;
91 std::size_t poll_one() override;
92 void work_started() noexcept override;
93 void work_finished() noexcept override;
94
95 /** Return the underlying liburing ring.
96
97 Triggers lazy ring initialisation on first call. Used by
98 socket op submission helpers (e.g. `io_uring_submit_op`) and
99 any other code path that needs a live ring pointer.
100 */
101 4293x struct ::io_uring* ring() noexcept
102 {
103 4293x lazy_init_ring();
104 4293x return &ring_;
105 }
106
107 /// Return the dispatch mutex (protects completed_ops_ / cond_).
108 33735x mutex_type& dispatch_mutex() const noexcept { return dispatch_mutex_; }
109
110 /// Return the ring mutex (serialises userspace SQ/CQ access).
111 4293x mutex_type& ring_mutex() const noexcept { return ring_mutex_; }
112
113 /** Reset the calling thread's inline-budget for this scheduler.
114
115 Called at the top of each dispatched op in `do_one` so each
116 op handler gets a fresh budget for inline speculative
117 completions. Walks the frame stack; no-op if this scheduler
118 isn't on the stack (i.e. called from a non-run thread).
119 */
120 void reset_inline_budget() const noexcept;
121
122 /** Consume one unit of inline budget if available.
123
124 @return `true` if budget was available and consumed; `false`
125 if the budget is exhausted or this scheduler is not on
126 the calling thread's run stack.
127 */
128 bool try_consume_inline_budget() const noexcept;
129
130 /// Exchange the submit-batch posted flag. Returns the prior value.
131 /// Caller MUST hold ring_mutex_ — the flag is plain bool, not atomic,
132 /// and the mutex provides the read-modify-write atomicity.
133 4293x bool submit_op_posted_exchange(bool desired) const noexcept
134 {
135 4293x bool prev = submit_op_posted_;
136 4293x submit_op_posted_ = desired;
137 4293x return prev;
138 }
139
140 /// Return a reference to the mutable embedded submit_sqes_op.
141 114x scheduler_op& submit_op_ref() const noexcept
142 {
143 114x return submit_op_;
144 }
145
146 /// Increment the io_uring in-flight counter. Callers prep an SQE
147 /// whose CQE will require IORING_ENTER_GETEVENTS to surface under
148 /// DEFER_TASKRUN. Excluded: the wakeup-eventfd multishot SQE, whose
149 /// progress doesn't depend on userspace getevents.
150 12733x void inflight_inc() const noexcept
151 {
152 12733x io_uring_inflight_.fetch_add(1, std::memory_order_release);
153 12733x }
154
155 /// Initialize the io_uring ring on first access. Idempotent.
156 void lazy_init_ring() const;
157
158 /// Wake the leader if it's blocked in `submit_and_wait_timeout`.
159 /// Best-effort: the wakeup is suppressed if the leader has already
160 /// been signalled and not yet acked.
161 void interrupt_reactor() const noexcept;
162
163 /** Submit `IORING_OP_ASYNC_CANCEL` targeting an in-flight op by its
164 user_data pointer.
165
166 The kernel delivers `-ECANCELED` on the target's CQE if it was
167 still in flight; the op's completion handler then reports
168 `operation_aborted`. Best-effort: if the SQ is full after one
169 flush attempt the function returns without cancelling (the op
170 will complete normally on its own).
171
172 @param target The in-flight op to cancel.
173 */
174 void submit_cancel_by_user_data(io_uring_op* target) noexcept;
175
176 /** Submit `IORING_OP_ASYNC_CANCEL` with `IORING_ASYNC_CANCEL_FD`
177 to cancel every in-flight op on the given fd in one SQE.
178
179 Best-effort: if the SQ is full after one flush attempt the
180 function returns without cancelling.
181
182 @param fd The file descriptor whose in-flight ops should be
183 cancelled.
184 */
185 void submit_cancel_by_fd(int fd) noexcept;
186
187 /** Submit `IORING_OP_ASYNC_CANCEL` for `fd` and immediately flush
188 the submission ring to the kernel.
189
190 Must be called while `fd` is still open so the kernel can
191 resolve the file from the fd number before it is closed and
192 potentially recycled.
193
194 Best-effort: if the SQ is full the function still flushes any
195 earlier pending SQEs to the kernel.
196
197 @param fd The file descriptor whose in-flight ops should be
198 cancelled.
199 */
200 void cancel_and_flush(int fd) noexcept;
201
202 /** Drain pending CQEs for a specific op's `user_data`.
203
204 Submits an ASYNC_CANCEL by user_data to short-circuit any
205 in-flight op holding `target`, then iterates the CQ ring and
206 consumes every CQE matching `target` so its memory can be
207 freed safely. Used by member-owned ops (e.g.
208 `uring_multi_accept_op`) whose destructor cannot tolerate
209 outstanding CQEs.
210
211 @par Thread Safety
212 Safe to call from any thread. Internally takes `ring_mutex_`
213 to serialise against the run-loop leader; calls
214 `interrupt_reactor()` first so the leader returns from its
215 kernel wait promptly.
216
217 @param target The op pointer used as user_data on the SQE.
218 */
219 void drain_cqes_for(io_uring_op* target) noexcept;
220
221 /** Queue an already-counted op while the caller holds dispatch_mutex_.
222
223 Does NOT increment `outstanding_work_`. Use for synchronous
224 completion paths (e.g. SQE backpressure) where the caller called
225 `work_started()` and already holds the dispatch lock.
226
227 @pre `dispatch_mutex_` must be locked by the calling thread.
228 */
229 33735x void push_completed_locked(scheduler_op* op) const noexcept
230 {
231 33735x completed_ops_.push(op);
232 33735x }
233
234 /// Single-threaded mode toggle (matches reactor_scheduler API).
235 1x void configure_single_threaded(bool v) noexcept override
236 {
237 1x single_threaded_ = v;
238 1x dispatch_mutex_.set_enabled(!v);
239 1x ring_mutex_.set_enabled(!v);
240 1x cond_.set_enabled(!v);
241 1x }
242
243 /** Configure SQPOLL parameters.
244
245 Must be called before the first run/poll/post — the values
246 are cached and read by `lazy_init_ring_unlocked` when the
247 ring is first constructed. No-op if `enable` is false (the
248 default).
249
250 @note When combined with single-threaded mode,
251 IORING_SETUP_DEFER_TASKRUN is suppressed — the kernel
252 rejects that combination. SINGLE_ISSUER still applies.
253
254 @param enable Set IORING_SETUP_SQPOLL on ring init.
255 @param idle_ms sq_thread_idle in milliseconds; 0 = kernel
256 default (1ms).
257 @param cpu Pin the polling thread to this CPU; -1 to
258 not pin.
259 */
260 void configure_sqpoll(
261 bool enable, unsigned idle_ms, int cpu) noexcept
262 {
263 enable_sqpoll_ = enable;
264 sq_thread_idle_ms_ = idle_ms;
265 sq_thread_cpu_ = cpu;
266 }
267
268 /// Return true if single-threaded (lockless) mode is active.
269 1x bool is_single_threaded() const noexcept override { return single_threaded_; }
270
271 private:
272 // ring_ + wakeup_eventfd_ are mutable so lazy_init_ring() (called
273 // from const contexts like post()) can populate them on first use.
274 mutable struct ::io_uring ring_{};
275 mutable int wakeup_eventfd_ = -1;
276 timer_service* timer_svc_ = nullptr;
277
278 // dispatch_mutex_ protects completed_ops_, cond_, task_running_.
279 // ring_mutex_ protects every userspace touch of ring_ (SQ tail,
280 // CQ head): get_sqe / submit / submit_and_wait_timeout /
281 // for_each_cqe / cq_advance.
282 //
283 // process_completions runs under ring_mutex_ and briefly takes
284 // dispatch_mutex_ to splice into completed_ops_. The locks are
285 // never held simultaneously for the full duration of any other
286 // path's critical section, so no deadlock.
287 mutable mutex_type dispatch_mutex_{true};
288 mutable mutex_type ring_mutex_{true};
289 mutable event_type cond_{true};
290 mutable op_queue completed_ops_;
291 // outstanding_work_ and io_uring_inflight_ are both atomic
292 // counters updated at high frequency on different paths:
293 // - outstanding_work_ : every work_started / work_finished call,
294 // including timers, posts, and SQE submits.
295 // - io_uring_inflight_ : only SQE submit + non-F_MORE CQE consume.
296 // Under multi-thread workloads the threads tend to update these
297 // from different code paths; placing them on the same cache line
298 // would cause false sharing and unnecessary cache-line ping-pong.
299 // Hold each on its own line.
300 alignas(64) mutable std::atomic<std::int64_t> outstanding_work_{0};
301 // Count of io_uring SQEs in flight whose completion requires user-
302 // space to enter the kernel via IORING_ENTER_GETEVENTS for task
303 // work to progress under IORING_SETUP_DEFER_TASKRUN. Excludes the
304 // wakeup-eventfd multishot poll (registered in lazy_init_ring), and
305 // is updated by io_uring_submit_op and by process_completions on
306 // each non-F_MORE, non-eventfd CQE. Used by do_one to skip the
307 // ring pump when there is no io_uring work pending.
308 alignas(64) mutable std::atomic<std::int64_t> io_uring_inflight_{0};
309 std::atomic<bool> stopped_{false};
310 // Leader-follower flag: true while a thread is blocked in
311 // io_uring_submit_and_wait_timeout. Protected by dispatch_mutex_.
312 mutable bool task_running_ = false;
313 bool single_threaded_ = false;
314 bool enable_sqpoll_ = false;
315 unsigned sq_thread_idle_ms_ = 0;
316 int sq_thread_cpu_ = -1;
317
318 int cancel_sentinel_ = 0;
319 mutable std::atomic<bool> wakeup_armed_{false};
320
321 /// Flushes the SQ ring and drains CQEs in one mutex-held pass.
322 /// One instance covers a whole batch; subsequent SQEs in the same
323 /// batch skip the post, amortising syscall cost across the batch.
324 /// Mirrors Asio's `submit_sqes_op` (`io_uring_service.ipp:730-742`).
325 struct submit_sqes_op final : scheduler_op
326 {
327 io_uring_scheduler* sched_ = nullptr;
328
329 389x submit_sqes_op() noexcept : scheduler_op(&do_handler) {}
330
331 static void do_handler(
332 void* owner, scheduler_op* base,
333 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept;
334 };
335
336 /// True between the first submitter of a batch posting `submit_op_`
337 /// and the dispatched op clearing the flag inside its handler. Read
338 /// and written only while holding `ring_mutex_`.
339 mutable bool submit_op_posted_ = false;
340
341 /// Single embedded `submit_sqes_op` instance, owned by the scheduler.
342 mutable submit_sqes_op submit_op_;
343
344 // drain_cqes_for tuning. The bound exists to avoid stalling a
345 // destructor if the kernel never returns a cancel completion (best-
346 // effort drain); 8 rounds * 1ms == 8ms worst case.
347 static constexpr int drain_cqes_max_rounds = 8;
348 static constexpr unsigned long drain_cqes_kick_ns = 1'000'000;
349
350 // ring_inited_ goes true once on first run/poll/submit. The init is
351 // deferred from the constructor so configure_single_threaded(true)
352 // can take effect before io_uring_queue_init_params chooses flags.
353 mutable std::once_flag ring_init_once_;
354 mutable bool ring_inited_ = false;
355
356 std::size_t do_one(long timeout_us);
357 void process_completions();
358 void drain_wakeup_eventfd() const noexcept;
359 void lazy_init_ring_unlocked() const;
360 };
361
362 inline
363 389x io_uring_scheduler::io_uring_scheduler(
364 389x capy::execution_context& ctx, int /*concurrency_hint*/)
365 {
366 // sched_ cannot be set in the member initialiser — `this` is not
367 // available there.
368 389x submit_op_.sched_ = this;
369
370 // Wire timer service. on_earliest_changed wakes the run loop so it
371 // recomputes its wait timeout.
372 389x timer_svc_ = &get_timer_service(ctx, *this);
373 389x timer_svc_->set_on_earliest_changed(
374 4586x timer_service::callback(this, [](void* p) {
375 4197x static_cast<io_uring_scheduler*>(p)->interrupt_reactor();
376 4197x }));
377
378 389x get_resolver_service(ctx, *this);
379 389x get_signal_service(ctx, *this);
380
381 // Ring init is deferred to lazy_init_ring() so configure_single_-
382 // threaded(true), which the io_context applies after construction,
383 // can take effect before io_uring_queue_init_params chooses flags.
384 389x }
385
386 inline
387 778x io_uring_scheduler::~io_uring_scheduler()
388 {
389 389x if (ring_inited_)
390 {
391 281x if (wakeup_eventfd_ >= 0)
392 281x ::close(wakeup_eventfd_);
393 281x ::io_uring_queue_exit(&ring_);
394 }
395 778x }
396
397 inline void
398 30250x io_uring_scheduler::lazy_init_ring() const
399 {
400 30250x std::call_once(ring_init_once_, [this] {
401 281x lazy_init_ring_unlocked();
402 281x });
403 30250x }
404
405 inline void
406 281x io_uring_scheduler::lazy_init_ring_unlocked() const
407 {
408 281x io_uring_params params{};
409 281x if (single_threaded_)
410 {
411 // SINGLE_ISSUER promises the kernel one submitter thread,
412 // letting it skip internal SQ locking. DEFER_TASKRUN tells
413 // it to batch task_work delivery at io_uring_enter(GETEVENTS)
414 // boundaries instead of interrupting the run thread via
415 // TWA_SIGNAL — eliminates cache pollution from mid-flight
416 // task_work and gives a meaningful single-threaded
417 // throughput uplift.
418 //
419 // Plan 3 disabled DEFER_TASKRUN defensively over a misread
420 // of the GETEVENTS contract. Plan 4a re-enabled it: liburing's
421 // io_uring_submit_and_wait_timeout always sets
422 // IORING_ENTER_GETEVENTS when wait_nr > 0, regardless of
423 // ts. Our run loop's only kernel-wait call passes wait_nr=1.
424 // Submit-only paths (cancel_and_flush, etc.) leave their
425 // CQEs queued until the leader's next GETEVENTS-bearing
426 // wait — benign.
427 //
428 // Multi-thread mode never sets these flags: SINGLE_ISSUER
429 // would be unsafe with multiple submitter threads.
430 //
431 // DEFER_TASKRUN is suppressed when SQPOLL is also enabled
432 // — the kernel rejects that combination with -EINVAL. The
433 // SQPOLL polling thread already delivers completions
434 // without TWA_SIGNAL interruption, so DEFER_TASKRUN's
435 // benefit is moot in that mode.
436 params.flags = IORING_SETUP_SINGLE_ISSUER;
437 if (!enable_sqpoll_)
438 params.flags |= IORING_SETUP_DEFER_TASKRUN;
439 }
440
441 281x if (enable_sqpoll_)
442 {
443 // SQPOLL forks a kernel thread that busy-polls the SQ ring;
444 // submission becomes a userspace-only memory store. Combines
445 // with SINGLE_ISSUER (the kernel accepts that pair) but NOT
446 // with DEFER_TASKRUN (kernel returns -EINVAL); the
447 // single_threaded_ branch above suppresses DEFER_TASKRUN
448 // when SQPOLL is also set. Idle timeout 0 means kernel
449 // default (1ms); we only forward when explicitly set so
450 // the kernel default is preserved.
451 params.flags |= IORING_SETUP_SQPOLL;
452 if (sq_thread_idle_ms_ != 0)
453 params.sq_thread_idle = sq_thread_idle_ms_;
454 if (sq_thread_cpu_ >= 0)
455 {
456 params.flags |= IORING_SETUP_SQ_AFF;
457 params.sq_thread_cpu = static_cast<__u32>(sq_thread_cpu_);
458 }
459 }
460
461 281x int rc = ::io_uring_queue_init_params(256, &ring_, &params);
462 281x if (rc < 0)
463 detail::throw_system_error(
464 make_err(-rc), "io_uring_queue_init_params");
465
466 281x wakeup_eventfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
467 281x if (wakeup_eventfd_ < 0)
468 {
469 int errn = errno;
470 ::io_uring_queue_exit(&ring_);
471 detail::throw_system_error(make_err(errn), "eventfd");
472 }
473
474 // Register a one-shot poll on the wake eventfd. user_data nullptr
475 // is the sentinel recognized by process_completions, which calls
476 // drain_wakeup_eventfd() to consume the eventfd byte AND re-arm
477 // the poll. Plan 5a switched away from IORING_POLL_MULTISHOT
478 // because multishot ops can silently terminate (e.g. under CQ
479 // pressure), and we don't observe the termination — leaving the
480 // wake mechanism dead and the leader stuck in kernel wait. One-
481 // shot rearm-on-fire is fail-fast: every wake event is paired
482 // with an explicit rearm, so a missed rearm would manifest
483 // immediately as the next wake being lost (test-visible).
484 281x ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
485 281x if (!sqe)
486 {
487 ::close(wakeup_eventfd_);
488 ::io_uring_queue_exit(&ring_);
489 detail::throw_system_error(
490 make_err(ENOSPC), "io_uring_get_sqe (wakeup)");
491 }
492 // Multishot poll: fires a CQE on each eventfd POLLIN without
493 // consuming the SQE. Avoids the re-arm hazard of one-shot poll
494 // (where drain_wakeup_eventfd's get_sqe could return null on a
495 // full SQ, leaving no SQE to detect future wakes).
496 281x ::io_uring_prep_poll_multishot(sqe, wakeup_eventfd_, POLLIN);
497 281x ::io_uring_sqe_set_data(sqe, nullptr);
498 281x int submit_rc = ::io_uring_submit(&ring_);
499 281x if (submit_rc < 0)
500 {
501 ::close(wakeup_eventfd_);
502 ::io_uring_queue_exit(&ring_);
503 detail::throw_system_error(
504 make_err(-submit_rc), "io_uring_submit (wakeup)");
505 }
506
507 281x ring_inited_ = true;
508 281x }
509
510 inline void
511 389x io_uring_scheduler::shutdown()
512 {
513 389x stopped_.store(true, std::memory_order_release);
514
515 // Drain posted ops, calling destroy() on each so embedded handles
516 // (coroutine frames, error_code outputs) get torn down rather
517 // than leaked. Mirrors reactor_scheduler::shutdown_drain.
518 //
519 // Service shutdown order (driven by capy::execution_context):
520 // each socket/acceptor service::shutdown() submits a cancel SQE
521 // for every live impl. The CQEs that result either land in
522 // completed_ops_ (drained here as op->destroy()) or stay in the
523 // kernel ring; ~scheduler's io_uring_queue_exit cleans the
524 // latter up at process teardown. Self-referential impl_ptr
525 // cycles (e.g. multishot acceptor's multi_op_->impl_ptr) are
526 // broken explicitly inside each service before the scheduler
527 // shutdown runs.
528 389x lock_type lock(dispatch_mutex_);
529 406x while (auto* op = completed_ops_.pop())
530 {
531 17x lock.unlock();
532 17x op->destroy();
533 17x lock.lock();
534 17x }
535 389x cond_.notify_all();
536 389x }
537
538 inline void
539 234x io_uring_scheduler::stop()
540 {
541 234x stopped_.store(true, std::memory_order_release);
542 {
543 234x lock_type lock(dispatch_mutex_);
544 234x cond_.notify_all();
545 234x }
546 // Force-wake unconditionally — bypass interrupt_reactor's CAS
547 // coalescing. A dropped wake here leaves the leader blocked
548 // forever in submit_and_wait_timeout (no further CQE will
549 // arrive after stop()). With multishot poll on wakeup_eventfd_,
550 // this write reliably produces a CQE.
551 234x if (ring_inited_)
552 {
553 233x std::uint64_t v = 1;
554 [[maybe_unused]] auto r =
555 233x ::write(wakeup_eventfd_, &v, sizeof(v));
556 }
557 234x }
558
559 inline bool
560 12x io_uring_scheduler::stopped() const noexcept
561 {
562 12x return stopped_.load(std::memory_order_acquire);
563 }
564
565 inline void
566 58x io_uring_scheduler::restart()
567 {
568 58x stopped_.store(false, std::memory_order_release);
569 58x }
570
571 inline void
572 46714x io_uring_scheduler::work_started() noexcept
573 {
574 46714x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
575 46714x }
576
577 inline void
578 59683x io_uring_scheduler::work_finished() noexcept
579 {
580 119366x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
581 226x stop();
582 59683x }
583
584 inline void
585 16817x io_uring_scheduler::interrupt_reactor() const noexcept
586 {
587 // Skip if the ring hasn't been initialised yet — there's no leader
588 // to wake and no eventfd to write.
589 16817x if (!ring_inited_)
590 return;
591
592 // Single-thread: the user's coroutines run on the leader thread,
593 // so when interrupt_reactor is called from user code the leader
594 // is not in kernel wait — there is nothing to wake.
595 16817x if (single_threaded_)
596 return;
597
598 // Multi-thread: write the eventfd unconditionally. CAS-coalescing
599 // is unsafe here because the leader's Phase 2 in do_one waits
600 // indefinitely for a CQE; a dropped wake leaves the leader
601 // blocked forever when there is no other CQE-producing activity.
602 // Multishot poll on wakeup_eventfd_ delivers a CQE for every
603 // write, so multiple writes in flight produce multiple CQEs
604 // (drained together by drain_wakeup_eventfd's single read of
605 // the eventfd counter).
606 16817x std::uint64_t v = 1;
607 16817x [[maybe_unused]] auto r = ::write(wakeup_eventfd_, &v, sizeof(v));
608 16817x wakeup_armed_.store(true, std::memory_order_release);
609 }
610
611 inline void
612 16351x io_uring_scheduler::drain_wakeup_eventfd() const noexcept
613 {
614 std::uint64_t v;
615 16351x [[maybe_unused]] auto r = ::read(wakeup_eventfd_, &v, sizeof(v));
616
617 // Multishot poll never needs re-arming. The poll-add was queued
618 // once at lazy_init_ring with IORING_POLL_ADD_MULTI; each eventfd
619 // POLLIN produces a CQE without consuming the SQE.
620 //
621 // Release pairs with the acquire side of interrupt_reactor's CAS:
622 // a posting thread that observes wakeup_armed_ == false from this
623 // store will see the eventfd already drained by the leader.
624 16351x wakeup_armed_.store(false, std::memory_order_release);
625 16351x }
626
627 inline void
628 378x io_uring_scheduler::post(std::coroutine_handle<> h) const
629 {
630 struct post_handler final : scheduler_op
631 {
632 std::coroutine_handle<> h_;
633 378x explicit post_handler(std::coroutine_handle<> h) noexcept : h_(h) {}
634
635 375x void operator()() override
636 {
637 375x auto saved = h_;
638 375x delete this;
639 std::atomic_thread_fence(std::memory_order_acquire);
640 375x saved.resume();
641 375x }
642
643 3x void destroy() override
644 {
645 3x auto saved = h_;
646 3x delete this;
647 3x if (saved)
648 3x saved.destroy();
649 3x }
650 };
651
652 378x auto* op = new post_handler(h);
653 378x lazy_init_ring();
654 378x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
655 bool wake_leader;
656 {
657 378x lock_type lock(dispatch_mutex_);
658 378x completed_ops_.push(op);
659 378x wake_leader = task_running_;
660 378x if (!wake_leader)
661 378x cond_.notify_one();
662 378x }
663 378x if (wake_leader)
664 interrupt_reactor();
665 378x }
666
667 inline void
668 12608x io_uring_scheduler::post(scheduler_op* op) const
669 {
670 12608x lazy_init_ring();
671 12608x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
672 bool wake_leader;
673 {
674 12608x lock_type lock(dispatch_mutex_);
675 12608x completed_ops_.push(op);
676 12608x wake_leader = task_running_;
677 12608x if (!wake_leader)
678 8428x cond_.notify_one();
679 12608x }
680 12608x if (wake_leader)
681 4180x interrupt_reactor();
682 12608x }
683
684 // Thread-local stack of frames for io_uring schedulers being run on the
685 // current thread. Holds the running-scheduler pointer (for
686 // running_in_this_thread reporting) and the inline completion budget
687 // used by the speculative non-blocking I/O path (plan 5j). Nesting
688 // stacks frames via prev_ so each scheduler gets its own budget.
// One entry in the per-thread stack of running io_uring schedulers.
// Plain aggregate: io_uring_run_guard brace-initialises it in member
// order, so the order of the fields below must not change.
689 struct io_uring_scheduler_frame
690 {
// Scheduler this frame belongs to; the frame-stack walkers
// (running_in_this_thread, reset_inline_budget,
// try_consume_inline_budget) compare it against `this`.
691 io_uring_scheduler const* sched;
// Next-outer frame on this thread's stack; nullptr ends the walk.
692 io_uring_scheduler_frame* prev;
// Remaining inline completions; decremented by
// try_consume_inline_budget() while it is greater than zero.
693 int inline_budget;
// Value reset_inline_budget() restores inline_budget to.
694 int inline_budget_max;
695 };
696
697 inline thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_ = nullptr;
698
699 // Default inline budget. Matches reactor's initial budget (2). Adaptive
700 // ramp-up to a max is intentionally NOT implemented yet — keep it simple
701 // for plan 5j and revisit if benches show fairness issues.
702 inline constexpr int io_uring_inline_budget_initial = 2;
703 inline constexpr int io_uring_inline_budget_max = 16;
704
705 /// RAII guard: pushes a frame onto the thread's running-scheduler stack
706 /// on construction, restores the previous on destruction. Used by
707 /// run/run_one/wait_one/poll/poll_one to mark the running thread and
708 /// hold a fresh inline budget for speculative completions.
709 struct io_uring_run_guard
710 {
711 io_uring_scheduler_frame frame_;
712
713 233x explicit io_uring_run_guard(io_uring_scheduler const* self) noexcept
714 233x : frame_{self, tl_running_scheduler_frame_,
715 io_uring_inline_budget_initial,
716 io_uring_inline_budget_max}
717 {
718 233x tl_running_scheduler_frame_ = &frame_;
719 233x }
720
721 233x ~io_uring_run_guard() noexcept
722 {
723 233x tl_running_scheduler_frame_ = frame_.prev;
724 233x }
725 };
726
727 inline bool
728 584x io_uring_scheduler::running_in_this_thread() const noexcept
729 {
730 584x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
731 {
732 209x if (f->sched == this)
733 209x return true;
734 }
735 375x return false;
736 }
737
738 inline void
739 50912x io_uring_scheduler::reset_inline_budget() const noexcept
740 {
741 50912x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
742 {
743 50912x if (f->sched == this)
744 {
745 50912x f->inline_budget = f->inline_budget_max;
746 50912x return;
747 }
748 }
749 }
750
751 inline bool
752 573746x io_uring_scheduler::try_consume_inline_budget() const noexcept
753 {
754 573746x for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev)
755 {
756 573746x if (f->sched == this)
757 {
758 573746x if (f->inline_budget > 0)
759 {
760 540011x --f->inline_budget;
761 540011x return true;
762 }
763 33735x return false;
764 }
765 }
766 return false;
767 }
768
769 inline std::size_t
770 221x io_uring_scheduler::run()
771 {
772 221x lazy_init_ring();
773 442x if (outstanding_work_.load(std::memory_order_acquire) == 0)
774 {
775 stop();
776 return 0;
777 }
778
779 221x io_uring_run_guard guard(this);
780 221x std::size_t n = 0;
781 for (;;)
782 {
783 51113x std::size_t r = do_one(-1);
784 51113x if (r)
785 {
786 50892x if (n != (std::numeric_limits<std::size_t>::max)())
787 50892x ++n;
788 50892x continue;
789 }
790 444x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
791 2x stopped_.load(std::memory_order_acquire))
792 221x break;
793 // do_one returned 0 but work still outstanding (e.g. timer
794 // expiry dispatched async work). Continue.
795 50892x }
796 221x return n;
797 221x }
798
799 inline std::size_t
800 io_uring_scheduler::run_one()
801 {
802 lazy_init_ring();
803 if (outstanding_work_.load(std::memory_order_acquire) == 0)
804 {
805 stop();
806 return 0;
807 }
808 io_uring_run_guard guard(this);
809 return do_one(-1);
810 }
811
812 inline std::size_t
813 5x io_uring_scheduler::wait_one(long usec)
814 {
815 5x lazy_init_ring();
816 10x if (outstanding_work_.load(std::memory_order_acquire) == 0)
817 {
818 3x stop();
819 3x return 0;
820 }
821 2x io_uring_run_guard guard(this);
822 2x return do_one(usec);
823 2x }
824
825 inline std::size_t
826 12x io_uring_scheduler::poll()
827 {
828 12x lazy_init_ring();
829 24x if (outstanding_work_.load(std::memory_order_acquire) == 0)
830 {
831 2x stop();
832 2x return 0;
833 }
834 10x io_uring_run_guard guard(this);
835 10x std::size_t n = 0;
836 29x while (do_one(0))
837 {
838 19x if (n != (std::numeric_limits<std::size_t>::max)())
839 19x ++n;
840 }
841 10x return n;
842 10x }
843
844 inline std::size_t
845 io_uring_scheduler::poll_one()
846 {
847 lazy_init_ring();
848 if (outstanding_work_.load(std::memory_order_acquire) == 0)
849 {
850 stop();
851 return 0;
852 }
853 io_uring_run_guard guard(this);
854 return do_one(0);
855 }
856
857 inline std::size_t
858 51144x io_uring_scheduler::do_one(long timeout_us)
859 {
860 // Leader-follower: only one thread at a time may call
861 // io_uring_submit_and_wait_timeout on a shared ring (liburing's
862 // userspace head/tail bookkeeping is not thread-safe). Other
863 // threads either dispatch ready ops from completed_ops_ or wait
864 // on cond_ until the leader returns from the kernel.
865 51144x if (stopped_.load(std::memory_order_acquire))
866 224x return 0;
867
868 // submit_sqes_op only pumps the ring once per SQE batch. If the user
869 // keeps a non-empty completed_ops_ (e.g. timer with 0ns expiry as a
870 // yield primitive), the leader-phase kernel pass below never runs
871 // and CQEs accumulate in the ring forever — sub_request's read CQE
872 // never gets drained and the bench spins. submit_and_get_events
873 // (not plain submit) is required because IORING_SETUP_DEFER_TASKRUN
874 // gates task work on IORING_ENTER_GETEVENTS.
875 //
876 // Gate the kernel pump on there being io_uring-specific work. The
877 // check is performed under ring_mutex_ so a concurrent cross-thread
878 // submitter cannot prep an SQE that we then race past — both this
879 // path and io_uring_submit_op acquire ring_mutex_ before touching
880 // the ring. When all three sources are empty (no io_uring ops in
881 // flight needing DEFER_TASKRUN GETEVENTS, no userspace-pending
882 // SQEs, no kernel-ready CQEs) a kernel entry would have no work —
883 // saves ~8 pp of cycles on the no-I/O microbenchmark
884 // (io_context:single_threaded). We deliberately do NOT include
885 // outstanding_work_ here, because that counter mixes coroutine
886 // posts (in completed_ops_) with io_uring work — IOCTX has many
887 // coroutine posts and no io_uring work, and the kernel pump there
888 // is pure overhead.
889 50920x if (ring_inited_)
890 {
891 50920x lock_type ring_lock(ring_mutex_);
892 50920x if (io_uring_inflight_.load(std::memory_order_acquire) != 0
893 446x || ::io_uring_sq_ready(&ring_) != 0
894 51366x || ::io_uring_cq_ready(&ring_) != 0)
895 {
896 50623x ::io_uring_submit_and_get_events(&ring_);
897 50623x process_completions();
898 }
899 50920x }
900
901 // Drain expired timers eagerly, for the same reason the kernel CQE
902 // pump runs unconditionally above: when completed_ops_ stays non-
903 // empty (e.g. continuous loopback I/O whose CQEs land in the top-
904 // of-do_one process_completions call), the leader-wait branch
905 // below — the only other place process_expired() runs — is never
906 // reached. Without this, stopper-timer-based shutdowns (and any
907 // other timer dependent on a busy I/O loop yielding) deadlock.
908 //
909 // empty() is a single relaxed-acquire atomic load on
910 // timer_service::cached_nearest_ns_ (lock-free, no clock_gettime).
911 // Skipping process_expired() when no timer is registered avoids the
912 // mutex + clock_gettime hot-path cost that dominates IOCTX cycles
913 // (~25 pp on io_context:single_threaded). When a timer IS
914 // registered the call runs exactly as before, preserving the
915 // deadlock fix this guard was originally written to address.
916 50920x if (!timer_svc_->empty())
917 50121x timer_svc_->process_expired();
918
919 50920x lock_type lock(dispatch_mutex_);
920 for (;;)
921 {
922 55099x if (stopped_.load(std::memory_order_acquire))
923 return 0;
924
925 55099x if (auto* op = completed_ops_.pop())
926 {
927 // Hand off any remaining queued work to a follower so we
928 // dispatch in parallel.
929 50912x if (!completed_ops_.empty())
930 28493x cond_.notify_one();
931 50912x lock.unlock();
932 // Speculative follow-ups in the handler share this budget.
933 50912x reset_inline_budget();
934 50912x (*op)();
935 50912x work_finished();
936 50912x return 1;
937 }
938
939 8374x if (outstanding_work_.load(std::memory_order_acquire) == 0)
940 return 0;
941
942 4187x if (task_running_)
943 {
944 // Another thread holds leadership; either return (poll)
945 // or wait for it to deliver work / release leadership.
946 if (timeout_us == 0)
947 return 0;
948 if (timeout_us < 0)
949 cond_.wait(lock);
950 else
951 {
952 cond_.wait_for(
953 lock, std::chrono::microseconds(timeout_us));
954 // wait_one honoured its timeout; if nothing arrived,
955 // return rather than re-arm.
956 if (completed_ops_.empty() &&
957 !stopped_.load(std::memory_order_acquire))
958 return 0;
959 }
960 continue;
961 }
962
963 // Become the leader: run the kernel poll. We drop the lock
964 // for the blocking wait, then take it back to release
965 // leadership and wake any follower that should pick up new
966 // work.
967 4187x __kernel_timespec ts{};
968 4187x __kernel_timespec* ts_ptr = nullptr;
969 4187x auto next_expiry = timer_svc_->nearest_expiry();
970 4187x auto now = std::chrono::steady_clock::now();
971
972 4187x if (timeout_us == 0)
973 {
974 7x ts.tv_sec = 0;
975 7x ts.tv_nsec = 0;
976 7x ts_ptr = &ts;
977 }
978 4180x else if (next_expiry != timer_service::time_point::max())
979 {
980 auto delta_ns =
981 4176x std::chrono::duration_cast<std::chrono::nanoseconds>(
982 4176x next_expiry - now)
983 4176x .count();
984 4176x if (delta_ns < 0) delta_ns = 0;
985 4176x ts.tv_sec = delta_ns / 1'000'000'000;
986 4176x ts.tv_nsec = delta_ns % 1'000'000'000;
987 4176x ts_ptr = &ts;
988 }
989 4x else if (timeout_us > 0)
990 {
991 1x ts.tv_sec = timeout_us / 1'000'000;
992 1x ts.tv_nsec = (timeout_us % 1'000'000) * 1000;
993 1x ts_ptr = &ts;
994 }
995 else
996 {
997 // run() with no pending timers: cap the kernel wait at 1s
998 // so the leader periodically re-checks state. Defense in
999 // depth against a lost wakeup (e.g. multishot poll on the
1000 // wakeup eventfd terminates and the re-arm SQE doesn't
1001 // reach the kernel in time). Worst case: one extra
1002 // wake-up per io_context per second when truly idle.
1003 3x ts.tv_sec = 1;
1004 3x ts.tv_nsec = 0;
1005 3x ts_ptr = &ts;
1006 }
1007
1008 4187x task_running_ = true;
1009 4187x lock.unlock();
1010
1011 // Three-phase kernel wait, matching Boost.Asio's
1012 // io_uring_service::run pattern. ring_mutex_ is held briefly
1013 // to push pending SQEs and to drain CQEs, but NOT during
1014 // the blocking io_uring_wait_cqe_timeout. Cross-thread
1015 // submitters (io_uring_submit_op, cancel paths) can take
1016 // ring_mutex_ during the wait and prep new SQEs without
1017 // blocking on the leader; their wake eventfd write fires the
1018 // multishot poll and returns the leader from wait_cqe_timeout
1019 // promptly.
1020 //
1021 // Phase 1 — submit any pending SQEs to the kernel.
1022 {
1023 4187x lock_type ring_lock(ring_mutex_);
1024 4187x ::io_uring_submit(&ring_);
1025 4187x }
1026
1027 // Phase 2 — wait for at least one CQE without holding the
1028 // mutex. Multi-thread `io_uring_enter` is permitted without
1029 // SINGLE_ISSUER. wait_cqe_timeout only peeks the CQ ring;
1030 // head advancement happens under the mutex in
1031 // process_completions below.
1032 4187x ::io_uring_cqe* cqe = nullptr;
1033 4187x int rc = ::io_uring_wait_cqe_timeout(&ring_, &cqe, ts_ptr);
1034
1035 // Phase 3 — drain CQEs under the mutex.
1036 {
1037 4187x lock_type ring_lock(ring_mutex_);
1038 4187x if (rc == 0 || rc == -ETIME || rc == -EINTR)
1039 4187x process_completions();
1040 4187x }
1041
1042 4187x if (rc < 0 && rc != -ETIME && rc != -EINTR)
1043 {
1044 // Restore state before propagating so followers don't
1045 // deadlock waiting for a leader that never returns.
1046 lock.lock();
1047 task_running_ = false;
1048 cond_.notify_all();
1049 detail::throw_system_error(
1050 make_err(-rc), "io_uring_wait_cqe_timeout");
1051 }
1052
1053 4187x if (!timer_svc_->empty())
1054 4183x timer_svc_->process_expired();
1055
1056 4187x lock.lock();
1057 4187x task_running_ = false;
1058 4187x cond_.notify_all();
1059
1060 // For poll() / wait_one() we honour the timeout: one kernel
1061 // pass is the contract. If still nothing dispatchable, exit.
1062 // For run() (timeout < 0) keep looping until work arrives or
1063 // someone calls stop().
1064 4187x if (timeout_us >= 0 && completed_ops_.empty())
1065 8x return 0;
1066 4179x }
1067 50920x }
1068
1069 inline void
1070 54810x io_uring_scheduler::process_completions()
1071 {
1072 unsigned head;
1073 ::io_uring_cqe* cqe;
1074 54810x unsigned consumed = 0;
1075
1076 // Collect completed I/O ops locally; splice into completed_ops_
1077 // after the loop so do_one dispatches them one at a time.
1078 54810x op_queue local_ops;
1079
1080 54810x std::int64_t inflight_dec = 0;
1081 87298x io_uring_for_each_cqe(&ring_, head, cqe)
1082 {
1083 32488x void* ud = io_uring_cqe_get_data(cqe);
1084 32488x if (ud == nullptr)
1085 {
1086 // Wakeup eventfd CQE: drain the eventfd byte. Not counted
1087 // by io_uring_inflight_; we never incremented for the
1088 // wakeup multishot SQE (its progress doesn't depend on
1089 // userspace getevents).
1090 16351x drain_wakeup_eventfd();
1091 // If multishot terminated (kernel dropped under memory
1092 // pressure or similar), re-arm. Each CQE except the last
1093 // sets IORING_CQE_F_MORE.
1094 16351x if ((cqe->flags & IORING_CQE_F_MORE) == 0)
1095 {
1096 ::io_uring_sqe* re = ::io_uring_get_sqe(&ring_);
1097 if (!re)
1098 {
1099 ::io_uring_submit(&ring_);
1100 re = ::io_uring_get_sqe(&ring_);
1101 }
1102 if (re)
1103 {
1104 ::io_uring_prep_poll_multishot(
1105 re, wakeup_eventfd_, POLLIN);
1106 ::io_uring_sqe_set_data(re, nullptr);
1107 }
1108 }
1109 }
1110 16137x else if (ud == &cancel_sentinel_)
1111 {
1112 // CQE for an ASYNC_CANCEL op — ignore; the actual op's
1113 // CQE arrives separately and is dispatched via cqe_func.
1114 // Cancels are one-shot, no F_MORE, decrement inflight.
1115 7971x ++inflight_dec;
1116 }
1117 else
1118 {
1119 8166x auto* iop = static_cast<io_uring_op*>(ud);
1120 8166x (*iop->cqe_func)(iop, cqe->res, cqe->flags, local_ops);
1121 // Decrement inflight on the terminal CQE only — multishot
1122 // ops (acceptor) hold the SQE alive across F_MORE CQEs and
1123 // free it only when F_MORE is cleared.
1124 8166x if ((cqe->flags & IORING_CQE_F_MORE) == 0)
1125 4212x ++inflight_dec;
1126 }
1127 32488x ++consumed;
1128 }
1129 54810x if (inflight_dec)
1130 11987x io_uring_inflight_.fetch_sub(
1131 inflight_dec, std::memory_order_acq_rel);
1132
1133 54810x if (consumed)
1134 16266x io_uring_cq_advance(&ring_, consumed);
1135
1136 // Caller holds ring_mutex_. Take dispatch_mutex_ briefly to
1137 // splice locally-collected ops onto the global queue (lock order
1138 // ring_mutex_ -> dispatch_mutex_).
1139 54810x if (!local_ops.empty())
1140 {
1141 4205x lock_type lock(dispatch_mutex_);
1142 4205x completed_ops_.splice(local_ops);
1143 // Wake any follower waiting on cond_; it'll pop and dispatch.
1144 4205x cond_.notify_one();
1145 4205x }
1146 54810x }
1147
1148 inline void
1149 10x io_uring_scheduler::submit_sqes_op::do_handler(
1150 void* owner, scheduler_op* base,
1151 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
1152 {
1153 10x if (owner == nullptr)
1154 10x return; // shutdown drain — nothing to do; SQE storage is
1155 // kernel-mapped and discarded by io_uring_queue_exit.
1156
1157 auto* self = static_cast<submit_sqes_op*>(base);
1158 auto* sched = self->sched_;
1159
1160 io_uring_scheduler::lock_type ring_lock(sched->ring_mutex_);
1161 sched->submit_op_posted_ = false;
1162 ::io_uring_submit_and_get_events(&sched->ring_);
1163 sched->process_completions();
1164 }
1165
1166 inline void
1167 101x io_uring_scheduler::submit_cancel_by_user_data(io_uring_op* target) noexcept
1168 {
1169 101x lazy_init_ring();
1170 // Wake the leader (if any) so its submit_and_wait_timeout returns
1171 // and releases ring_mutex_; otherwise we'd block here until the
1172 // next CQE arrives organically. Cancellation is best-effort if
1173 // the SQ stays full after one flush — the op completes on its
1174 // own and reports cancelled via the in-flight `cancelled` flag.
1175 101x interrupt_reactor();
1176 101x lock_type lock(ring_mutex_);
1177 101x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1178 101x if (!sqe)
1179 {
1180 io_uring_submit(&ring_);
1181 sqe = io_uring_get_sqe(&ring_);
1182 }
1183 101x if (!sqe)
1184 return;
1185
1186 101x io_uring_prep_cancel(sqe, target, 0);
1187 101x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1188 101x inflight_inc();
1189 101x }
1190
1191 inline void
1192 78x io_uring_scheduler::submit_cancel_by_fd(int fd) noexcept
1193 {
1194 78x lazy_init_ring();
1195 78x interrupt_reactor();
1196 78x lock_type lock(ring_mutex_);
1197 78x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1198 78x if (!sqe)
1199 {
1200 io_uring_submit(&ring_);
1201 sqe = io_uring_get_sqe(&ring_);
1202 }
1203 78x if (!sqe)
1204 return;
1205
1206 78x io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL);
1207 78x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1208 78x inflight_inc();
1209 78x }
1210
1211 inline void
1212 101x io_uring_op::request_cancel() noexcept
1213 {
1214 101x cancelled.store(true, std::memory_order_release);
1215 // Skip the cancel SQE if we never linked an SQE to this op — the
1216 // bypass path in the caller will see cancelled=true and complete
1217 // synchronously without a kernel round-trip.
1218 101x if (sched_ && sqe_set.load(std::memory_order_acquire))
1219 101x sched_->submit_cancel_by_user_data(this);
1220 101x }
1221
1222 inline void
1223 8176x io_uring_scheduler::cancel_and_flush(int fd) noexcept
1224 {
1225 8176x lazy_init_ring();
1226 8176x interrupt_reactor();
1227 8176x lock_type lock(ring_mutex_);
1228 8176x io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
1229 8176x if (!sqe)
1230 {
1231 io_uring_submit(&ring_);
1232 sqe = io_uring_get_sqe(&ring_);
1233 }
1234 8176x if (sqe)
1235 {
1236 8176x io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL);
1237 8176x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1238 8176x inflight_inc();
1239 }
1240 // Flush while fd is still open so the kernel resolves the file
1241 // from the fd number before the caller closes and recycles it.
1242 8176x io_uring_submit(&ring_);
1243 8176x }
1244
1245 inline void
1246 85x io_uring_scheduler::drain_cqes_for(io_uring_op* target) noexcept
1247 {
1248 85x lazy_init_ring();
1249 // Submit a cancel by user_data so the kernel returns CQEs for
1250 // the target promptly, then iterate the CQ ring and consume
1251 // every CQE that matches `target`. ring_mutex_ serializes against
1252 // the leader's kernel wait and any concurrent cancel path; the
1253 // interrupt_reactor() ensures the leader returns promptly so we
1254 // can take the mutex.
1255 85x interrupt_reactor();
1256 {
1257 85x lock_type lock(ring_mutex_);
1258 85x if (auto* sqe = io_uring_get_sqe(&ring_))
1259 {
1260 85x io_uring_prep_cancel(sqe, target, 0);
1261 85x io_uring_sqe_set_data(sqe, &cancel_sentinel_);
1262 85x inflight_inc();
1263 }
1264 85x io_uring_submit(&ring_);
1265 85x }
1266
1267 // Loop a few rounds: cancel SQE submission, then drain CQEs.
1268 // Bounded loop avoids stalls if the kernel never returns a
1269 // cancel completion — best-effort.
1270 89x for (int rounds = 0; rounds < drain_cqes_max_rounds; ++rounds)
1271 {
1272 89x lock_type lock(ring_mutex_);
1273
1274 unsigned head;
1275 ::io_uring_cqe* cqe;
1276 89x unsigned consumed = 0;
1277 89x bool saw_target = false;
1278
1279 697x io_uring_for_each_cqe(&ring_, head, cqe)
1280 {
1281 608x void* ud = io_uring_cqe_get_data(cqe);
1282 608x if (ud == target)
1283 {
1284 81x saw_target = true;
1285 // Don't dispatch — caller is destructing target;
1286 // just consume so the CQE doesn't dangle.
1287 }
1288 // Other CQEs are intentionally NOT dispatched here. They
1289 // may belong to ops freed by sibling teardowns (other
1290 // acceptors / sockets), and dispatching would UAF. The
1291 // next normal run-loop iteration will handle them; the
1292 // io_context's destructor sequence runs services'
1293 // shutdowns before ~scheduler so any still-live ops get
1294 // a chance to drain through their own paths first.
1295 608x ++consumed;
1296 }
1297 89x if (consumed)
1298 {
1299 85x io_uring_cq_advance(&ring_, consumed);
1300 85x if (saw_target)
1301 81x break;
1302 4x continue;
1303 }
1304
1305 // Nothing in the CQ — kick the kernel briefly. Hold
1306 // ring_mutex_ across the wait so we don't race with the
1307 // run-loop leader.
1308 4x __kernel_timespec ts{
1309 0, static_cast<long long>(drain_cqes_kick_ns)};
1310 4x ::io_uring_cqe* one = nullptr;
1311 4x int rc = ::io_uring_submit_and_wait_timeout(
1312 &ring_, &one, 1, &ts, nullptr);
1313 4x if (rc < 0 && rc != -ETIME && rc != -EINTR)
1314 break;
1315 4x if (rc == -ETIME)
1316 4x break;
1317 89x }
1318 85x }
1319
1320 } // namespace boost::corosio::detail
1321
1322 #endif // BOOST_COROSIO_HAS_IO_URING
1323
1324 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP
1325