include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp
80.6% Lines (369/458)
93.8% List of functions (45/48)
Functions (48)
Function
Calls
Lines
Blocks
boost::corosio::detail::io_uring_scheduler::ring()
:101
4293x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::dispatch_mutex() const
:108
33735x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::ring_mutex() const
:111
4293x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::submit_op_posted_exchange(bool) const
:133
4293x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::submit_op_ref() const
:141
114x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::inflight_inc() const
:150
12733x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::push_completed_locked(boost::corosio::detail::scheduler_op*) const
:229
33735x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::configure_single_threaded(bool)
:235
1x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::configure_sqpoll(bool, unsigned int, int)
:260
0
0.0%
0.0%
boost::corosio::detail::io_uring_scheduler::is_single_threaded() const
:269
1x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::submit_sqes_op::submit_sqes_op()
:329
389x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int)
:363
389x
100.0%
74.0%
boost::corosio::detail::io_uring_scheduler::io_uring_scheduler(boost::capy::execution_context&, int)::{lambda(void*)#1}::operator()(void*) const
:374
4197x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::~io_uring_scheduler()
:387
778x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const
:398
30250x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::lazy_init_ring() const::{lambda()#1}::operator()() const
:400
281x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::lazy_init_ring_unlocked() const
:406
281x
42.1%
38.0%
boost::corosio::detail::io_uring_scheduler::shutdown()
:511
389x
100.0%
87.0%
boost::corosio::detail::io_uring_scheduler::stop()
:539
234x
100.0%
90.0%
boost::corosio::detail::io_uring_scheduler::stopped() const
:560
12x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::restart()
:566
58x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::work_started()
:572
46714x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::work_finished()
:578
59683x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::interrupt_reactor() const
:585
16817x
75.0%
67.0%
boost::corosio::detail::io_uring_scheduler::drain_wakeup_eventfd() const
:612
16351x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const
:628
378x
100.0%
93.0%
boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>)
:633
378x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()()
:635
375x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy()
:643
3x
94.4%
100.0%
boost::corosio::detail::io_uring_scheduler::post(boost::corosio::detail::scheduler_op*) const
:668
12608x
100.0%
100.0%
boost::corosio::detail::io_uring_run_guard::io_uring_run_guard(boost::corosio::detail::io_uring_scheduler const*)
:713
233x
100.0%
100.0%
boost::corosio::detail::io_uring_run_guard::~io_uring_run_guard()
:721
233x
100.0%
100.0%
boost::corosio::detail::io_uring_scheduler::running_in_this_thread() const
:728
584x
100.0%
86.0%
boost::corosio::detail::io_uring_scheduler::reset_inline_budget() const
:739
50912x
100.0%
83.0%
boost::corosio::detail::io_uring_scheduler::try_consume_inline_budget() const
:752
573746x
87.5%
78.0%
boost::corosio::detail::io_uring_scheduler::run()
:770
221x
88.9%
72.0%
boost::corosio::detail::io_uring_scheduler::run_one()
:800
0
0.0%
0.0%
boost::corosio::detail::io_uring_scheduler::wait_one(long)
:813
5x
100.0%
73.0%
boost::corosio::detail::io_uring_scheduler::poll()
:826
12x
100.0%
77.0%
boost::corosio::detail::io_uring_scheduler::poll_one()
:845
0
0.0%
0.0%
boost::corosio::detail::io_uring_scheduler::do_one(long)
:858
51144x
80.2%
64.0%
boost::corosio::detail::io_uring_scheduler::process_completions()
:1070
54810x
78.8%
78.0%
boost::corosio::detail::io_uring_scheduler::submit_sqes_op::do_handler(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int)
:1149
10x
30.0%
38.0%
boost::corosio::detail::io_uring_scheduler::submit_cancel_by_user_data(boost::corosio::detail::io_uring_op*)
:1167
101x
78.6%
75.0%
boost::corosio::detail::io_uring_scheduler::submit_cancel_by_fd(int)
:1192
78x
78.6%
75.0%
boost::corosio::detail::io_uring_op::request_cancel()
:1212
101x
100.0%
89.0%
boost::corosio::detail::io_uring_scheduler::cancel_and_flush(int)
:1223
8176x
85.7%
80.0%
boost::corosio::detail::io_uring_scheduler::drain_cqes_for(boost::corosio::detail::io_uring_op*)
:1246
85x
97.0%
89.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2026 Steve Gerbino | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/corosio | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP | ||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP | ||
| 12 | |||
| 13 | #include <boost/corosio/detail/platform.hpp> | ||
| 14 | |||
| 15 | #if BOOST_COROSIO_HAS_IO_URING | ||
| 16 | |||
| 17 | // Include before any project headers open a namespace — prevents the | ||
| 18 | // boost::corosio::io_uring tag variable from shadowing struct ::io_uring. | ||
| 19 | #include <liburing.h> | ||
| 20 | |||
| 21 | #include <boost/corosio/detail/conditionally_enabled_event.hpp> | ||
| 22 | #include <boost/corosio/detail/conditionally_enabled_mutex.hpp> | ||
| 23 | #include <boost/corosio/detail/config.hpp> | ||
| 24 | #include <boost/corosio/detail/except.hpp> | ||
| 25 | #include <boost/corosio/detail/scheduler.hpp> | ||
| 26 | #include <boost/corosio/detail/scheduler_op.hpp> | ||
| 27 | #include <boost/corosio/detail/timer_service.hpp> | ||
| 28 | #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp> | ||
| 29 | #include <boost/corosio/native/detail/make_err.hpp> | ||
| 30 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | ||
| 31 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | ||
| 32 | #include <boost/capy/ex/execution_context.hpp> | ||
| 33 | |||
| 34 | #include <atomic> | ||
| 35 | #include <chrono> | ||
| 36 | #include <coroutine> | ||
| 37 | #include <cstddef> | ||
| 38 | #include <cstdint> | ||
| 39 | #include <limits> | ||
| 40 | |||
| 41 | #include <errno.h> | ||
| 42 | #include <poll.h> | ||
| 43 | #include <sys/eventfd.h> | ||
| 44 | #include <unistd.h> | ||
| 45 | |||
| 46 | namespace boost::corosio::detail { | ||
| 47 | |||
| 48 | // Forward-declared so the out-of-line inline definitions below the class | ||
| 49 | // can reference the frame stack without a circular dependency. | ||
| 50 | struct io_uring_scheduler_frame; | ||
| 51 | extern thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_; | ||
| 52 | |||
| 53 | /** io_uring scheduler — proactor model on Linux 6.x+. | ||
| 54 | |||
| 55 | Owns one io_uring per io_context. Lazy batched submit; | ||
| 56 | cross-thread post wakes a registered eventfd via multishot | ||
| 57 | POLL_ADD. | ||
| 58 | |||
| 59 | @par Thread Safety | ||
| 60 | All public member functions are thread-safe. | ||
| 61 | */ | ||
| 62 | class BOOST_COROSIO_DECL io_uring_scheduler final | ||
| 63 | : public scheduler | ||
| 64 | , public capy::execution_context::service | ||
| 65 | { | ||
| 66 | public: | ||
| 67 | using key_type = scheduler; | ||
| 68 | using mutex_type = conditionally_enabled_mutex; | ||
| 69 | using lock_type = mutex_type::scoped_lock; | ||
| 70 | using event_type = conditionally_enabled_event; | ||
| 71 | |||
| 72 | io_uring_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | ||
| 73 | ~io_uring_scheduler() override; | ||
| 74 | io_uring_scheduler(io_uring_scheduler const&) = delete; | ||
| 75 | io_uring_scheduler& operator=(io_uring_scheduler const&) = delete; | ||
| 76 | |||
| 77 | void shutdown() override; | ||
| 78 | |||
| 79 | // scheduler virtuals — defined inline below the class | ||
| 80 | void post(std::coroutine_handle<>) const override; | ||
| 81 | void post(scheduler_op*) const override; | ||
| 82 | |||
| 83 | bool running_in_this_thread() const noexcept override; | ||
| 84 | void stop() override; | ||
| 85 | bool stopped() const noexcept override; | ||
| 86 | void restart() override; | ||
| 87 | std::size_t run() override; | ||
| 88 | std::size_t run_one() override; | ||
| 89 | std::size_t wait_one(long usec) override; | ||
| 90 | std::size_t poll() override; | ||
| 91 | std::size_t poll_one() override; | ||
| 92 | void work_started() noexcept override; | ||
| 93 | void work_finished() noexcept override; | ||
| 94 | |||
| 95 | /** Return the underlying liburing ring. | ||
| 96 | |||
| 97 | Triggers lazy ring initialisation on first call. Used by | ||
| 98 | socket op submission helpers (e.g. `io_uring_submit_op`) and | ||
| 99 | any other code path that needs a live ring pointer. | ||
| 100 | */ | ||
| 101 | 4293x | struct ::io_uring* ring() noexcept | |
| 102 | { | ||
| 103 | 4293x | lazy_init_ring(); | |
| 104 | 4293x | return &ring_; | |
| 105 | } | ||
| 106 | |||
| 107 | /// Return the dispatch mutex (protects completed_ops_ / cond_). | ||
| 108 | 33735x | mutex_type& dispatch_mutex() const noexcept { return dispatch_mutex_; } | |
| 109 | |||
| 110 | /// Return the ring mutex (serialises userspace SQ/CQ access). | ||
| 111 | 4293x | mutex_type& ring_mutex() const noexcept { return ring_mutex_; } | |
| 112 | |||
| 113 | /** Reset the calling thread's inline-budget for this scheduler. | ||
| 114 | |||
| 115 | Called at the top of each dispatched op in `do_one` so each | ||
| 116 | op handler gets a fresh budget for inline speculative | ||
| 117 | completions. Walks the frame stack; no-op if this scheduler | ||
| 118 | isn't on the stack (i.e. called from a non-run thread). | ||
| 119 | */ | ||
| 120 | void reset_inline_budget() const noexcept; | ||
| 121 | |||
| 122 | /** Consume one unit of inline budget if available. | ||
| 123 | |||
| 124 | @return `true` if budget was available and consumed; `false` | ||
| 125 | if the budget is exhausted or this scheduler is not on | ||
| 126 | the calling thread's run stack. | ||
| 127 | */ | ||
| 128 | bool try_consume_inline_budget() const noexcept; | ||
| 129 | |||
| 130 | /// Exchange the submit-batch posted flag. Returns the prior value. | ||
| 131 | /// Caller MUST hold ring_mutex_ — the flag is plain bool, not atomic, | ||
| 132 | /// and the mutex provides the read-modify-write atomicity. | ||
| 133 | 4293x | bool submit_op_posted_exchange(bool desired) const noexcept | |
| 134 | { | ||
| 135 | 4293x | bool prev = submit_op_posted_; | |
| 136 | 4293x | submit_op_posted_ = desired; | |
| 137 | 4293x | return prev; | |
| 138 | } | ||
| 139 | |||
| 140 | /// Return a reference to the mutable embedded submit_sqes_op. | ||
| 141 | 114x | scheduler_op& submit_op_ref() const noexcept | |
| 142 | { | ||
| 143 | 114x | return submit_op_; | |
| 144 | } | ||
| 145 | |||
| 146 | /// Increment the io_uring in-flight counter. Callers prep an SQE | ||
| 147 | /// whose CQE will require IORING_ENTER_GETEVENTS to surface under | ||
| 148 | /// DEFER_TASKRUN. Excluded: the wakeup-eventfd multishot SQE, whose | ||
| 149 | /// progress doesn't depend on userspace getevents. | ||
| 150 | 12733x | void inflight_inc() const noexcept | |
| 151 | { | ||
| 152 | 12733x | io_uring_inflight_.fetch_add(1, std::memory_order_release); | |
| 153 | 12733x | } | |
| 154 | |||
| 155 | /// Initialize the io_uring ring on first access. Idempotent. | ||
| 156 | void lazy_init_ring() const; | ||
| 157 | |||
| 158 | /// Wake the leader if it's blocked in `submit_and_wait_timeout`. | ||
| 159 | /// No-op until the ring is initialised, and in single-threaded mode; | ||
| 160 | /// otherwise the wakeup eventfd is written without coalescing. | ||
| 161 | void interrupt_reactor() const noexcept; | ||
| 162 | |||
| 163 | /** Submit `IORING_OP_ASYNC_CANCEL` targeting an in-flight op by its | ||
| 164 | user_data pointer. | ||
| 165 | |||
| 166 | The kernel delivers `-ECANCELED` on the target's CQE if it was | ||
| 167 | still in flight; the op's completion handler then reports | ||
| 168 | `operation_aborted`. Best-effort: if the SQ is full after one | ||
| 169 | flush attempt the function returns without cancelling (the op | ||
| 170 | will complete normally on its own). | ||
| 171 | |||
| 172 | @param target The in-flight op to cancel. | ||
| 173 | */ | ||
| 174 | void submit_cancel_by_user_data(io_uring_op* target) noexcept; | ||
| 175 | |||
| 176 | /** Submit `IORING_OP_ASYNC_CANCEL` with `IORING_ASYNC_CANCEL_FD` | ||
| 177 | to cancel every in-flight op on the given fd in one SQE. | ||
| 178 | |||
| 179 | Best-effort: if the SQ is full after one flush attempt the | ||
| 180 | function returns without cancelling. | ||
| 181 | |||
| 182 | @param fd The file descriptor whose in-flight ops should be | ||
| 183 | cancelled. | ||
| 184 | */ | ||
| 185 | void submit_cancel_by_fd(int fd) noexcept; | ||
| 186 | |||
| 187 | /** Submit `IORING_OP_ASYNC_CANCEL` for `fd` and immediately flush | ||
| 188 | the submission ring to the kernel. | ||
| 189 | |||
| 190 | Must be called while `fd` is still open so the kernel can | ||
| 191 | resolve the file from the fd number before it is closed and | ||
| 192 | potentially recycled. | ||
| 193 | |||
| 194 | Best-effort: if the SQ is full the function still flushes any | ||
| 195 | earlier pending SQEs to the kernel. | ||
| 196 | |||
| 197 | @param fd The file descriptor whose in-flight ops should be | ||
| 198 | cancelled. | ||
| 199 | */ | ||
| 200 | void cancel_and_flush(int fd) noexcept; | ||
| 201 | |||
| 202 | /** Drain pending CQEs for a specific op's `user_data`. | ||
| 203 | |||
| 204 | Submits an ASYNC_CANCEL by user_data to short-circuit any | ||
| 205 | in-flight op holding `target`, then iterates the CQ ring and | ||
| 206 | consumes every CQE matching `target` so its memory can be | ||
| 207 | freed safely. Used by member-owned ops (e.g. | ||
| 208 | `uring_multi_accept_op`) whose destructor cannot tolerate | ||
| 209 | outstanding CQEs. | ||
| 210 | |||
| 211 | @par Thread Safety | ||
| 212 | Safe to call from any thread. Internally takes `ring_mutex_` | ||
| 213 | to serialise against the run-loop leader; calls | ||
| 214 | `interrupt_reactor()` first so the leader returns from its | ||
| 215 | kernel wait promptly. | ||
| 216 | |||
| 217 | @param target The op pointer used as user_data on the SQE. | ||
| 218 | */ | ||
| 219 | void drain_cqes_for(io_uring_op* target) noexcept; | ||
| 220 | |||
| 221 | /** Queue an already-counted op while the caller holds dispatch_mutex_. | ||
| 222 | |||
| 223 | Does NOT increment `outstanding_work_`. Use for synchronous | ||
| 224 | completion paths (e.g. SQE backpressure) where the caller called | ||
| 225 | `work_started()` and already holds the dispatch lock. | ||
| 226 | |||
| 227 | @pre `dispatch_mutex_` must be locked by the calling thread. | ||
| 228 | */ | ||
| 229 | 33735x | void push_completed_locked(scheduler_op* op) const noexcept | |
| 230 | { | ||
| 231 | 33735x | completed_ops_.push(op); | |
| 232 | 33735x | } | |
| 233 | |||
| 234 | /// Single-threaded mode toggle (matches reactor_scheduler API). | ||
| 235 | 1x | void configure_single_threaded(bool v) noexcept override | |
| 236 | { | ||
| 237 | 1x | single_threaded_ = v; | |
| 238 | 1x | dispatch_mutex_.set_enabled(!v); | |
| 239 | 1x | ring_mutex_.set_enabled(!v); | |
| 240 | 1x | cond_.set_enabled(!v); | |
| 241 | 1x | } | |
| 242 | |||
| 243 | /** Configure SQPOLL parameters. | ||
| 244 | |||
| 245 | Must be called before the first run/poll/post — the values | ||
| 246 | are cached and read by `lazy_init_ring_unlocked` when the | ||
| 247 | ring is first constructed. No-op if `enable` is false (the | ||
| 248 | default). | ||
| 249 | |||
| 250 | @note When combined with single-threaded mode, | ||
| 251 | IORING_SETUP_DEFER_TASKRUN is suppressed — the kernel | ||
| 252 | rejects that combination. SINGLE_ISSUER still applies. | ||
| 253 | |||
| 254 | @param enable Set IORING_SETUP_SQPOLL on ring init. | ||
| 255 | @param idle_ms sq_thread_idle in milliseconds; 0 = kernel | ||
| 256 | default (1ms). | ||
| 257 | @param cpu Pin the polling thread to this CPU; -1 to | ||
| 258 | not pin. | ||
| 259 | */ | ||
| 260 | ✗ | void configure_sqpoll( | |
| 261 | bool enable, unsigned idle_ms, int cpu) noexcept | ||
| 262 | { | ||
| 263 | ✗ | enable_sqpoll_ = enable; | |
| 264 | ✗ | sq_thread_idle_ms_ = idle_ms; | |
| 265 | ✗ | sq_thread_cpu_ = cpu; | |
| 266 | ✗ | } | |
| 267 | |||
| 268 | /// Return true if single-threaded (lockless) mode is active. | ||
| 269 | 1x | bool is_single_threaded() const noexcept override { return single_threaded_; } | |
| 270 | |||
| 271 | private: | ||
| 272 | // ring_ + wakeup_eventfd_ are mutable so lazy_init_ring() (called | ||
| 273 | // from const contexts like post()) can populate them on first use. | ||
| 274 | mutable struct ::io_uring ring_{}; | ||
| 275 | mutable int wakeup_eventfd_ = -1; | ||
| 276 | timer_service* timer_svc_ = nullptr; | ||
| 277 | |||
| 278 | // dispatch_mutex_ protects completed_ops_, cond_, task_running_. | ||
| 279 | // ring_mutex_ protects every userspace touch of ring_ (SQ tail, | ||
| 280 | // CQ head): get_sqe / submit / submit_and_wait_timeout / | ||
| 281 | // for_each_cqe / cq_advance. | ||
| 282 | // | ||
| 283 | // process_completions runs under ring_mutex_ and briefly takes | ||
| 284 | // dispatch_mutex_ to splice into completed_ops_. The locks are | ||
| 285 | // never held simultaneously for the full duration of any other | ||
| 286 | // path's critical section, so no deadlock. | ||
| 287 | mutable mutex_type dispatch_mutex_{true}; | ||
| 288 | mutable mutex_type ring_mutex_{true}; | ||
| 289 | mutable event_type cond_{true}; | ||
| 290 | mutable op_queue completed_ops_; | ||
| 291 | // outstanding_work_ and io_uring_inflight_ are both atomic | ||
| 292 | // counters updated at high frequency on different paths: | ||
| 293 | // - outstanding_work_ : every work_started / work_finished call, | ||
| 294 | // including timers, posts, and SQE submits. | ||
| 295 | // - io_uring_inflight_ : only SQE submit + non-F_MORE CQE consume. | ||
| 296 | // Under multi-thread workloads the threads tend to update these | ||
| 297 | // from different code paths; placing them on the same cache line | ||
| 298 | // would cause false sharing and unnecessary cache-line ping-pong. | ||
| 299 | // Hold each on its own line. | ||
| 300 | alignas(64) mutable std::atomic<std::int64_t> outstanding_work_{0}; | ||
| 301 | // Count of io_uring SQEs in flight whose completion requires user- | ||
| 302 | // space to enter the kernel via IORING_ENTER_GETEVENTS for task | ||
| 303 | // work to progress under IORING_SETUP_DEFER_TASKRUN. Excludes the | ||
| 304 | // wakeup-eventfd multishot poll (registered in lazy_init_ring), and | ||
| 305 | // is updated by io_uring_submit_op and by process_completions on | ||
| 306 | // each non-F_MORE, non-eventfd CQE. Used by do_one to skip the | ||
| 307 | // ring pump when there is no io_uring work pending. | ||
| 308 | alignas(64) mutable std::atomic<std::int64_t> io_uring_inflight_{0}; | ||
| 309 | std::atomic<bool> stopped_{false}; | ||
| 310 | // Leader-follower flag: true while a thread is blocked in | ||
| 311 | // io_uring_submit_and_wait_timeout. Protected by dispatch_mutex_. | ||
| 312 | mutable bool task_running_ = false; | ||
| 313 | bool single_threaded_ = false; | ||
| 314 | bool enable_sqpoll_ = false; | ||
| 315 | unsigned sq_thread_idle_ms_ = 0; | ||
| 316 | int sq_thread_cpu_ = -1; | ||
| 317 | |||
| 318 | int cancel_sentinel_ = 0; | ||
| 319 | mutable std::atomic<bool> wakeup_armed_{false}; | ||
| 320 | |||
| 321 | /// Flushes the SQ ring and drains CQEs in one mutex-held pass. | ||
| 322 | /// One instance covers a whole batch; subsequent SQEs in the same | ||
| 323 | /// batch skip the post, amortising syscall cost across the batch. | ||
| 324 | /// Mirrors Asio's `submit_sqes_op` (`io_uring_service.ipp:730-742`). | ||
| 325 | struct submit_sqes_op final : scheduler_op | ||
| 326 | { | ||
| 327 | io_uring_scheduler* sched_ = nullptr; | ||
| 328 | |||
| 329 | 389x | submit_sqes_op() noexcept : scheduler_op(&do_handler) {} | |
| 330 | |||
| 331 | static void do_handler( | ||
| 332 | void* owner, scheduler_op* base, | ||
| 333 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept; | ||
| 334 | }; | ||
| 335 | |||
| 336 | /// True between the first submitter of a batch posting `submit_op_` | ||
| 337 | /// and the dispatched op clearing the flag inside its handler. Read | ||
| 338 | /// and written only while holding `ring_mutex_`. | ||
| 339 | mutable bool submit_op_posted_ = false; | ||
| 340 | |||
| 341 | /// Single embedded `submit_sqes_op` instance, owned by the scheduler. | ||
| 342 | mutable submit_sqes_op submit_op_; | ||
| 343 | |||
| 344 | // drain_cqes_for tuning. The bound exists to avoid stalling a | ||
| 345 | // destructor if the kernel never returns a cancel completion (best- | ||
| 346 | // effort drain); 8 rounds * 1ms == 8ms worst case. | ||
| 347 | static constexpr int drain_cqes_max_rounds = 8; | ||
| 348 | static constexpr unsigned long drain_cqes_kick_ns = 1'000'000; | ||
| 349 | |||
| 350 | // ring_inited_ goes true once on first run/poll/submit. The init is | ||
| 351 | // deferred from the constructor so configure_single_threaded(true) | ||
| 352 | // can take effect before io_uring_queue_init_params chooses flags. | ||
| 353 | mutable std::once_flag ring_init_once_; | ||
| 354 | mutable bool ring_inited_ = false; | ||
| 355 | |||
| 356 | std::size_t do_one(long timeout_us); | ||
| 357 | void process_completions(); | ||
| 358 | void drain_wakeup_eventfd() const noexcept; | ||
| 359 | void lazy_init_ring_unlocked() const; | ||
| 360 | }; | ||
| 361 | |||
| 362 | inline | ||
| 363 | 389x | io_uring_scheduler::io_uring_scheduler( | |
| 364 | 389x | capy::execution_context& ctx, int /*concurrency_hint*/) | |
| 365 | { | ||
| 366 | // sched_ cannot be set in the member initialiser — `this` is not | ||
| 367 | // available there. | ||
| 368 | 389x | submit_op_.sched_ = this; | |
| 369 | |||
| 370 | // Wire timer service. on_earliest_changed wakes the run loop so it | ||
| 371 | // recomputes its wait timeout. | ||
| 372 | 389x | timer_svc_ = &get_timer_service(ctx, *this); | |
| 373 | 389x | timer_svc_->set_on_earliest_changed( | |
| 374 | 4586x | timer_service::callback(this, [](void* p) { | |
| 375 | 4197x | static_cast<io_uring_scheduler*>(p)->interrupt_reactor(); | |
| 376 | 4197x | })); | |
| 377 | |||
| 378 | 389x | get_resolver_service(ctx, *this); | |
| 379 | 389x | get_signal_service(ctx, *this); | |
| 380 | |||
| 381 | // Ring init is deferred to lazy_init_ring() so configure_single_- | ||
| 382 | // threaded(true), which the io_context applies after construction, | ||
| 383 | // can take effect before io_uring_queue_init_params chooses flags. | ||
| 384 | 389x | } | |
| 385 | |||
| 386 | inline | ||
| 387 | 778x | io_uring_scheduler::~io_uring_scheduler() | |
| 388 | { | ||
| 389 | 389x | if (ring_inited_) | |
| 390 | { | ||
| 391 | 281x | if (wakeup_eventfd_ >= 0) | |
| 392 | 281x | ::close(wakeup_eventfd_); | |
| 393 | 281x | ::io_uring_queue_exit(&ring_); | |
| 394 | } | ||
| 395 | 778x | } | |
| 396 | |||
| 397 | inline void | ||
| 398 | 30250x | io_uring_scheduler::lazy_init_ring() const | |
| 399 | { | ||
| 400 | 30250x | std::call_once(ring_init_once_, [this] { | |
| 401 | 281x | lazy_init_ring_unlocked(); | |
| 402 | 281x | }); | |
| 403 | 30250x | } | |
| 404 | |||
| 405 | inline void | ||
| 406 | 281x | io_uring_scheduler::lazy_init_ring_unlocked() const | |
| 407 | { | ||
| 408 | 281x | io_uring_params params{}; | |
| 409 | 281x | if (single_threaded_) | |
| 410 | { | ||
| 411 | // SINGLE_ISSUER promises the kernel one submitter thread, | ||
| 412 | // letting it skip internal SQ locking. DEFER_TASKRUN tells | ||
| 413 | // it to batch task_work delivery at io_uring_enter(GETEVENTS) | ||
| 414 | // boundaries instead of interrupting the run thread via | ||
| 415 | // TWA_SIGNAL — eliminates cache pollution from mid-flight | ||
| 416 | // task_work and gives a meaningful single-threaded | ||
| 417 | // throughput uplift. | ||
| 418 | // | ||
| 419 | // Plan 3 disabled DEFER_TASKRUN defensively over a misread | ||
| 420 | // of the GETEVENTS contract. Plan 4a re-enabled it: liburing's | ||
| 421 | // io_uring_submit_and_wait_timeout always sets | ||
| 422 | // IORING_ENTER_GETEVENTS when wait_nr > 0, regardless of | ||
| 423 | // ts. Our run loop's only kernel-wait call passes wait_nr=1. | ||
| 424 | // Submit-only paths (cancel_and_flush, etc.) leave their | ||
| 425 | // CQEs queued until the leader's next GETEVENTS-bearing | ||
| 426 | // wait — benign. | ||
| 427 | // | ||
| 428 | // Multi-thread mode never sets these flags: SINGLE_ISSUER | ||
| 429 | // would be unsafe with multiple submitter threads. | ||
| 430 | // | ||
| 431 | // DEFER_TASKRUN is suppressed when SQPOLL is also enabled | ||
| 432 | // — the kernel rejects that combination with -EINVAL. The | ||
| 433 | // SQPOLL polling thread already delivers completions | ||
| 434 | // without TWA_SIGNAL interruption, so DEFER_TASKRUN's | ||
| 435 | // benefit is moot in that mode. | ||
| 436 | ✗ | params.flags = IORING_SETUP_SINGLE_ISSUER; | |
| 437 | ✗ | if (!enable_sqpoll_) | |
| 438 | ✗ | params.flags |= IORING_SETUP_DEFER_TASKRUN; | |
| 439 | } | ||
| 440 | |||
| 441 | 281x | if (enable_sqpoll_) | |
| 442 | { | ||
| 443 | // SQPOLL forks a kernel thread that busy-polls the SQ ring; | ||
| 444 | // submission becomes a userspace-only memory store. Combines | ||
| 445 | // with SINGLE_ISSUER (the kernel accepts that pair) but NOT | ||
| 446 | // with DEFER_TASKRUN (kernel returns -EINVAL); the | ||
| 447 | // single_threaded_ branch above suppresses DEFER_TASKRUN | ||
| 448 | // when SQPOLL is also set. Idle timeout 0 means kernel | ||
| 449 | // default (1ms); we only forward when explicitly set so | ||
| 450 | // the kernel default is preserved. | ||
| 451 | ✗ | params.flags |= IORING_SETUP_SQPOLL; | |
| 452 | ✗ | if (sq_thread_idle_ms_ != 0) | |
| 453 | ✗ | params.sq_thread_idle = sq_thread_idle_ms_; | |
| 454 | ✗ | if (sq_thread_cpu_ >= 0) | |
| 455 | { | ||
| 456 | ✗ | params.flags |= IORING_SETUP_SQ_AFF; | |
| 457 | ✗ | params.sq_thread_cpu = static_cast<__u32>(sq_thread_cpu_); | |
| 458 | } | ||
| 459 | } | ||
| 460 | |||
| 461 | 281x | int rc = ::io_uring_queue_init_params(256, &ring_, ¶ms); | |
| 462 | 281x | if (rc < 0) | |
| 463 | ✗ | detail::throw_system_error( | |
| 464 | ✗ | make_err(-rc), "io_uring_queue_init_params"); | |
| 465 | |||
| 466 | 281x | wakeup_eventfd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); | |
| 467 | 281x | if (wakeup_eventfd_ < 0) | |
| 468 | { | ||
| 469 | ✗ | int errn = errno; | |
| 470 | ✗ | ::io_uring_queue_exit(&ring_); | |
| 471 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); | |
| 472 | } | ||
| 473 | |||
| 474 | // Register a multishot poll on the wake eventfd. user_data nullptr | ||
| 475 | // is the sentinel recognized by process_completions, which calls | ||
| 476 | // drain_wakeup_eventfd() to consume the eventfd counter. | ||
| 477 | // | ||
| 478 | // NOTE(review): this comment previously described a one-shot poll | ||
| 479 | // re-armed on every fire, chosen because multishot ops can silently | ||
| 480 | // terminate (e.g. under CQ pressure) and leave the leader stuck in | ||
| 481 | // kernel wait. The code below arms a multishot poll, so confirm | ||
| 482 | // that a terminated multishot (CQE without IORING_CQE_F_MORE) is | ||
| 483 | // detected and re-armed where the wakeup CQE is consumed. | ||
| 484 | 281x | ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_); | |
| 485 | 281x | if (!sqe) | |
| 486 | { | ||
| 487 | ✗ | ::close(wakeup_eventfd_); | |
| 488 | ✗ | ::io_uring_queue_exit(&ring_); | |
| 489 | ✗ | detail::throw_system_error( | |
| 490 | ✗ | make_err(ENOSPC), "io_uring_get_sqe (wakeup)"); | |
| 491 | } | ||
| 492 | // Multishot poll: fires a CQE on each eventfd POLLIN without | ||
| 493 | // consuming the SQE. Avoids the re-arm hazard of one-shot poll | ||
| 494 | // (where drain_wakeup_eventfd's get_sqe could return null on a | ||
| 495 | // full SQ, leaving no SQE to detect future wakes). | ||
| 496 | 281x | ::io_uring_prep_poll_multishot(sqe, wakeup_eventfd_, POLLIN); | |
| 497 | 281x | ::io_uring_sqe_set_data(sqe, nullptr); | |
| 498 | 281x | int submit_rc = ::io_uring_submit(&ring_); | |
| 499 | 281x | if (submit_rc < 0) | |
| 500 | { | ||
| 501 | ✗ | ::close(wakeup_eventfd_); | |
| 502 | ✗ | ::io_uring_queue_exit(&ring_); | |
| 503 | ✗ | detail::throw_system_error( | |
| 504 | ✗ | make_err(-submit_rc), "io_uring_submit (wakeup)"); | |
| 505 | } | ||
| 506 | |||
| 507 | 281x | ring_inited_ = true; | |
| 508 | 281x | } | |
| 509 | |||
| 510 | inline void | ||
| 511 | 389x | io_uring_scheduler::shutdown() | |
| 512 | { | ||
| 513 | 389x | stopped_.store(true, std::memory_order_release); | |
| 514 | |||
| 515 | // Drain posted ops, calling destroy() on each so embedded handles | ||
| 516 | // (coroutine frames, error_code outputs) get torn down rather | ||
| 517 | // than leaked. Mirrors reactor_scheduler::shutdown_drain. | ||
| 518 | // | ||
| 519 | // Service shutdown order (driven by capy::execution_context): | ||
| 520 | // each socket/acceptor service::shutdown() submits a cancel SQE | ||
| 521 | // for every live impl. The CQEs that result either land in | ||
| 522 | // completed_ops_ (drained here as op->destroy()) or stay in the | ||
| 523 | // kernel ring; ~scheduler's io_uring_queue_exit cleans the | ||
| 524 | // latter up at process teardown. Self-referential impl_ptr | ||
| 525 | // cycles (e.g. multishot acceptor's multi_op_->impl_ptr) are | ||
| 526 | // broken explicitly inside each service before the scheduler | ||
| 527 | // shutdown runs. | ||
| 528 | 389x | lock_type lock(dispatch_mutex_); | |
| 529 | 406x | while (auto* op = completed_ops_.pop()) | |
| 530 | { | ||
| 531 | 17x | lock.unlock(); | |
| 532 | 17x | op->destroy(); | |
| 533 | 17x | lock.lock(); | |
| 534 | 17x | } | |
| 535 | 389x | cond_.notify_all(); | |
| 536 | 389x | } | |
| 537 | |||
| 538 | inline void | ||
| 539 | 234x | io_uring_scheduler::stop() | |
| 540 | { | ||
| 541 | 234x | stopped_.store(true, std::memory_order_release); | |
| 542 | { | ||
| 543 | 234x | lock_type lock(dispatch_mutex_); | |
| 544 | 234x | cond_.notify_all(); | |
| 545 | 234x | } | |
| 546 | // Force-wake by writing the eventfd directly: interrupt_reactor() | ||
| 547 | // returns early in single-threaded mode. A dropped wake here leaves | ||
| 548 | // the leader blocked forever in submit_and_wait_timeout (no further | ||
| 549 | // CQE will arrive after stop()). With multishot poll on | ||
| 550 | // wakeup_eventfd_, this write reliably produces a CQE. | ||
| 551 | 234x | if (ring_inited_) | |
| 552 | { | ||
| 553 | 233x | std::uint64_t v = 1; | |
| 554 | [[maybe_unused]] auto r = | ||
| 555 | 233x | ::write(wakeup_eventfd_, &v, sizeof(v)); | |
| 556 | } | ||
| 557 | 234x | } | |
| 558 | |||
| 559 | inline bool | ||
| 560 | 12x | io_uring_scheduler::stopped() const noexcept | |
| 561 | { | ||
| 562 | 12x | return stopped_.load(std::memory_order_acquire); | |
| 563 | } | ||
| 564 | |||
| 565 | inline void | ||
| 566 | 58x | io_uring_scheduler::restart() | |
| 567 | { | ||
| 568 | 58x | stopped_.store(false, std::memory_order_release); | |
| 569 | 58x | } | |
| 570 | |||
| 571 | inline void | ||
| 572 | 46714x | io_uring_scheduler::work_started() noexcept | |
| 573 | { | ||
| 574 | 46714x | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 575 | 46714x | } | |
| 576 | |||
| 577 | inline void | ||
| 578 | 59683x | io_uring_scheduler::work_finished() noexcept | |
| 579 | { | ||
| 580 | 119366x | if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1) | |
| 581 | 226x | stop(); | |
| 582 | 59683x | } | |
| 583 | |||
| 584 | inline void | ||
| 585 | 16817x | io_uring_scheduler::interrupt_reactor() const noexcept | |
| 586 | { | ||
| 587 | // Skip if the ring hasn't been initialised yet — there's no leader | ||
| 588 | // to wake and no eventfd to write. | ||
| 589 | 16817x | if (!ring_inited_) | |
| 590 | ✗ | return; | |
| 591 | |||
| 592 | // Single-thread: the user's coroutines run on the leader thread, | ||
| 593 | // so when interrupt_reactor is called from user code the leader | ||
| 594 | // is not in kernel wait — there is nothing to wake. | ||
| 595 | 16817x | if (single_threaded_) | |
| 596 | ✗ | return; | |
| 597 | |||
| 598 | // Multi-thread: write the eventfd unconditionally. CAS-coalescing | ||
| 599 | // is unsafe here because the leader's Phase 2 in do_one waits | ||
| 600 | // indefinitely for a CQE; a dropped wake leaves the leader | ||
| 601 | // blocked forever when there is no other CQE-producing activity. | ||
| 602 | // Multishot poll on wakeup_eventfd_ delivers a CQE for every | ||
| 603 | // write, so multiple writes in flight produce multiple CQEs | ||
| 604 | // (drained together by drain_wakeup_eventfd's single read of | ||
| 605 | // the eventfd counter). | ||
| 606 | 16817x | std::uint64_t v = 1; | |
| 607 | 16817x | [[maybe_unused]] auto r = ::write(wakeup_eventfd_, &v, sizeof(v)); | |
| 608 | 16817x | wakeup_armed_.store(true, std::memory_order_release); | |
| 609 | } | ||
| 610 | |||
| 611 | inline void | ||
| 612 | 16351x | io_uring_scheduler::drain_wakeup_eventfd() const noexcept | |
| 613 | { | ||
| 614 | std::uint64_t v; | ||
| 615 | 16351x | [[maybe_unused]] auto r = ::read(wakeup_eventfd_, &v, sizeof(v)); | |
| 616 | |||
| 617 | // Multishot poll never needs re-arming. The poll-add was queued | ||
| 618 | // once at lazy_init_ring with IORING_POLL_ADD_MULTI; each eventfd | ||
| 619 | // POLLIN produces a CQE without consuming the SQE. | ||
| 620 | // | ||
| 621 | // interrupt_reactor() does not CAS on wakeup_armed_: it writes the | ||
| 622 | // eventfd unconditionally and then stores true. This release store | ||
| 623 | // clears the flag once the leader has read (drained) the eventfd. | ||
| 624 | 16351x | wakeup_armed_.store(false, std::memory_order_release); | |
| 625 | 16351x | } | |
| 626 | |||
| 627 | inline void | ||
| 628 | 378x | io_uring_scheduler::post(std::coroutine_handle<> h) const | |
| 629 | { | ||
| 630 | struct post_handler final : scheduler_op | ||
| 631 | { | ||
| 632 | std::coroutine_handle<> h_; | ||
| 633 | 378x | explicit post_handler(std::coroutine_handle<> h) noexcept : h_(h) {} | |
| 634 | |||
| 635 | 375x | void operator()() override | |
| 636 | { | ||
| 637 | 375x | auto saved = h_; | |
| 638 | 375x | delete this; | |
| 639 | std::atomic_thread_fence(std::memory_order_acquire); | ||
| 640 | 375x | saved.resume(); | |
| 641 | 375x | } | |
| 642 | |||
| 643 | 3x | void destroy() override | |
| 644 | { | ||
| 645 | 3x | auto saved = h_; | |
| 646 | 3x | delete this; | |
| 647 | 3x | if (saved) | |
| 648 | 3x | saved.destroy(); | |
| 649 | 3x | } | |
| 650 | }; | ||
| 651 | |||
| 652 | 378x | auto* op = new post_handler(h); | |
| 653 | 378x | lazy_init_ring(); | |
| 654 | 378x | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 655 | bool wake_leader; | ||
| 656 | { | ||
| 657 | 378x | lock_type lock(dispatch_mutex_); | |
| 658 | 378x | completed_ops_.push(op); | |
| 659 | 378x | wake_leader = task_running_; | |
| 660 | 378x | if (!wake_leader) | |
| 661 | 378x | cond_.notify_one(); | |
| 662 | 378x | } | |
| 663 | 378x | if (wake_leader) | |
| 664 | ✗ | interrupt_reactor(); | |
| 665 | 378x | } | |
| 666 | |||
| 667 | inline void | ||
| 668 | 12608x | io_uring_scheduler::post(scheduler_op* op) const | |
| 669 | { | ||
| 670 | 12608x | lazy_init_ring(); | |
| 671 | 12608x | outstanding_work_.fetch_add(1, std::memory_order_relaxed); | |
| 672 | bool wake_leader; | ||
| 673 | { | ||
| 674 | 12608x | lock_type lock(dispatch_mutex_); | |
| 675 | 12608x | completed_ops_.push(op); | |
| 676 | 12608x | wake_leader = task_running_; | |
| 677 | 12608x | if (!wake_leader) | |
| 678 | 8428x | cond_.notify_one(); | |
| 679 | 12608x | } | |
| 680 | 12608x | if (wake_leader) | |
| 681 | 4180x | interrupt_reactor(); | |
| 682 | 12608x | } | |
| 683 | |||
| 684 | // Thread-local stack of frames for io_uring schedulers being run on the | ||
| 685 | // current thread. Holds the running-scheduler pointer (for | ||
| 686 | // running_in_this_thread reporting) and the inline completion budget | ||
| 687 | // used by the speculative non-blocking I/O path (plan 5j). Nesting | ||
| 688 | // stacks frames via prev_ so each scheduler gets its own budget. | ||
| 689 | struct io_uring_scheduler_frame | ||
| 690 | { | ||
| 691 | io_uring_scheduler const* sched; | ||
| 692 | io_uring_scheduler_frame* prev; | ||
| 693 | int inline_budget; | ||
| 694 | int inline_budget_max; | ||
| 695 | }; | ||
| 696 | |||
| 697 | inline thread_local io_uring_scheduler_frame* tl_running_scheduler_frame_ = nullptr; | ||
| 698 | |||
| 699 | // Inline budgets. A new frame starts at the initial value (2, matching | ||
| 700 | // reactor's initial budget); reset_inline_budget() refills to the max (16). | ||
| 701 | // Adaptive ramp-up is intentionally NOT implemented yet — kept simple for plan 5j; revisit if benches show fairness issues. | ||
| 702 | inline constexpr int io_uring_inline_budget_initial = 2; | ||
| 703 | inline constexpr int io_uring_inline_budget_max = 16; | ||
| 704 | |||
| 705 | /// RAII guard: pushes a frame onto the thread's running-scheduler stack | ||
| 706 | /// on construction, restores the previous on destruction. Used by | ||
| 707 | /// run/run_one/wait_one/poll/poll_one to mark the running thread and | ||
| 708 | /// hold a fresh inline budget for speculative completions. | ||
| 709 | struct io_uring_run_guard | ||
| 710 | { | ||
| 711 | io_uring_scheduler_frame frame_; | ||
| 712 | |||
| 713 | 233x | explicit io_uring_run_guard(io_uring_scheduler const* self) noexcept | |
| 714 | 233x | : frame_{self, tl_running_scheduler_frame_, | |
| 715 | io_uring_inline_budget_initial, | ||
| 716 | io_uring_inline_budget_max} | ||
| 717 | { | ||
| 718 | 233x | tl_running_scheduler_frame_ = &frame_; | |
| 719 | 233x | } | |
| 720 | |||
| 721 | 233x | ~io_uring_run_guard() noexcept | |
| 722 | { | ||
| 723 | 233x | tl_running_scheduler_frame_ = frame_.prev; | |
| 724 | 233x | } | |
| 725 | }; | ||
| 726 | |||
| 727 | inline bool | ||
| 728 | 584x | io_uring_scheduler::running_in_this_thread() const noexcept | |
| 729 | { | ||
| 730 | 584x | for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev) | |
| 731 | { | ||
| 732 | 209x | if (f->sched == this) | |
| 733 | 209x | return true; | |
| 734 | } | ||
| 735 | 375x | return false; | |
| 736 | } | ||
| 737 | |||
| 738 | inline void | ||
| 739 | 50912x | io_uring_scheduler::reset_inline_budget() const noexcept | |
| 740 | { | ||
| 741 | 50912x | for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev) | |
| 742 | { | ||
| 743 | 50912x | if (f->sched == this) | |
| 744 | { | ||
| 745 | 50912x | f->inline_budget = f->inline_budget_max; | |
| 746 | 50912x | return; | |
| 747 | } | ||
| 748 | } | ||
| 749 | } | ||
| 750 | |||
| 751 | inline bool | ||
| 752 | 573746x | io_uring_scheduler::try_consume_inline_budget() const noexcept | |
| 753 | { | ||
| 754 | 573746x | for (auto* f = tl_running_scheduler_frame_; f != nullptr; f = f->prev) | |
| 755 | { | ||
| 756 | 573746x | if (f->sched == this) | |
| 757 | { | ||
| 758 | 573746x | if (f->inline_budget > 0) | |
| 759 | { | ||
| 760 | 540011x | --f->inline_budget; | |
| 761 | 540011x | return true; | |
| 762 | } | ||
| 763 | 33735x | return false; | |
| 764 | } | ||
| 765 | } | ||
| 766 | ✗ | return false; | |
| 767 | } | ||
| 768 | |||
| 769 | inline std::size_t | ||
| 770 | 221x | io_uring_scheduler::run() | |
| 771 | { | ||
| 772 | 221x | lazy_init_ring(); | |
| 773 | 442x | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 774 | { | ||
| 775 | ✗ | stop(); | |
| 776 | ✗ | return 0; | |
| 777 | } | ||
| 778 | |||
| 779 | 221x | io_uring_run_guard guard(this); | |
| 780 | 221x | std::size_t n = 0; | |
| 781 | for (;;) | ||
| 782 | { | ||
| 783 | 51113x | std::size_t r = do_one(-1); | |
| 784 | 51113x | if (r) | |
| 785 | { | ||
| 786 | 50892x | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 787 | 50892x | ++n; | |
| 788 | 50892x | continue; | |
| 789 | } | ||
| 790 | 444x | if (outstanding_work_.load(std::memory_order_acquire) == 0 || | |
| 791 | 2x | stopped_.load(std::memory_order_acquire)) | |
| 792 | 221x | break; | |
| 793 | // do_one returned 0 but work still outstanding (e.g. timer | ||
| 794 | // expiry dispatched async work). Continue. | ||
| 795 | 50892x | } | |
| 796 | 221x | return n; | |
| 797 | 221x | } | |
| 798 | |||
| 799 | inline std::size_t | ||
| 800 | ✗ | io_uring_scheduler::run_one() | |
| 801 | { | ||
| 802 | ✗ | lazy_init_ring(); | |
| 803 | ✗ | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 804 | { | ||
| 805 | ✗ | stop(); | |
| 806 | ✗ | return 0; | |
| 807 | } | ||
| 808 | ✗ | io_uring_run_guard guard(this); | |
| 809 | ✗ | return do_one(-1); | |
| 810 | ✗ | } | |
| 811 | |||
| 812 | inline std::size_t | ||
| 813 | 5x | io_uring_scheduler::wait_one(long usec) | |
| 814 | { | ||
| 815 | 5x | lazy_init_ring(); | |
| 816 | 10x | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 817 | { | ||
| 818 | 3x | stop(); | |
| 819 | 3x | return 0; | |
| 820 | } | ||
| 821 | 2x | io_uring_run_guard guard(this); | |
| 822 | 2x | return do_one(usec); | |
| 823 | 2x | } | |
| 824 | |||
| 825 | inline std::size_t | ||
| 826 | 12x | io_uring_scheduler::poll() | |
| 827 | { | ||
| 828 | 12x | lazy_init_ring(); | |
| 829 | 24x | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 830 | { | ||
| 831 | 2x | stop(); | |
| 832 | 2x | return 0; | |
| 833 | } | ||
| 834 | 10x | io_uring_run_guard guard(this); | |
| 835 | 10x | std::size_t n = 0; | |
| 836 | 29x | while (do_one(0)) | |
| 837 | { | ||
| 838 | 19x | if (n != (std::numeric_limits<std::size_t>::max)()) | |
| 839 | 19x | ++n; | |
| 840 | } | ||
| 841 | 10x | return n; | |
| 842 | 10x | } | |
| 843 | |||
| 844 | inline std::size_t | ||
| 845 | ✗ | io_uring_scheduler::poll_one() | |
| 846 | { | ||
| 847 | ✗ | lazy_init_ring(); | |
| 848 | ✗ | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 849 | { | ||
| 850 | ✗ | stop(); | |
| 851 | ✗ | return 0; | |
| 852 | } | ||
| 853 | ✗ | io_uring_run_guard guard(this); | |
| 854 | ✗ | return do_one(0); | |
| 855 | ✗ | } | |
| 856 | |||
| 857 | inline std::size_t | ||
| 858 | 51144x | io_uring_scheduler::do_one(long timeout_us) | |
| 859 | { | ||
| 860 | // Leader-follower: only one thread at a time (the leader) blocks in | ||
| 861 | // io_uring_wait_cqe_timeout on the shared ring (liburing's | ||
| 862 | // userspace head/tail bookkeeping is not thread-safe). Other | ||
| 863 | // threads either dispatch ready ops from completed_ops_ or wait | ||
| 864 | // on cond_ until the leader returns from the kernel. | ||
| 865 | 51144x | if (stopped_.load(std::memory_order_acquire)) | |
| 866 | 224x | return 0; | |
| 867 | |||
| 868 | // submit_sqes_op only pumps the ring once per SQE batch. If the user | ||
| 869 | // keeps a non-empty completed_ops_ (e.g. timer with 0ns expiry as a | ||
| 870 | // yield primitive), the leader-phase kernel pass below never runs | ||
| 871 | // and CQEs accumulate in the ring forever — sub_request's read CQE | ||
| 872 | // never gets drained and the bench spins. submit_and_get_events | ||
| 873 | // (not plain submit) is required because IORING_SETUP_DEFER_TASKRUN | ||
| 874 | // gates task work on IORING_ENTER_GETEVENTS. | ||
| 875 | // | ||
| 876 | // Gate the kernel pump on there being io_uring-specific work. The | ||
| 877 | // check is performed under ring_mutex_ so a concurrent cross-thread | ||
| 878 | // submitter cannot prep an SQE that we then race past — both this | ||
| 879 | // path and io_uring_submit_op acquire ring_mutex_ before touching | ||
| 880 | // the ring. When all three sources are empty (no io_uring ops in | ||
| 881 | // flight needing DEFER_TASKRUN GETEVENTS, no userspace-pending | ||
| 882 | // SQEs, no kernel-ready CQEs) a kernel entry would have no work — | ||
| 883 | // saves ~8 pp of cycles on the no-I/O microbenchmark | ||
| 884 | // (io_context:single_threaded). We deliberately do NOT include | ||
| 885 | // outstanding_work_ here, because that counter mixes coroutine | ||
| 886 | // posts (in completed_ops_) with io_uring work — IOCTX has many | ||
| 887 | // coroutine posts and no io_uring work, and the kernel pump there | ||
| 888 | // is pure overhead. | ||
| 889 | 50920x | if (ring_inited_) | |
| 890 | { | ||
| 891 | 50920x | lock_type ring_lock(ring_mutex_); | |
| 892 | 50920x | if (io_uring_inflight_.load(std::memory_order_acquire) != 0 | |
| 893 | 446x | || ::io_uring_sq_ready(&ring_) != 0 | |
| 894 | 51366x | || ::io_uring_cq_ready(&ring_) != 0) | |
| 895 | { | ||
| 896 | 50623x | ::io_uring_submit_and_get_events(&ring_); | |
| 897 | 50623x | process_completions(); | |
| 898 | } | ||
| 899 | 50920x | } | |
| 900 | |||
| 901 | // Drain expired timers eagerly, for the same reason the kernel CQE | ||
| 902 | // pump above runs before dispatch: when completed_ops_ stays non- | ||
| 903 | // empty (e.g. continuous loopback I/O whose CQEs land in the top- | ||
| 904 | // of-do_one process_completions call), the leader-wait branch | ||
| 905 | // below — the only other place process_expired() runs — is never | ||
| 906 | // reached. Without this, stopper-timer-based shutdowns (and any | ||
| 907 | // other timer dependent on a busy I/O loop yielding) deadlock. | ||
| 908 | // | ||
| 909 | // empty() is a single relaxed-acquire atomic load on | ||
| 910 | // timer_service::cached_nearest_ns_ (lock-free, no clock_gettime). | ||
| 911 | // Skipping process_expired() when no timer is registered avoids the | ||
| 912 | // mutex + clock_gettime hot-path cost that dominates IOCTX cycles | ||
| 913 | // (~25 pp on io_context:single_threaded). When a timer IS | ||
| 914 | // registered the call runs exactly as before, preserving the | ||
| 915 | // deadlock fix this guard was originally written to address. | ||
| 916 | 50920x | if (!timer_svc_->empty()) | |
| 917 | 50121x | timer_svc_->process_expired(); | |
| 918 | |||
| 919 | 50920x | lock_type lock(dispatch_mutex_); | |
| 920 | for (;;) | ||
| 921 | { | ||
| 922 | 55099x | if (stopped_.load(std::memory_order_acquire)) | |
| 923 | ✗ | return 0; | |
| 924 | |||
| 925 | 55099x | if (auto* op = completed_ops_.pop()) | |
| 926 | { | ||
| 927 | // Hand off any remaining queued work to a follower so we | ||
| 928 | // dispatch in parallel. | ||
| 929 | 50912x | if (!completed_ops_.empty()) | |
| 930 | 28493x | cond_.notify_one(); | |
| 931 | 50912x | lock.unlock(); | |
| 932 | // Speculative follow-ups in the handler share this budget. | ||
| 933 | 50912x | reset_inline_budget(); | |
| 934 | 50912x | (*op)(); | |
| 935 | 50912x | work_finished(); | |
| 936 | 50912x | return 1; | |
| 937 | } | ||
| 938 | |||
| 939 | 8374x | if (outstanding_work_.load(std::memory_order_acquire) == 0) | |
| 940 | ✗ | return 0; | |
| 941 | |||
| 942 | 4187x | if (task_running_) | |
| 943 | { | ||
| 944 | // Another thread holds leadership; either return (poll) | ||
| 945 | // or wait for it to deliver work / release leadership. | ||
| 946 | ✗ | if (timeout_us == 0) | |
| 947 | ✗ | return 0; | |
| 948 | ✗ | if (timeout_us < 0) | |
| 949 | ✗ | cond_.wait(lock); | |
| 950 | else | ||
| 951 | { | ||
| 952 | ✗ | cond_.wait_for( | |
| 953 | ✗ | lock, std::chrono::microseconds(timeout_us)); | |
| 954 | // wait_one honoured its timeout; if nothing arrived, | ||
| 955 | // return rather than re-arm. | ||
| 956 | ✗ | if (completed_ops_.empty() && | |
| 957 | ✗ | !stopped_.load(std::memory_order_acquire)) | |
| 958 | ✗ | return 0; | |
| 959 | } | ||
| 960 | ✗ | continue; | |
| 961 | } | ||
| 962 | |||
| 963 | // Become the leader: run the kernel poll. We drop the lock | ||
| 964 | // for the blocking wait, then take it back to release | ||
| 965 | // leadership and wake any follower that should pick up new | ||
| 966 | // work. | ||
| 967 | 4187x | __kernel_timespec ts{}; | |
| 968 | 4187x | __kernel_timespec* ts_ptr = nullptr; | |
| 969 | 4187x | auto next_expiry = timer_svc_->nearest_expiry(); | |
| 970 | 4187x | auto now = std::chrono::steady_clock::now(); | |
| 971 | |||
| 972 | 4187x | if (timeout_us == 0) | |
| 973 | { | ||
| 974 | 7x | ts.tv_sec = 0; | |
| 975 | 7x | ts.tv_nsec = 0; | |
| 976 | 7x | ts_ptr = &ts; | |
| 977 | } | ||
| 978 | 4180x | else if (next_expiry != timer_service::time_point::max()) | |
| 979 | { | ||
| 980 | auto delta_ns = | ||
| 981 | 4176x | std::chrono::duration_cast<std::chrono::nanoseconds>( | |
| 982 | 4176x | next_expiry - now) | |
| 983 | 4176x | .count(); | |
| 984 | 4176x | if (delta_ns < 0) delta_ns = 0; | |
| 985 | 4176x | ts.tv_sec = delta_ns / 1'000'000'000; | |
| 986 | 4176x | ts.tv_nsec = delta_ns % 1'000'000'000; | |
| 987 | 4176x | ts_ptr = &ts; | |
| 988 | } | ||
| 989 | 4x | else if (timeout_us > 0) | |
| 990 | { | ||
| 991 | 1x | ts.tv_sec = timeout_us / 1'000'000; | |
| 992 | 1x | ts.tv_nsec = (timeout_us % 1'000'000) * 1000; | |
| 993 | 1x | ts_ptr = &ts; | |
| 994 | } | ||
| 995 | else | ||
| 996 | { | ||
| 997 | // run() with no pending timers: cap the kernel wait at 1s | ||
| 998 | // so the leader periodically re-checks state. Defense in | ||
| 999 | // depth against a lost wakeup (e.g. multishot poll on the | ||
| 1000 | // wakeup eventfd terminates and the re-arm SQE doesn't | ||
| 1001 | // reach the kernel in time). Worst case: one extra | ||
| 1002 | // wake-up per io_context per second when truly idle. | ||
| 1003 | 3x | ts.tv_sec = 1; | |
| 1004 | 3x | ts.tv_nsec = 0; | |
| 1005 | 3x | ts_ptr = &ts; | |
| 1006 | } | ||
| 1007 | |||
| 1008 | 4187x | task_running_ = true; | |
| 1009 | 4187x | lock.unlock(); | |
| 1010 | |||
| 1011 | // Three-phase kernel wait, matching Boost.Asio's | ||
| 1012 | // io_uring_service::run pattern. ring_mutex_ is held briefly | ||
| 1013 | // to push pending SQEs and to drain CQEs, but NOT during | ||
| 1014 | // the blocking io_uring_wait_cqe_timeout. Cross-thread | ||
| 1015 | // submitters (io_uring_submit_op, cancel paths) can take | ||
| 1016 | // ring_mutex_ during the wait and prep new SQEs without | ||
| 1017 | // blocking on the leader; their wake eventfd write fires the | ||
| 1018 | // multishot poll and returns the leader from wait_cqe_timeout | ||
| 1019 | // promptly. | ||
| 1020 | // | ||
| 1021 | // Phase 1 — submit any pending SQEs to the kernel. | ||
| 1022 | { | ||
| 1023 | 4187x | lock_type ring_lock(ring_mutex_); | |
| 1024 | 4187x | ::io_uring_submit(&ring_); | |
| 1025 | 4187x | } | |
| 1026 | |||
| 1027 | // Phase 2 — wait for at least one CQE without holding the | ||
| 1028 | // mutex. Multi-thread `io_uring_enter` is permitted without | ||
| 1029 | // SINGLE_ISSUER. wait_cqe_timeout only peeks the CQ ring; | ||
| 1030 | // head advancement happens under the mutex in | ||
| 1031 | // process_completions below. | ||
| 1032 | 4187x | ::io_uring_cqe* cqe = nullptr; | |
| 1033 | 4187x | int rc = ::io_uring_wait_cqe_timeout(&ring_, &cqe, ts_ptr); | |
| 1034 | |||
| 1035 | // Phase 3 — drain CQEs under the mutex. | ||
| 1036 | { | ||
| 1037 | 4187x | lock_type ring_lock(ring_mutex_); | |
| 1038 | 4187x | if (rc == 0 || rc == -ETIME || rc == -EINTR) | |
| 1039 | 4187x | process_completions(); | |
| 1040 | 4187x | } | |
| 1041 | |||
| 1042 | 4187x | if (rc < 0 && rc != -ETIME && rc != -EINTR) | |
| 1043 | { | ||
| 1044 | // Restore state before propagating so followers don't | ||
| 1045 | // deadlock waiting for a leader that never returns. | ||
| 1046 | ✗ | lock.lock(); | |
| 1047 | ✗ | task_running_ = false; | |
| 1048 | ✗ | cond_.notify_all(); | |
| 1049 | ✗ | detail::throw_system_error( | |
| 1050 | ✗ | make_err(-rc), "io_uring_wait_cqe_timeout"); | |
| 1051 | } | ||
| 1052 | |||
| 1053 | 4187x | if (!timer_svc_->empty()) | |
| 1054 | 4183x | timer_svc_->process_expired(); | |
| 1055 | |||
| 1056 | 4187x | lock.lock(); | |
| 1057 | 4187x | task_running_ = false; | |
| 1058 | 4187x | cond_.notify_all(); | |
| 1059 | |||
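| 1060 | // For poll() / wait_one() one kernel pass is the contract: if | ||
| 1061 | // nothing is dispatchable afterwards, exit. NOTE(review): with a timer pending, that pass waits for the timer delta rather than timeout_us (the next_expiry branch is tested first) — confirm this is intended for wait_one(). | ||
| 1062 | // For run() (timeout < 0) keep looping until work arrives or | ||
| 1063 | // someone calls stop(). | ||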
| 1064 | 4187x | if (timeout_us >= 0 && completed_ops_.empty()) | |
| 1065 | 8x | return 0; | |
| 1066 | 4179x | } | |
| 1067 | 50920x | } | |
| 1068 | |||
| 1069 | inline void | ||
| 1070 | 54810x | io_uring_scheduler::process_completions() | |
| 1071 | { | ||
| 1072 | unsigned head; | ||
| 1073 | ::io_uring_cqe* cqe; | ||
| 1074 | 54810x | unsigned consumed = 0; | |
| 1075 | |||
| 1076 | // Collect completed I/O ops locally; splice into completed_ops_ | ||
| 1077 | // after the loop so do_one dispatches them one at a time. | ||
| 1078 | 54810x | op_queue local_ops; | |
| 1079 | |||
| 1080 | 54810x | std::int64_t inflight_dec = 0; | |
| 1081 | 87298x | io_uring_for_each_cqe(&ring_, head, cqe) | |
| 1082 | { | ||
| 1083 | 32488x | void* ud = io_uring_cqe_get_data(cqe); | |
| 1084 | 32488x | if (ud == nullptr) | |
| 1085 | { | ||
| 1086 | // Wakeup eventfd CQE: drain the eventfd byte. Not counted | ||
| 1087 | // by io_uring_inflight_; we never incremented for the | ||
| 1088 | // wakeup multishot SQE (its progress doesn't depend on | ||
| 1089 | // userspace getevents). | ||
| 1090 | 16351x | drain_wakeup_eventfd(); | |
| 1091 | // If multishot terminated (kernel dropped under memory | ||
| 1092 | // pressure or similar), re-arm. Each CQE except the last | ||
| 1093 | // sets IORING_CQE_F_MORE. | ||
| 1094 | 16351x | if ((cqe->flags & IORING_CQE_F_MORE) == 0) | |
| 1095 | { | ||
| 1096 | ✗ | ::io_uring_sqe* re = ::io_uring_get_sqe(&ring_); | |
| 1097 | ✗ | if (!re) | |
| 1098 | { | ||
| 1099 | ✗ | ::io_uring_submit(&ring_); | |
| 1100 | ✗ | re = ::io_uring_get_sqe(&ring_); | |
| 1101 | } | ||
| 1102 | ✗ | if (re) | |
| 1103 | { | ||
| 1104 | ✗ | ::io_uring_prep_poll_multishot( | |
| 1105 | re, wakeup_eventfd_, POLLIN); | ||
| 1106 | ✗ | ::io_uring_sqe_set_data(re, nullptr); | |
| 1107 | } | ||
| 1108 | } | ||
| 1109 | } | ||
| 1110 | 16137x | else if (ud == &cancel_sentinel_) | |
| 1111 | { | ||
| 1112 | // CQE for an ASYNC_CANCEL op — ignore; the actual op's | ||
| 1113 | // CQE arrives separately and is dispatched via cqe_func. | ||
| 1114 | // Cancels are one-shot, no F_MORE, decrement inflight. | ||
| 1115 | 7971x | ++inflight_dec; | |
| 1116 | } | ||
| 1117 | else | ||
| 1118 | { | ||
| 1119 | 8166x | auto* iop = static_cast<io_uring_op*>(ud); | |
| 1120 | 8166x | (*iop->cqe_func)(iop, cqe->res, cqe->flags, local_ops); | |
| 1121 | // Decrement inflight on the terminal CQE only — multishot | ||
| 1122 | // ops (acceptor) hold the SQE alive across F_MORE CQEs and | ||
| 1123 | // free it only when F_MORE is cleared. | ||
| 1124 | 8166x | if ((cqe->flags & IORING_CQE_F_MORE) == 0) | |
| 1125 | 4212x | ++inflight_dec; | |
| 1126 | } | ||
| 1127 | 32488x | ++consumed; | |
| 1128 | } | ||
| 1129 | 54810x | if (inflight_dec) | |
| 1130 | 11987x | io_uring_inflight_.fetch_sub( | |
| 1131 | inflight_dec, std::memory_order_acq_rel); | ||
| 1132 | |||
| 1133 | 54810x | if (consumed) | |
| 1134 | 16266x | io_uring_cq_advance(&ring_, consumed); | |
| 1135 | |||
| 1136 | // Caller holds ring_mutex_. Take dispatch_mutex_ briefly to | ||
| 1137 | // splice locally-collected ops onto the global queue (lock order | ||
| 1138 | // ring_mutex_ -> dispatch_mutex_). | ||
| 1139 | 54810x | if (!local_ops.empty()) | |
| 1140 | { | ||
| 1141 | 4205x | lock_type lock(dispatch_mutex_); | |
| 1142 | 4205x | completed_ops_.splice(local_ops); | |
| 1143 | // Wake any follower waiting on cond_; it'll pop and dispatch. | ||
| 1144 | 4205x | cond_.notify_one(); | |
| 1145 | 4205x | } | |
| 1146 | 54810x | } | |
| 1147 | |||
| 1148 | inline void | ||
| 1149 | 10x | io_uring_scheduler::submit_sqes_op::do_handler( | |
| 1150 | void* owner, scheduler_op* base, | ||
| 1151 | std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept | ||
| 1152 | { | ||
| 1153 | 10x | if (owner == nullptr) | |
| 1154 | 10x | return; // shutdown drain — nothing to do; SQE storage is | |
| 1155 | // kernel-mapped and discarded by io_uring_queue_exit. | ||
| 1156 | |||
| 1157 | ✗ | auto* self = static_cast<submit_sqes_op*>(base); | |
| 1158 | ✗ | auto* sched = self->sched_; | |
| 1159 | |||
| 1160 | ✗ | io_uring_scheduler::lock_type ring_lock(sched->ring_mutex_); | |
| 1161 | ✗ | sched->submit_op_posted_ = false; | |
| 1162 | ✗ | ::io_uring_submit_and_get_events(&sched->ring_); | |
| 1163 | ✗ | sched->process_completions(); | |
| 1164 | ✗ | } | |
| 1165 | |||
| 1166 | inline void | ||
| 1167 | 101x | io_uring_scheduler::submit_cancel_by_user_data(io_uring_op* target) noexcept | |
| 1168 | { | ||
| 1169 | 101x | lazy_init_ring(); | |
| 1170 | // Wake the leader (if any) so its io_uring_wait_cqe_timeout returns | ||
| 1171 | // and a later submit pass in do_one picks up the cancel SQE prepared | ||
| 1172 | // below (the leader does not hold ring_mutex_ while it waits). Cancellation is best-effort if | ||
| 1173 | // the SQ stays full after one flush — the op completes on its | ||
| 1174 | // own and reports cancelled via the in-flight `cancelled` flag. | ||
| 1175 | 101x | interrupt_reactor(); | |
| 1176 | 101x | lock_type lock(ring_mutex_); | |
| 1177 | 101x | io_uring_sqe* sqe = io_uring_get_sqe(&ring_); | |
| 1178 | 101x | if (!sqe) | |
| 1179 | { | ||
| 1180 | ✗ | io_uring_submit(&ring_); | |
| 1181 | ✗ | sqe = io_uring_get_sqe(&ring_); | |
| 1182 | } | ||
| 1183 | 101x | if (!sqe) | |
| 1184 | ✗ | return; | |
| 1185 | |||
| 1186 | 101x | io_uring_prep_cancel(sqe, target, 0); | |
| 1187 | 101x | io_uring_sqe_set_data(sqe, &cancel_sentinel_); | |
| 1188 | 101x | inflight_inc(); | |
| 1189 | 101x | } | |
| 1190 | |||
| 1191 | inline void | ||
| 1192 | 78x | io_uring_scheduler::submit_cancel_by_fd(int fd) noexcept | |
| 1193 | { | ||
| 1194 | 78x | lazy_init_ring(); | |
| 1195 | 78x | interrupt_reactor(); | |
| 1196 | 78x | lock_type lock(ring_mutex_); | |
| 1197 | 78x | io_uring_sqe* sqe = io_uring_get_sqe(&ring_); | |
| 1198 | 78x | if (!sqe) | |
| 1199 | { | ||
| 1200 | ✗ | io_uring_submit(&ring_); | |
| 1201 | ✗ | sqe = io_uring_get_sqe(&ring_); | |
| 1202 | } | ||
| 1203 | 78x | if (!sqe) | |
| 1204 | ✗ | return; | |
| 1205 | |||
| 1206 | 78x | io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL); | |
| 1207 | 78x | io_uring_sqe_set_data(sqe, &cancel_sentinel_); | |
| 1208 | 78x | inflight_inc(); | |
| 1209 | 78x | } | |
| 1210 | |||
| 1211 | inline void | ||
| 1212 | 101x | io_uring_op::request_cancel() noexcept | |
| 1213 | { | ||
| 1214 | 101x | cancelled.store(true, std::memory_order_release); | |
| 1215 | // Skip the cancel SQE if we never linked an SQE to this op — the | ||
| 1216 | // bypass path in the caller will see cancelled=true and complete | ||
| 1217 | // synchronously without a kernel round-trip. | ||
| 1218 | 101x | if (sched_ && sqe_set.load(std::memory_order_acquire)) | |
| 1219 | 101x | sched_->submit_cancel_by_user_data(this); | |
| 1220 | 101x | } | |
| 1221 | |||
| 1222 | inline void | ||
| 1223 | 8176x | io_uring_scheduler::cancel_and_flush(int fd) noexcept | |
| 1224 | { | ||
| 1225 | 8176x | lazy_init_ring(); | |
| 1226 | 8176x | interrupt_reactor(); | |
| 1227 | 8176x | lock_type lock(ring_mutex_); | |
| 1228 | 8176x | io_uring_sqe* sqe = io_uring_get_sqe(&ring_); | |
| 1229 | 8176x | if (!sqe) | |
| 1230 | { | ||
| 1231 | ✗ | io_uring_submit(&ring_); | |
| 1232 | ✗ | sqe = io_uring_get_sqe(&ring_); | |
| 1233 | } | ||
| 1234 | 8176x | if (sqe) | |
| 1235 | { | ||
| 1236 | 8176x | io_uring_prep_cancel_fd(sqe, fd, IORING_ASYNC_CANCEL_ALL); | |
| 1237 | 8176x | io_uring_sqe_set_data(sqe, &cancel_sentinel_); | |
| 1238 | 8176x | inflight_inc(); | |
| 1239 | } | ||
| 1240 | // Flush while fd is still open so the kernel resolves the file | ||
| 1241 | // from the fd number before the caller closes and recycles it. | ||
| 1242 | 8176x | io_uring_submit(&ring_); | |
| 1243 | 8176x | } | |
| 1244 | |||
| 1245 | inline void | ||
| 1246 | 85x | io_uring_scheduler::drain_cqes_for(io_uring_op* target) noexcept | |
| 1247 | { | ||
| 1248 | 85x | lazy_init_ring(); | |
| 1249 | // Submit a cancel by user_data so the kernel returns CQEs for | ||
| 1250 | // the target promptly, then iterate the CQ ring and consume | ||
| 1251 | // CQEs until one matching `target` is seen. ring_mutex_ serializes against | ||
| 1252 | // the leader's submit/drain phases and any concurrent cancel path; the | ||
| 1253 | // leader's blocking wait itself runs without the mutex, and | ||
| 1254 | // interrupt_reactor() prompts it to return from that wait. | ||
| 1255 | 85x | interrupt_reactor(); | |
| 1256 | { | ||
| 1257 | 85x | lock_type lock(ring_mutex_); | |
| 1258 | 85x | if (auto* sqe = io_uring_get_sqe(&ring_)) | |
| 1259 | { | ||
| 1260 | 85x | io_uring_prep_cancel(sqe, target, 0); | |
| 1261 | 85x | io_uring_sqe_set_data(sqe, &cancel_sentinel_); | |
| 1262 | 85x | inflight_inc(); | |
| 1263 | } | ||
| 1264 | 85x | io_uring_submit(&ring_); | |
| 1265 | 85x | } | |
| 1266 | |||
| 1267 | // Loop a few rounds: cancel SQE submission, then drain CQEs. | ||
| 1268 | // Bounded loop avoids stalls if the kernel never returns a | ||
| 1269 | // cancel completion — best-effort. | ||
| 1270 | 89x | for (int rounds = 0; rounds < drain_cqes_max_rounds; ++rounds) | |
| 1271 | { | ||
| 1272 | 89x | lock_type lock(ring_mutex_); | |
| 1273 | |||
| 1274 | unsigned head; | ||
| 1275 | ::io_uring_cqe* cqe; | ||
| 1276 | 89x | unsigned consumed = 0; | |
| 1277 | 89x | bool saw_target = false; | |
| 1278 | |||
| 1279 | 697x | io_uring_for_each_cqe(&ring_, head, cqe) | |
| 1280 | { | ||
| 1281 | 608x | void* ud = io_uring_cqe_get_data(cqe); | |
| 1282 | 608x | if (ud == target) | |
| 1283 | { | ||
| 1284 | 81x | saw_target = true; | |
| 1285 | // Don't dispatch — caller is destructing target; | ||
| 1286 | // just consume so the CQE doesn't dangle. | ||
| 1287 | } | ||
| 1288 | // Other CQEs are intentionally NOT dispatched here. They | ||
| 1289 | // may belong to ops freed by sibling teardowns (other | ||
| 1290 | // acceptors / sockets), and dispatching would UAF. Note that | ||
| 1291 | // they are still counted in `consumed` and dropped by the | ||
| 1292 | // io_uring_cq_advance below, so no later run-loop pass sees them | ||
| 1293 | // and io_uring_inflight_ is not decremented for them here. | ||
| 1294 | // The io_context's destructor sequence runs services' shutdowns before ~scheduler, so still-live ops can drain through their own paths first — NOTE(review): confirm no live op's CQE can be dropped here. | ||
| 1295 | 608x | ++consumed; | |
| 1296 | } | ||
| 1297 | 89x | if (consumed) | |
| 1298 | { | ||
| 1299 | 85x | io_uring_cq_advance(&ring_, consumed); | |
| 1300 | 85x | if (saw_target) | |
| 1301 | 81x | break; | |
| 1302 | 4x | continue; | |
| 1303 | } | ||
| 1304 | |||
| 1305 | // Nothing in the CQ — kick the kernel briefly. ring_mutex_ stays | ||
| 1306 | // held across the wait, keeping the run-loop leader's submit and | ||
| 1307 | // drain phases out (its own blocking wait does not take the mutex). | ||
| 1308 | 4x | __kernel_timespec ts{ | |
| 1309 | 0, static_cast<long long>(drain_cqes_kick_ns)}; | ||
| 1310 | 4x | ::io_uring_cqe* one = nullptr; | |
| 1311 | 4x | int rc = ::io_uring_submit_and_wait_timeout( | |
| 1312 | &ring_, &one, 1, &ts, nullptr); | ||
| 1313 | 4x | if (rc < 0 && rc != -ETIME && rc != -EINTR) | |
| 1314 | ✗ | break; | |
| 1315 | 4x | if (rc == -ETIME) | |
| 1316 | 4x | break; | |
| 1317 | 89x | } | |
| 1318 | 85x | } | |
| 1319 | |||
| 1320 | } // namespace boost::corosio::detail | ||
| 1321 | |||
| 1322 | #endif // BOOST_COROSIO_HAS_IO_URING | ||
| 1323 | |||
| 1324 | #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_SCHEDULER_HPP | ||
| 1325 |