include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

84.3% Lines (323/383) 89.8% List of functions (44/49) 62.3% Branches (137/220)
reactor_scheduler.hpp
f(x) Functions (49)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 3984299x 85.7% 75.0% 75.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&) :90 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 15x 57.1% 50.0% 80.0% boost::corosio::detail::reactor_scheduler::~reactor_scheduler() :140 1321x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :252 3336x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :258 159x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :269 10x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :280 1321x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::task_op() :323 2642x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::~task_op() :323 2642x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :325 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :326 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :372 6672x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :380 6672x 71.4% 16.7% 100.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :392 6672x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :406 16x 93.3% 66.7% 62.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :433 465432x 50.0% 21.4% 33.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :464 1775170x 90.0% 66.7% 87.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const :480 9459x 100.0% 53.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::post_handler(std::__1::coroutine_handle<void>) :486 18910x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::~post_handler() :487 18907x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::operator()() :489 9442x 100.0% 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::destroy() :498 12x 100.0% 62.5% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :523 345605x 100.0% 75.0% 71.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :540 7069x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :546 3262x 100.0% 66.7% 71.0% boost::corosio::detail::reactor_scheduler::stopped() const :558 88x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :564 2538x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :570 3260x 100.0% 92.9% 78.0% boost::corosio::detail::reactor_scheduler::run_one() :595 16x 75.0% 50.0% 50.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :609 62x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler::poll() :623 28x 100.0% 71.4% 78.0% boost::corosio::detail::reactor_scheduler::poll_one() :648 8x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler::work_started() :662 152529x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :668 169583x 100.0% 75.0% 80.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :675 1381563x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long long) const :683 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :696 136243x 70.0% 50.0% 55.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :713 1321x 100.0% 66.7% 83.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :730 4565x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :737 9828x 62.5% 50.0% 75.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :751 1941111x 85.7% 50.0% 66.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :763 4x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :769 4x 100.0% 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :781 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :793 9828x 90.0% 83.3% 85.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :811 3745656x 94.1% 80.0% 100.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :835 2849978x 86.7% 60.0% 100.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :856 1876127x 89.6% 76.3% 78.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
38 /** Per-thread state for a reactor scheduler.
39
40 Each thread running a scheduler's event loop has one of these
41 on a thread-local stack. It holds a private work queue and
42 inline completion budget for speculative I/O fast paths.
43 */
44 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
45 {
46 /// Scheduler this context belongs to.
47 reactor_scheduler const* key;
48
49 /// Next context frame on this thread's stack.
50 reactor_scheduler_context* next;
51
52 /// Private work queue for reduced contention.
53 op_queue private_queue;
54
55 /// Unflushed work count for the private queue.
56 std::int64_t private_outstanding_work;
57
58 /// Remaining inline completions allowed this cycle.
59 int inline_budget;
60
61 /// Maximum inline budget (adaptive, 2-16).
62 int inline_budget_max;
63
64 /// True if no other thread absorbed queued work last cycle.
65 bool unassisted;
66
67 /// Construct a context frame linked to @a n.
68 reactor_scheduler_context(
69 reactor_scheduler const* k,
70 reactor_scheduler_context* n);
71 };
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 3984299x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80
2/2
✓ Branch 0 taken 3967892 times.
✓ Branch 1 taken 16407 times.
3984299x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82
1/2
✓ Branch 0 taken 3967892 times.
✗ Branch 1 not taken.
3967892x if (c->key == self)
83 3967892x return c;
84 }
85 16407x return nullptr;
86 3984299x }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 15x reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112
2/4
✓ Branch 0 taken 15 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15x if (!ctx || ctx->private_queue.empty())
113 15x return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 15x }
119
120 /** Non-template base for reactor-backed scheduler implementations.
121
122 Provides the complete threading model shared by epoll, kqueue,
123 and select schedulers: signal state machine, inline completion
124 budget, work counting, run/poll methods, and the do_one event
125 loop.
126
127 Derived classes provide platform-specific hooks by overriding:
128 - `run_task(lock, ctx)` to run the reactor poll
129 - `interrupt_reactor()` to wake a blocked reactor
130
131 De-templated from the original CRTP design to eliminate
132 duplicate instantiations when multiple backends are compiled
133 into the same binary. Virtual dispatch for run_task (called
134 once per reactor cycle, before a blocking syscall) has
135 negligible overhead.
136
137 @par Thread Safety
138 All public member functions are thread-safe.
139 */
140 class reactor_scheduler
141 : public scheduler
142 , public capy::execution_context::service
143 {
144 public:
145 using key_type = scheduler;
146 using context_type = reactor_scheduler_context;
147 using mutex_type = conditionally_enabled_mutex;
148 using lock_type = mutex_type::scoped_lock;
149 using event_type = conditionally_enabled_event;
150
151 /// Post a coroutine for deferred execution.
152 void post(std::coroutine_handle<> h) const override;
153
154 /// Post a scheduler operation for deferred execution.
155 void post(scheduler_op* h) const override;
156
157 /// Return true if called from a thread running this scheduler.
158 bool running_in_this_thread() const noexcept override;
159
160 /// Request the scheduler to stop dispatching handlers.
161 void stop() override;
162
163 /// Return true if the scheduler has been stopped.
164 bool stopped() const noexcept override;
165
166 /// Reset the stopped state so `run()` can resume.
167 void restart() override;
168
169 /// Run the event loop until no work remains.
170 std::size_t run() override;
171
172 /// Run until one handler completes or no work remains.
173 std::size_t run_one() override;
174
175 /// Run until one handler completes or @a usec elapses.
176 std::size_t wait_one(long usec) override;
177
178 /// Run ready handlers without blocking.
179 std::size_t poll() override;
180
181 /// Run at most one ready handler without blocking.
182 std::size_t poll_one() override;
183
184 /// Increment the outstanding work count.
185 void work_started() noexcept override;
186
187 /// Decrement the outstanding work count, stopping on zero.
188 void work_finished() noexcept override;
189
190 /** Reset the thread's inline completion budget.
191
192 Called at the start of each posted completion handler to
193 grant a fresh budget for speculative inline completions.
194 */
195 void reset_inline_budget() const noexcept;
196
197 /** Consume one unit of inline budget if available.
198
199 @return True if budget was available and consumed.
200 */
201 bool try_consume_inline_budget() const noexcept;
202
203 /** Offset a forthcoming work_finished from work_cleanup.
204
205 Called by descriptor_state when all I/O returned EAGAIN and
206 no handler will be executed. Must be called from a scheduler
207 thread.
208 */
209 void compensating_work_started() const noexcept;
210
211 /** Drain work from thread context's private queue to global queue.
212
213 Flushes private work count to the global counter, then
214 transfers the queue under mutex protection.
215
216 @param queue The private queue to drain.
217 @param count Private work count to flush before draining.
218 */
219 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
220
221 /** Post completed operations for deferred invocation.
222
223 If called from a thread running this scheduler, operations
224 go to the thread's private queue (fast path). Otherwise,
225 operations are added to the global queue under mutex and a
226 waiter is signaled.
227
228 @par Preconditions
229 work_started() must have been called for each operation.
230
231 @param ops Queue of operations to post.
232 */
233 void post_deferred_completions(op_queue& ops) const;
234
235 /** Apply runtime configuration to the scheduler.
236
237 Called by `io_context` after construction. Values that do
238 not apply to this backend are silently ignored.
239
240 @param max_events Event buffer size for epoll/kqueue.
241 @param budget_init Starting inline completion budget.
242 @param budget_max Hard ceiling on adaptive budget ramp-up.
243 @param unassisted Budget when single-threaded.
244 */
245 virtual void configure_reactor(
246 unsigned max_events,
247 unsigned budget_init,
248 unsigned budget_max,
249 unsigned unassisted);
250
251 /// Return the configured initial inline budget.
252 3336x unsigned inline_budget_initial() const noexcept
253 {
254 3336x return inline_budget_initial_;
255 }
256
257 /// Return true if single-threaded (lockless) mode is active.
258 159x bool is_single_threaded() const noexcept override
259 {
260 159x return single_threaded_;
261 }
262
263 /** Enable or disable single-threaded (lockless) mode.
264
265 When enabled, all scheduler mutex and condition variable
266 operations become no-ops. Cross-thread post() is
267 undefined behavior.
268 */
269 10x void configure_single_threaded(bool v) noexcept override
270 {
271 10x single_threaded_ = v;
272 10x mutex_.set_enabled(!v);
273 10x cond_.set_enabled(!v);
274 10x }
275
276 protected:
277 1321x timer_service* timer_svc_ = nullptr;
278 1321x bool single_threaded_ = false;
279
280 3963x reactor_scheduler() = default;
281
282 /** Drain completed_ops during shutdown.
283
284 Pops all operations from the global queue and destroys them,
285 skipping the task sentinel. Signals all waiting threads.
286 Derived classes call this from their shutdown() override
287 before performing platform-specific cleanup.
288 */
289 void shutdown_drain();
290
291 /// RAII guard that re-inserts the task sentinel after `run_task`.
292 struct task_cleanup
293 {
294 reactor_scheduler const* sched;
295 lock_type* lock;
296 context_type* ctx;
297 ~task_cleanup();
298 };
299
300 1321x mutable mutex_type mutex_{true};
301 1321x mutable event_type cond_{true};
302 mutable op_queue completed_ops_;
303 1321x mutable std::atomic<std::int64_t> outstanding_work_{0};
304 1321x std::atomic<bool> stopped_{false};
305 1321x mutable std::atomic<bool> task_running_{false};
306 1321x mutable bool task_interrupted_ = false;
307
308 // Runtime-configurable reactor tuning parameters.
309 // Defaults match the library's built-in values.
310 1321x unsigned max_events_per_poll_ = 128;
311 1321x unsigned inline_budget_initial_ = 2;
312 1321x unsigned inline_budget_max_ = 16;
313 1321x unsigned unassisted_budget_ = 4;
314
315 /// Bit 0 of `state_`: set when the condvar should be signaled.
316 static constexpr std::size_t signaled_bit = 1;
317
318 /// Increment per waiting thread in `state_`.
319 static constexpr std::size_t waiter_increment = 2;
320 1321x mutable std::size_t state_ = 0;
321
322 /// Sentinel op that triggers a reactor poll when dequeued.
323 struct task_op final : scheduler_op
324 {
325 void operator()() override {}
326 void destroy() override {}
327 };
328 task_op task_op_;
329
330 /// Run the platform-specific reactor poll.
331 virtual void
332 run_task(lock_type& lock, context_type* ctx,
333 long timeout_us) = 0;
334
335 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
336 virtual void interrupt_reactor() const = 0;
337
338 private:
339 struct work_cleanup
340 {
341 reactor_scheduler* sched;
342 lock_type* lock;
343 context_type* ctx;
344 ~work_cleanup();
345 };
346
347 std::size_t do_one(
348 lock_type& lock, long timeout_us, context_type* ctx);
349
350 void signal_all(lock_type& lock) const;
351 bool maybe_unlock_and_signal_one(lock_type& lock) const;
352 bool unlock_and_signal_one(lock_type& lock) const;
353 void clear_signal() const;
354 void wait_for_signal(lock_type& lock) const;
355 void wait_for_signal_for(
356 lock_type& lock, long timeout_us) const;
357 void wake_one_thread_and_unlock(lock_type& lock) const;
358 };
359
360 /** RAII guard that pushes/pops a scheduler context frame.
361
362 On construction, pushes a new context frame onto the
363 thread-local stack. On destruction, drains any remaining
364 private queue items to the global queue and pops the frame.
365 */
366 struct reactor_thread_context_guard
367 {
368 /// The context frame managed by this guard.
369 reactor_scheduler_context frame_;
370
371 /// Construct the guard, pushing a frame for @a sched.
372 6672x explicit reactor_thread_context_guard(
373 reactor_scheduler const* sched) noexcept
374
1/2
✓ Branch 0 taken 3336 times.
✗ Branch 1 not taken.
3336x : frame_(sched, reactor_context_stack.get())
375 3336x {
376 3336x reactor_context_stack.set(&frame_);
377 6672x }
378
379 /// Destroy the guard, draining private work and popping the frame.
380 6672x ~reactor_thread_context_guard() noexcept
381 3336x {
382
1/2
✓ Branch 0 taken 3336 times.
✗ Branch 1 not taken.
3336x if (!frame_.private_queue.empty())
383 frame_.key->drain_thread_queue(
384 frame_.private_queue, frame_.private_outstanding_work);
385 3336x reactor_context_stack.set(frame_.next);
386 6672x }
387 };
388
389 // ---- Inline implementations ------------------------------------------------
390
391 inline
392 10008x reactor_scheduler_context::reactor_scheduler_context(
393 reactor_scheduler const* k,
394 reactor_scheduler_context* n)
395 3336x : key(k)
396 3336x , next(n)
397 3336x , private_outstanding_work(0)
398 3336x , inline_budget(0)
399 6672x , inline_budget_max(
400 3336x static_cast<int>(k->inline_budget_initial()))
401 3336x , unassisted(false)
402 3336x {
403 6672x }
404
405 inline void
406 16x reactor_scheduler::configure_reactor(
407 unsigned max_events,
408 unsigned budget_init,
409 unsigned budget_max,
410 unsigned unassisted)
411 {
412
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 2 times.
16x if (max_events < 1 ||
413 14x max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x throw std::out_of_range(
415 "max_events_per_poll must be in [1, INT_MAX]");
416
1/2
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
14x if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 throw std::out_of_range(
418 "inline_budget_max must be in [0, INT_MAX]");
419
420 // Clamp initial and unassisted to budget_max.
421
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2 times.
14x if (budget_init > budget_max)
422 2x budget_init = budget_max;
423
2/2
✓ Branch 0 taken 12 times.
✓ Branch 1 taken 2 times.
14x if (unassisted > budget_max)
424 2x unassisted = budget_max;
425
426 14x max_events_per_poll_ = max_events;
427 14x inline_budget_initial_ = budget_init;
428 14x inline_budget_max_ = budget_max;
429 14x unassisted_budget_ = unassisted;
430 14x }
431
432 inline void
433 465432x reactor_scheduler::reset_inline_budget() const noexcept
434 {
435 // When budget is disabled (max==0), all paths below would no-op
436 // (inline_budget stays 0). Skip the TLS lookup entirely.
437
1/2
✓ Branch 0 taken 465432 times.
✗ Branch 1 not taken.
465432x if (inline_budget_max_ == 0)
438 return;
439
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 465432 times.
465432x if (auto* ctx = reactor_find_context(this))
440 {
441 // Cap when no other thread absorbed queued work
442
1/2
✓ Branch 0 taken 465432 times.
✗ Branch 1 not taken.
465432x if (ctx->unassisted)
443 {
444 465432x ctx->inline_budget_max =
445 465432x static_cast<int>(unassisted_budget_);
446 465432x ctx->inline_budget =
447 465432x static_cast<int>(unassisted_budget_);
448 465432x return;
449 }
450 // Ramp up when previous cycle fully consumed budget.
451 // max(1, ...) ensures the doubling escapes zero.
452 if (ctx->inline_budget == 0)
453 ctx->inline_budget_max = (std::min)(
454 (std::max)(1, ctx->inline_budget_max) * 2,
455 static_cast<int>(inline_budget_max_));
456 else if (ctx->inline_budget < ctx->inline_budget_max)
457 ctx->inline_budget_max =
458 static_cast<int>(inline_budget_initial_);
459 ctx->inline_budget = ctx->inline_budget_max;
460 }
461 465432x }
462
463 inline bool
464 1775170x reactor_scheduler::try_consume_inline_budget() const noexcept
465 {
466
1/2
✓ Branch 0 taken 1775170 times.
✗ Branch 1 not taken.
1775170x if (inline_budget_max_ == 0)
467 return false;
468
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1775170 times.
1775170x if (auto* ctx = reactor_find_context(this))
469 {
470
2/2
✓ Branch 0 taken 1446926 times.
✓ Branch 1 taken 328244 times.
1775170x if (ctx->inline_budget > 0)
471 {
472 1446926x --ctx->inline_budget;
473 1446926x return true;
474 }
475 328244x }
476 328244x return false;
477 1775170x }
478
479 inline void
480 9459x reactor_scheduler::post(std::coroutine_handle<> h) const
481 {
482 struct post_handler final : scheduler_op
483 {
484 std::coroutine_handle<> h_;
485
486 18910x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
487 18907x ~post_handler() override = default;
488
489 9442x void operator()() override
490 {
491 9442x auto saved = h_;
492
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 9444 times.
9442x delete this;
493 // Ensure stores from the posting thread are visible
494 9446x std::atomic_thread_fence(std::memory_order_acquire);
495 9446x saved.resume();
496 9446x }
497
498 12x void destroy() override
499 {
500 12x auto saved = h_;
501
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
12x delete this;
502 12x saved.destroy();
503 12x }
504 };
505
506 9459x auto ph = std::make_unique<post_handler>(h);
507
508
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 9435 times.
9459x if (auto* ctx = reactor_find_context(this))
509 {
510 24x ++ctx->private_outstanding_work;
511 24x ctx->private_queue.push(ph.release());
512 24x return;
513 }
514
515 9435x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
516
517
1/2
✓ Branch 0 taken 9435 times.
✗ Branch 1 not taken.
9435x lock_type lock(mutex_);
518 9435x completed_ops_.push(ph.release());
519
1/2
✓ Branch 0 taken 9435 times.
✗ Branch 1 not taken.
9435x wake_one_thread_and_unlock(lock);
520 9459x }
521
522 inline void
523 345605x reactor_scheduler::post(scheduler_op* h) const
524 {
525
2/2
✓ Branch 0 taken 345212 times.
✓ Branch 1 taken 393 times.
345605x if (auto* ctx = reactor_find_context(this))
526 {
527 345212x ++ctx->private_outstanding_work;
528 345212x ctx->private_queue.push(h);
529 345212x return;
530 }
531
532 393x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
533
534 393x lock_type lock(mutex_);
535 393x completed_ops_.push(h);
536
1/2
✓ Branch 0 taken 393 times.
✗ Branch 1 not taken.
393x wake_one_thread_and_unlock(lock);
537 345605x }
538
539 inline bool
540 7069x reactor_scheduler::running_in_this_thread() const noexcept
541 {
542 7069x return reactor_find_context(this) != nullptr;
543 }
544
545 inline void
546 3262x reactor_scheduler::stop()
547 {
548 3262x lock_type lock(mutex_);
549
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 3244 times.
3262x if (!stopped_.load(std::memory_order_acquire))
550 {
551 3244x stopped_.store(true, std::memory_order_release);
552
1/2
✓ Branch 0 taken 3244 times.
✗ Branch 1 not taken.
3244x signal_all(lock);
553
1/2
✓ Branch 0 taken 3244 times.
✗ Branch 1 not taken.
3244x interrupt_reactor();
554 3244x }
555 3262x }
556
557 inline bool
558 88x reactor_scheduler::stopped() const noexcept
559 {
560 88x return stopped_.load(std::memory_order_acquire);
561 }
562
563 inline void
564 2538x reactor_scheduler::restart()
565 {
566 2538x stopped_.store(false, std::memory_order_release);
567 2538x }
568
569 inline std::size_t
570 3260x reactor_scheduler::run()
571 {
572
2/2
✓ Branch 0 taken 3254 times.
✓ Branch 1 taken 6 times.
3260x if (outstanding_work_.load(std::memory_order_acquire) == 0)
573 {
574 6x stop();
575 6x return 0;
576 }
577
578 3254x reactor_thread_context_guard ctx(this);
579
1/2
✓ Branch 0 taken 3254 times.
✗ Branch 1 not taken.
3254x lock_type lock(mutex_);
580
581 3254x std::size_t n = 0;
582 1876003x for (;;)
583 {
584
4/4
✓ Branch 0 taken 1876002 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1872750 times.
✓ Branch 3 taken 3252 times.
1876003x if (!do_one(lock, -1, &ctx.frame_))
585 3252x break;
586
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1872749 times.
1872750x if (n != (std::numeric_limits<std::size_t>::max)())
587 1872749x ++n;
588
2/2
✓ Branch 0 taken 1534807 times.
✓ Branch 1 taken 337941 times.
1872750x if (!lock.owns_lock())
589
2/2
✓ Branch 0 taken 1534808 times.
✓ Branch 1 taken 1 time.
1534807x lock.lock();
590 }
591 3252x return n;
592 3260x }
593
594 inline std::size_t
595 16x reactor_scheduler::run_one()
596 {
597
1/2
✓ Branch 0 taken 16 times.
✗ Branch 1 not taken.
16x if (outstanding_work_.load(std::memory_order_acquire) == 0)
598 {
599 stop();
600 return 0;
601 }
602
603 16x reactor_thread_context_guard ctx(this);
604
1/2
✓ Branch 0 taken 16 times.
✗ Branch 1 not taken.
16x lock_type lock(mutex_);
605
1/2
✓ Branch 0 taken 16 times.
✗ Branch 1 not taken.
16x return do_one(lock, -1, &ctx.frame_);
606 16x }
607
608 inline std::size_t
609 62x reactor_scheduler::wait_one(long usec)
610 {
611
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 20 times.
62x if (outstanding_work_.load(std::memory_order_acquire) == 0)
612 {
613 20x stop();
614 20x return 0;
615 }
616
617 42x reactor_thread_context_guard ctx(this);
618
1/2
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
42x lock_type lock(mutex_);
619
1/2
✓ Branch 0 taken 42 times.
✗ Branch 1 not taken.
42x return do_one(lock, usec, &ctx.frame_);
620 62x }
621
622 inline std::size_t
623 28x reactor_scheduler::poll()
624 {
625
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 6 times.
28x if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 {
627 6x stop();
628 6x return 0;
629 }
630
631 22x reactor_thread_context_guard ctx(this);
632
1/2
✓ Branch 0 taken 22 times.
✗ Branch 1 not taken.
22x lock_type lock(mutex_);
633
634 22x std::size_t n = 0;
635 64x for (;;)
636 {
637
3/4
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
✓ Branch 3 taken 22 times.
64x if (!do_one(lock, 0, &ctx.frame_))
638 22x break;
639
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42x if (n != (std::numeric_limits<std::size_t>::max)())
640 42x ++n;
641
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 8 times.
42x if (!lock.owns_lock())
642
1/2
✓ Branch 0 taken 34 times.
✗ Branch 1 not taken.
34x lock.lock();
643 }
644 22x return n;
645 28x }
646
647 inline std::size_t
648 8x reactor_scheduler::poll_one()
649 {
650
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 4 times.
8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
651 {
652 4x stop();
653 4x return 0;
654 }
655
656 4x reactor_thread_context_guard ctx(this);
657
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4x lock_type lock(mutex_);
658
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4x return do_one(lock, 0, &ctx.frame_);
659 8x }
660
661 inline void
662 152529x reactor_scheduler::work_started() noexcept
663 {
664 152529x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
665 152529x }
666
667 inline void
668 169583x reactor_scheduler::work_finished() noexcept
669 {
670
2/2
✓ Branch 0 taken 166365 times.
✓ Branch 1 taken 3218 times.
169583x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
671
1/2
✓ Branch 0 taken 3218 times.
✗ Branch 1 not taken.
3218x stop();
672 169583x }
673
674 inline void
675 1381563x reactor_scheduler::compensating_work_started() const noexcept
676 {
677 1381563x auto* ctx = reactor_find_context(this);
678
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1381563 times.
1381563x if (ctx)
679 1381563x ++ctx->private_outstanding_work;
680 1381563x }
681
682 inline void
683 reactor_scheduler::drain_thread_queue(
684 op_queue& queue, std::int64_t count) const
685 {
686 if (count > 0)
687 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
688
689 lock_type lock(mutex_);
690 completed_ops_.splice(queue);
691 if (count > 0)
692 maybe_unlock_and_signal_one(lock);
693 }
694
695 inline void
696 136243x reactor_scheduler::post_deferred_completions(op_queue& ops) const
697 {
698
2/2
✓ Branch 0 taken 136242 times.
✓ Branch 1 taken 1 time.
136243x if (ops.empty())
699 136242x return;
700
701
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1x if (auto* ctx = reactor_find_context(this))
702 {
703 1x ctx->private_queue.splice(ops);
704 1x return;
705 }
706
707 lock_type lock(mutex_);
708 completed_ops_.splice(ops);
709 wake_one_thread_and_unlock(lock);
710 136243x }
711
712 inline void
713 1321x reactor_scheduler::shutdown_drain()
714 {
715 1321x lock_type lock(mutex_);
716
717
2/2
✓ Branch 0 taken 1321 times.
✓ Branch 1 taken 1517 times.
2838x while (auto* h = completed_ops_.pop())
718 {
719
2/2
✓ Branch 0 taken 196 times.
✓ Branch 1 taken 1321 times.
1517x if (h == &task_op_)
720 1321x continue;
721
1/2
✓ Branch 0 taken 196 times.
✗ Branch 1 not taken.
196x lock.unlock();
722
1/2
✓ Branch 0 taken 196 times.
✗ Branch 1 not taken.
196x h->destroy();
723
1/2
✓ Branch 0 taken 196 times.
✗ Branch 1 not taken.
196x lock.lock();
724 }
725
726
1/2
✓ Branch 0 taken 1321 times.
✗ Branch 1 not taken.
1321x signal_all(lock);
727 1321x }
728
729 inline void
730 4565x reactor_scheduler::signal_all(lock_type&) const
731 {
732 4565x state_ |= signaled_bit;
733 4565x cond_.notify_all();
734 4565x }
735
736 inline bool
737 9828x reactor_scheduler::maybe_unlock_and_signal_one(
738 lock_type& lock) const
739 {
740 9828x state_ |= signaled_bit;
741
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9828 times.
9828x if (state_ > signaled_bit)
742 {
743 lock.unlock();
744 cond_.notify_one();
745 return true;
746 }
747 9828x return false;
748 9828x }
749
750 inline bool
751 1941111x reactor_scheduler::unlock_and_signal_one(
752 lock_type& lock) const
753 {
754 1941111x state_ |= signaled_bit;
755 1941111x bool have_waiters = state_ > signaled_bit;
756 1941111x lock.unlock();
757
1/2
✓ Branch 0 taken 1941111 times.
✗ Branch 1 not taken.
1941111x if (have_waiters)
758 cond_.notify_one();
759 1941111x return have_waiters;
760 }
761
762 inline void
763 4x reactor_scheduler::clear_signal() const
764 {
765 4x state_ &= ~signaled_bit;
766 4x }
767
768 inline void
769 4x reactor_scheduler::wait_for_signal(
770 lock_type& lock) const
771 {
772
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 4 times.
8x while ((state_ & signaled_bit) == 0)
773 {
774 4x state_ += waiter_increment;
775 4x cond_.wait(lock);
776 4x state_ -= waiter_increment;
777 }
778 4x }
779
780 inline void
781 reactor_scheduler::wait_for_signal_for(
782 lock_type& lock, long timeout_us) const
783 {
784 if ((state_ & signaled_bit) == 0)
785 {
786 state_ += waiter_increment;
787 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
788 state_ -= waiter_increment;
789 }
790 }
791
792 inline void
793 9828x reactor_scheduler::wake_one_thread_and_unlock(
794 lock_type& lock) const
795 {
796
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9828 times.
9828x if (maybe_unlock_and_signal_one(lock))
797 return;
798
799
4/4
✓ Branch 0 taken 191 times.
✓ Branch 1 taken 9637 times.
✓ Branch 2 taken 76 times.
✓ Branch 3 taken 115 times.
9828x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
800 {
801 115x task_interrupted_ = true;
802 115x lock.unlock();
803 115x interrupt_reactor();
804 115x }
805 else
806 {
807 9713x lock.unlock();
808 }
809 9828x }
810
811 3745656x inline reactor_scheduler::work_cleanup::~work_cleanup()
812 1872821x {
813
1/2
✓ Branch 0 taken 1872835 times.
✗ Branch 1 not taken.
1872835x if (ctx)
814 {
815 1872835x std::int64_t produced = ctx->private_outstanding_work;
816
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 1872822 times.
1872835x if (produced > 1)
817 26x sched->outstanding_work_.fetch_add(
818 13x produced - 1, std::memory_order_relaxed);
819
2/2
✓ Branch 0 taken 1719511 times.
✓ Branch 1 taken 153311 times.
1872822x else if (produced < 1)
820 153311x sched->work_finished();
821 1872835x ctx->private_outstanding_work = 0;
822
823
2/2
✓ Branch 0 taken 1534886 times.
✓ Branch 1 taken 337949 times.
1872835x if (!ctx->private_queue.empty())
824 {
825
1/2
✓ Branch 0 taken 337949 times.
✗ Branch 1 not taken.
337949x lock->lock();
826 337949x sched->completed_ops_.splice(ctx->private_queue);
827 337949x }
828 1872835x }
829 else
830 {
831 sched->work_finished();
832 }
833 3745656x }
834
835 2849978x inline reactor_scheduler::task_cleanup::~task_cleanup()
836 1424989x {
837
1/2
✓ Branch 0 taken 1424989 times.
✗ Branch 1 not taken.
1424989x if (!ctx)
838 return;
839
840
2/2
✓ Branch 0 taken 1417731 times.
✓ Branch 1 taken 7258 times.
1424989x if (ctx->private_outstanding_work > 0)
841 {
842 14516x sched->outstanding_work_.fetch_add(
843 7258x ctx->private_outstanding_work, std::memory_order_relaxed);
844 7258x ctx->private_outstanding_work = 0;
845 7258x }
846
847
2/2
✓ Branch 0 taken 1417731 times.
✓ Branch 1 taken 7258 times.
1424989x if (!ctx->private_queue.empty())
848 {
849
1/2
✓ Branch 0 taken 7258 times.
✗ Branch 1 not taken.
7258x if (!lock->owns_lock())
850 lock->lock();
851 7258x sched->completed_ops_.splice(ctx->private_queue);
852 7258x }
853 2849978x }
854
855 inline std::size_t
856 1876127x reactor_scheduler::do_one(
857 lock_type& lock, long timeout_us, context_type* ctx)
858 {
859 1876131x for (;;)
860 {
861
2/2
✓ Branch 0 taken 3297853 times.
✓ Branch 1 taken 3249 times.
3301102x if (stopped_.load(std::memory_order_acquire))
862 3249x return 0;
863
864 3297853x scheduler_op* op = completed_ops_.pop();
865
866 // Handle reactor sentinel — time to poll for I/O
867
2/2
✓ Branch 0 taken 1872850 times.
✓ Branch 1 taken 1425003 times.
3297853x if (op == &task_op_)
868 {
869 1425003x bool more_handlers =
870
3/4
✓ Branch 0 taken 68286 times.
✓ Branch 1 taken 1356717 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1356717 times.
1425003x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
871
872
4/4
✓ Branch 0 taken 1356717 times.
✓ Branch 1 taken 68286 times.
✓ Branch 2 taken 1356703 times.
✓ Branch 3 taken 14 times.
2781720x if (!more_handlers &&
873
1/2
✓ Branch 0 taken 1356717 times.
✗ Branch 1 not taken.
1356717x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
874 1356717x timeout_us == 0))
875 {
876 14x completed_ops_.push(&task_op_);
877 14x return 0;
878 }
879
880
2/2
✓ Branch 0 taken 68286 times.
✓ Branch 1 taken 1356703 times.
1424989x long task_timeout_us = more_handlers ? 0 : timeout_us;
881 1424989x task_interrupted_ = task_timeout_us == 0;
882 1424989x task_running_.store(true, std::memory_order_release);
883
884
2/2
✓ Branch 0 taken 1356703 times.
✓ Branch 1 taken 68286 times.
1424989x if (more_handlers)
885 68286x unlock_and_signal_one(lock);
886
887 try
888 {
889
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1424989 times.
1424989x run_task(lock, ctx, task_timeout_us);
890 1424989x }
891 catch (...)
892 {
893 task_running_.store(false, std::memory_order_relaxed);
894 throw;
895 }
896
897 1424989x task_running_.store(false, std::memory_order_relaxed);
898 1424989x completed_ops_.push(&task_op_);
899
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 1424971 times.
1424989x if (timeout_us > 0)
900 18x return 0;
901 1424971x continue;
902 }
903
904 // Handle operation
905
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 1872835 times.
1872850x if (op != nullptr)
906 {
907 1872835x bool more = !completed_ops_.empty();
908
909
2/2
✓ Branch 0 taken 1872825 times.
✓ Branch 1 taken 10 times.
1872835x if (more)
910 1872825x ctx->unassisted = !unlock_and_signal_one(lock);
911 else
912 {
913 10x ctx->unassisted = false;
914 10x lock.unlock();
915 }
916
917 1872835x work_cleanup on_exit{this, &lock, ctx};
918 (void)on_exit;
919
920
1/2
✓ Branch 0 taken 1872835 times.
✗ Branch 1 not taken.
1872835x (*op)();
921 1872835x return 1;
922 1872835x }
923
924 // Try private queue before blocking
925
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15 times.
15x if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
926 continue;
927
928
3/4
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
15x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
929 4x timeout_us == 0)
930 11x return 0;
931
932 4x clear_signal();
933
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
4x if (timeout_us < 0)
934 4x wait_for_signal(lock);
935 else
936 wait_for_signal_for(lock, timeout_us);
937 }
938 1876127x }
939
940 } // namespace boost::corosio::detail
941
942 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
943