include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

78.7% Lines (299/380) 85.7% List of functions (42/49) 57.0% Branches (122/214)
reactor_scheduler.hpp
f(x) Functions (49)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 2891625x 85.7% 75.0% 75.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&) :90 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 7x 57.1% 50.0% 80.0% boost::corosio::detail::reactor_scheduler::~reactor_scheduler() :140 743x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :252 2925x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :258 66x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :269 0 28.6% 0.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :280 743x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::task_op() :323 1486x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::~task_op() :323 1486x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :325 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :326 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :372 5850x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :380 5850x 71.4% 16.7% 100.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :392 5850x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :406 0 0.0% 0.0% 0.0% 
boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :434 408954x 50.0% 20.0% 33.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :460 1536069x 100.0% 75.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const :474 7220x 100.0% 53.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::post_handler(std::__1::coroutine_handle<void>) :480 14436x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::~post_handler() :481 14434x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::operator()() :483 7208x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__1::coroutine_handle<void>) const::post_handler::destroy() :492 9x 100.0% 62.5% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :517 302548x 100.0% 75.0% 71.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :534 6200x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :540 2828x 100.0% 66.7% 71.0% boost::corosio::detail::reactor_scheduler::stopped() const :552 72x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :558 2423x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :564 2820x 100.0% 85.7% 78.0% boost::corosio::detail::reactor_scheduler::run_one() :589 2x 75.0% 50.0% 50.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :603 108x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler::poll() :617 8x 100.0% 64.3% 78.0% boost::corosio::detail::reactor_scheduler::poll_one() :642 4x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler::work_started() :656 135840x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :662 149845x 100.0% 75.0% 80.0% 
boost::corosio::detail::reactor_scheduler::compensating_work_started() const :669 630635x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long long) const :677 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :690 121305x 40.0% 16.7% 33.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :707 743x 100.0% 66.7% 83.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :724 3557x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :731 7388x 62.5% 50.0% 75.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :745 1127601x 85.7% 50.0% 66.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :757 2x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :763 2x 100.0% 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :775 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :787 7388x 90.0% 83.3% 85.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :805 2123365x 94.1% 80.0% 100.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :829 1324404x 86.7% 60.0% 100.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, 
boost::corosio::detail::reactor_scheduler_context*) :850 1064559x 89.6% 76.3% 78.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to (used as the lookup key).
    reactor_scheduler const* key;

    /// Next context frame on this thread's stack (nullptr at bottom).
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention on the global queue.
    op_queue private_queue;

    /// Unflushed work count for the private queue; reconciled with
    /// the scheduler's global outstanding-work counter when flushed.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame keyed to @a k, linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler const* k,
        reactor_scheduler_context* n);
};
72
/// Thread-local stack of context frames: one frame is pushed per
/// nested run()/poll() invocation on the current thread.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 2891625x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80
2/2
✓ Branch 0 taken 2878446 times.
✓ Branch 1 taken 13179 times.
2891625x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82
1/2
✓ Branch 0 taken 2878446 times.
✗ Branch 1 not taken.
2878446x if (c->key == self)
83 2878446x return c;
84 }
85 13179x return nullptr;
86 2891625x }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 7x reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112
2/4
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7x if (!ctx || ctx->private_queue.empty())
113 7x return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 7x }
119
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx, timeout_us)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;
    using mutex_type = conditionally_enabled_mutex;
    using lock_type = mutex_type::scoped_lock;
    using event_type = conditionally_enabled_event;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

    /** Apply runtime configuration to the scheduler.

        Called by `io_context` after construction. Values that do
        not apply to this backend are silently ignored.

        @param max_events Event buffer size for epoll/kqueue.
        @param budget_init Starting inline completion budget.
        @param budget_max Hard ceiling on adaptive budget ramp-up.
        @param unassisted Budget when single-threaded.

        @throws std::out_of_range If @a max_events or @a budget_max
        is outside [1, INT_MAX].
    */
    virtual void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted);

    /// Return the configured initial inline budget.
    unsigned inline_budget_initial() const noexcept
    {
        return inline_budget_initial_;
    }

    /// Return true if single-threaded (lockless) mode is active.
    bool is_single_threaded() const noexcept
    {
        return single_threaded_;
    }

    /** Enable or disable single-threaded (lockless) mode.

        When enabled, all scheduler mutex and condition variable
        operations become no-ops. Cross-thread post() is
        undefined behavior.
    */
    void configure_single_threaded(bool v) noexcept
    {
        single_threaded_ = v;
        mutex_.set_enabled(!v);
        cond_.set_enabled(!v);
    }

protected:
    timer_service* timer_svc_ = nullptr;
    bool single_threaded_ = false;

    reactor_scheduler() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler const* sched;
        lock_type* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    mutable mutex_type mutex_{true};
    mutable event_type cond_{true};
    mutable op_queue completed_ops_;
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    std::atomic<bool> stopped_{false};
    /// True while a thread is inside the reactor poll.
    mutable std::atomic<bool> task_running_{false};
    /// True once interrupt_reactor() has been issued for this poll.
    mutable bool task_interrupted_ = false;

    // Runtime-configurable reactor tuning parameters.
    // Defaults match the library's built-in values.
    unsigned max_events_per_poll_ = 128;
    unsigned inline_budget_initial_ = 2;
    unsigned inline_budget_max_ = 16;
    unsigned unassisted_budget_ = 4;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_` (bits above bit 0).
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler to reconcile work counts.
    struct work_cleanup
    {
        reactor_scheduler* sched;
        lock_type* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        lock_type& lock, long timeout_us, context_type* ctx);

    void signal_all(lock_type& lock) const;
    bool maybe_unlock_and_signal_one(lock_type& lock) const;
    bool unlock_and_signal_one(lock_type& lock) const;
    void clear_signal() const;
    void wait_for_signal(lock_type& lock) const;
    void wait_for_signal_for(
        lock_type& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(lock_type& lock) const;
};
359
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    ~reactor_thread_context_guard() noexcept
    {
        // NOTE(review): the private count is flushed only when the
        // queue is non-empty; presumably a positive count with an
        // empty queue cannot occur here — confirm against do_one.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        reactor_context_stack.set(frame_.next);
    }
};
388
389 // ---- Inline implementations ------------------------------------------------
390
// Construct a frame keyed to scheduler `k`, stacked on top of `n`.
// The adaptive budget ceiling starts at the scheduler's configured
// initial budget; the spendable budget starts at zero until the
// first reset_inline_budget().
inline
reactor_scheduler_context::reactor_scheduler_context(
    reactor_scheduler const* k,
    reactor_scheduler_context* n)
    : key(k)
    , next(n)
    , private_outstanding_work(0)
    , inline_budget(0)
    , inline_budget_max(
        static_cast<int>(k->inline_budget_initial()))
    , unassisted(false)
{
}
404
405 inline void
406 reactor_scheduler::configure_reactor(
407 unsigned max_events,
408 unsigned budget_init,
409 unsigned budget_max,
410 unsigned unassisted)
411 {
412 if (max_events < 1 ||
413 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 throw std::out_of_range(
415 "max_events_per_poll must be in [1, INT_MAX]");
416 if (budget_max < 1 ||
417 budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
418 throw std::out_of_range(
419 "inline_budget_max must be in [1, INT_MAX]");
420
421 // Clamp initial and unassisted to budget_max.
422 if (budget_init > budget_max)
423 budget_init = budget_max;
424 if (unassisted > budget_max)
425 unassisted = budget_max;
426
427 max_events_per_poll_ = max_events;
428 inline_budget_initial_ = budget_init;
429 inline_budget_max_ = budget_max;
430 unassisted_budget_ = unassisted;
431 }
432
// Grant this thread a fresh inline-completion budget. The ceiling
// adapts: it doubles when the previous cycle spent the whole budget,
// and decays back to the configured initial value when budget was
// left over. An "unassisted" thread (no other thread absorbed queued
// work last cycle) is pinned to the fixed unassisted budget instead.
inline void
reactor_scheduler::reset_inline_budget() const noexcept
{
    if (auto* ctx = reactor_find_context(this))
    {
        // Cap when no other thread absorbed queued work
        if (ctx->unassisted)
        {
            ctx->inline_budget_max =
                static_cast<int>(unassisted_budget_);
            ctx->inline_budget =
                static_cast<int>(unassisted_budget_);
            return;
        }
        // Ramp up when previous cycle fully consumed budget
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(
                ctx->inline_budget_max * 2,
                static_cast<int>(inline_budget_max_));
        // Otherwise fall back toward the configured initial budget.
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max =
                static_cast<int>(inline_budget_initial_);
        ctx->inline_budget = ctx->inline_budget_max;
    }
}
458
459 inline bool
460 1536069x reactor_scheduler::try_consume_inline_budget() const noexcept
461 {
462
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1536069 times.
1536069x if (auto* ctx = reactor_find_context(this))
463 {
464
2/2
✓ Branch 0 taken 1249266 times.
✓ Branch 1 taken 286803 times.
1536069x if (ctx->inline_budget > 0)
465 {
466 1249266x --ctx->inline_budget;
467 1249266x return true;
468 }
469 286803x }
470 286803x return false;
471 1536069x }
472
// Wrap a coroutine handle in a heap-allocated op and enqueue it.
inline void
reactor_scheduler::post(std::coroutine_handle<> h) const
{
    // Self-deleting op: invoking or destroying it frees the op first,
    // then acts on a saved copy of the handle.
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
        ~post_handler() override = default;

        void operator()() override
        {
            // Copy the handle before `delete this` invalidates members.
            auto saved = h_;
            delete this;
            // Ensure stores from the posting thread are visible
            std::atomic_thread_fence(std::memory_order_acquire);
            saved.resume();
        }

        void destroy() override
        {
            auto saved = h_;
            delete this;
            saved.destroy();
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: on a scheduler thread, use the lock-free private
    // queue and defer the work-count flush.
    if (auto* ctx = reactor_find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: count the work globally, enqueue under the mutex,
    // and wake a waiter to run it.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
515
516 inline void
517 302548x reactor_scheduler::post(scheduler_op* h) const
518 {
519
2/2
✓ Branch 0 taken 302374 times.
✓ Branch 1 taken 174 times.
302548x if (auto* ctx = reactor_find_context(this))
520 {
521 302374x ++ctx->private_outstanding_work;
522 302374x ctx->private_queue.push(h);
523 302374x return;
524 }
525
526 174x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
527
528 174x lock_type lock(mutex_);
529 174x completed_ops_.push(h);
530
1/2
✓ Branch 0 taken 174 times.
✗ Branch 1 not taken.
174x wake_one_thread_and_unlock(lock);
531 302548x }
532
533 inline bool
534 6200x reactor_scheduler::running_in_this_thread() const noexcept
535 {
536 6200x return reactor_find_context(this) != nullptr;
537 }
538
539 inline void
540 2828x reactor_scheduler::stop()
541 {
542 2828x lock_type lock(mutex_);
543
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 2814 times.
2828x if (!stopped_.load(std::memory_order_acquire))
544 {
545 2814x stopped_.store(true, std::memory_order_release);
546
1/2
✓ Branch 0 taken 2814 times.
✗ Branch 1 not taken.
2814x signal_all(lock);
547
1/2
✓ Branch 0 taken 2814 times.
✗ Branch 1 not taken.
2814x interrupt_reactor();
548 2814x }
549 2828x }
550
551 inline bool
552 72x reactor_scheduler::stopped() const noexcept
553 {
554 72x return stopped_.load(std::memory_order_acquire);
555 }
556
557 inline void
558 2423x reactor_scheduler::restart()
559 {
560 2423x stopped_.store(false, std::memory_order_release);
561 2423x }
562
563 inline std::size_t
564 2820x reactor_scheduler::run()
565 {
566
2/2
✓ Branch 0 taken 2818 times.
✓ Branch 1 taken 2 times.
2820x if (outstanding_work_.load(std::memory_order_acquire) == 0)
567 {
568 2x stop();
569 2x return 0;
570 }
571
572 2818x reactor_thread_context_guard ctx(this);
573
1/2
✓ Branch 0 taken 2818 times.
✗ Branch 1 not taken.
2818x lock_type lock(mutex_);
574
575 2818x std::size_t n = 0;
576 1064444x for (;;)
577 {
578
4/4
✓ Branch 0 taken 1064443 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1061625 times.
✓ Branch 3 taken 2818 times.
1064444x if (!do_one(lock, -1, &ctx.frame_))
579 2818x break;
580
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1061625 times.
1061625x if (n != (std::numeric_limits<std::size_t>::max)())
581 1061625x ++n;
582
2/2
✓ Branch 0 taken 765918 times.
✓ Branch 1 taken 295707 times.
1061625x if (!lock.owns_lock())
583
2/2
✓ Branch 0 taken 765919 times.
✓ Branch 1 taken 1 time.
765918x lock.lock();
584 }
585 2818x return n;
586 2822x }
587
588 inline std::size_t
589 2x reactor_scheduler::run_one()
590 {
591
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x if (outstanding_work_.load(std::memory_order_acquire) == 0)
592 {
593 stop();
594 return 0;
595 }
596
597 2x reactor_thread_context_guard ctx(this);
598
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x lock_type lock(mutex_);
599
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x return do_one(lock, -1, &ctx.frame_);
600 2x }
601
602 inline std::size_t
603 108x reactor_scheduler::wait_one(long usec)
604 {
605
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 12 times.
108x if (outstanding_work_.load(std::memory_order_acquire) == 0)
606 {
607 12x stop();
608 12x return 0;
609 }
610
611 96x reactor_thread_context_guard ctx(this);
612
1/2
✓ Branch 0 taken 96 times.
✗ Branch 1 not taken.
96x lock_type lock(mutex_);
613
1/2
✓ Branch 0 taken 96 times.
✗ Branch 1 not taken.
96x return do_one(lock, usec, &ctx.frame_);
614 108x }
615
616 inline std::size_t
617 8x reactor_scheduler::poll()
618 {
619
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 time.
8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
620 {
621 1x stop();
622 1x return 0;
623 }
624
625 7x reactor_thread_context_guard ctx(this);
626
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7x lock_type lock(mutex_);
627
628 7x std::size_t n = 0;
629 15x for (;;)
630 {
631
3/4
✓ Branch 0 taken 15 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 7 times.
15x if (!do_one(lock, 0, &ctx.frame_))
632 7x break;
633
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x if (n != (std::numeric_limits<std::size_t>::max)())
634 8x ++n;
635
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x if (!lock.owns_lock())
636
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x lock.lock();
637 }
638 7x return n;
639 8x }
640
641 inline std::size_t
642 4x reactor_scheduler::poll_one()
643 {
644
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
645 {
646 2x stop();
647 2x return 0;
648 }
649
650 2x reactor_thread_context_guard ctx(this);
651
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x lock_type lock(mutex_);
652
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x return do_one(lock, 0, &ctx.frame_);
653 4x }
654
655 inline void
656 135840x reactor_scheduler::work_started() noexcept
657 {
658 135840x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
659 135840x }
660
661 inline void
662 149845x reactor_scheduler::work_finished() noexcept
663 {
664
2/2
✓ Branch 0 taken 147041 times.
✓ Branch 1 taken 2804 times.
149845x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
665
1/2
✓ Branch 0 taken 2804 times.
✗ Branch 1 not taken.
2804x stop();
666 149845x }
667
668 inline void
669 630635x reactor_scheduler::compensating_work_started() const noexcept
670 {
671 630635x auto* ctx = reactor_find_context(this);
672
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 630635 times.
630635x if (ctx)
673 630635x ++ctx->private_outstanding_work;
674 630635x }
675
// Move a thread's leftover private ops into the global queue.
inline void
reactor_scheduler::drain_thread_queue(
    op_queue& queue, std::int64_t count) const
{
    // Publish the work count before the ops become globally visible.
    if (count > 0)
        outstanding_work_.fetch_add(count, std::memory_order_relaxed);

    lock_type lock(mutex_);
    completed_ops_.splice(queue);
    // NOTE(review): when count == 0, ops are spliced without waking a
    // waiter — presumably their work units were flushed earlier and a
    // wakeup is guaranteed elsewhere; confirm.
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
688
689 inline void
690 121305x reactor_scheduler::post_deferred_completions(op_queue& ops) const
691 {
692
1/2
✓ Branch 0 taken 121305 times.
✗ Branch 1 not taken.
121305x if (ops.empty())
693 121305x return;
694
695 if (auto* ctx = reactor_find_context(this))
696 {
697 ctx->private_queue.splice(ops);
698 return;
699 }
700
701 lock_type lock(mutex_);
702 completed_ops_.splice(ops);
703 wake_one_thread_and_unlock(lock);
704 121305x }
705
// Destroy all pending ops at shutdown and wake every waiter.
inline void
reactor_scheduler::shutdown_drain()
{
    lock_type lock(mutex_);

    while (auto* h = completed_ops_.pop())
    {
        // The task sentinel is owned by the scheduler; never destroy it.
        if (h == &task_op_)
            continue;
        // Drop the lock while destroy() runs: it executes user-supplied
        // cleanup that must not run under the scheduler mutex.
        lock.unlock();
        h->destroy();
        lock.lock();
    }

    signal_all(lock);
}
722
723 inline void
724 3557x reactor_scheduler::signal_all(lock_type&) const
725 {
726 3557x state_ |= signaled_bit;
727 3557x cond_.notify_all();
728 3557x }
729
// Latch the signal and, only if at least one thread is parked,
// unlock and notify it. Returns true if a waiter was signaled
// (in which case the lock has been released).
inline bool
reactor_scheduler::maybe_unlock_and_signal_one(
    lock_type& lock) const
{
    state_ |= signaled_bit;
    // Any value above the bare signal bit means >= 1 parked waiter
    // (each waiter adds waiter_increment while waiting).
    if (state_ > signaled_bit)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
743
// Latch the signal, always unlock, and notify one waiter if any
// thread is parked. Returns true if a waiter was notified.
inline bool
reactor_scheduler::unlock_and_signal_one(
    lock_type& lock) const
{
    state_ |= signaled_bit;
    // Waiter count occupies the bits above the signal bit.
    bool have_waiters = state_ > signaled_bit;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
755
756 inline void
757 2x reactor_scheduler::clear_signal() const
758 {
759 2x state_ &= ~signaled_bit;
760 2x }
761
// Block until the signal bit is set, registering as a waiter for
// the duration of each wait (guards against spurious wakeups by
// re-checking the bit in a loop).
inline void
reactor_scheduler::wait_for_signal(
    lock_type& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        // Advertise this thread as parked so signalers notify us.
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
773
// Timed variant: waits at most once (no loop) so the caller regains
// control after a timeout or a single wakeup, signaled or not.
inline void
reactor_scheduler::wait_for_signal_for(
    lock_type& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        // Advertise this thread as parked so signalers notify us.
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
785
// Wake something to process new work, then release the lock.
// Preference order: a parked worker thread first; otherwise the
// reactor, if it is blocked in a poll and not already interrupted.
inline void
reactor_scheduler::wake_one_thread_and_unlock(
    lock_type& lock) const
{
    // Signals a waiter and unlocks when one is parked.
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        // task_interrupted_ is written under the mutex; set it before
        // unlocking so only one thread issues the interrupt.
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}
804
// Runs when a handler invoked from do_one() finishes (normally or by
// exception): reconciles the thread-private work count with the
// scheduler-wide counter and publishes privately queued ops.
inline reactor_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        // The handler just consumed one unit of work, so the net
        // adjustment to the global counter is (produced - 1).
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            // produced <= 0: the consumed unit was not replaced;
            // work_finished() decrements the global count (and may
            // stop the scheduler when it reaches zero).
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Ops queued privately while the handler ran must be spliced
        // into the shared queue under the scheduler lock.
        // NOTE(review): unlike task_cleanup, this locks
        // unconditionally — presumably do_one() always releases the
        // lock before running the handler; confirm.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just account for the consumed work unit.
        sched->work_finished();
    }
}
828
// Restores scheduler-wide state after the reactor task runs: flushes
// the thread-private work count and private op queue back into the
// shared structures.
inline reactor_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Publish work produced while polling to the global counter.
    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Splice privately queued ops into the shared completed queue.
    // The scheduler lock may or may not be held at this point (hence
    // the owns_lock() check) — re-acquire only if needed.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
848
inline std::size_t
reactor_scheduler::do_one(
    lock_type& lock, long timeout_us, context_type* ctx)
{
    // Runs at most one handler (returns 1) or gives up (returns 0)
    // because the scheduler stopped, no work is outstanding, or the
    // timeout policy forbids blocking.
    //   timeout_us < 0  : block indefinitely
    //   timeout_us == 0 : poll, never block
    //   timeout_us > 0  : block up to timeout_us microseconds
    // Entered with `lock` held; handlers and the reactor run with it
    // released.
    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            // Are handlers ready to run besides the sentinel itself?
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Nothing queued, and either no work exists or we may not
            // block: put the sentinel back and report idle.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Poll (timeout 0) when handlers are pending so we return
            // to them quickly; otherwise let the reactor block for up
            // to the caller's timeout. task_interrupted_ is pre-set
            // for the non-blocking case so no interrupt is issued.
            long task_timeout_us = more_handlers ? 0 : timeout_us;
            task_interrupted_ = task_timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Hand pending handlers to another thread while this one
            // polls the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx, task_timeout_us);
            }
            catch (...)
            {
                task_running_.store(false, std::memory_order_relaxed);
                // NOTE(review): on this path the task_op_ sentinel is
                // NOT pushed back, so no thread would poll the reactor
                // again — confirm run_task cannot throw, or repost the
                // sentinel here (lock state permitting).
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            // A finite timeout is consumed by a single reactor run.
            if (timeout_us > 0)
                return 0;
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more ops remain, try to wake a helper thread;
            // `unassisted` records whether nobody picked them up.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts and publishes the
            // private queue when the handler returns — or throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the lock released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // Queue empty: idle out if there is no outstanding work or we
        // may not block; otherwise sleep until signaled (or timed out).
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
933
934 } // namespace boost::corosio::detail
935
936 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
937