include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

83.3% Lines (284/341) 88.9% List of functions (40/45) 59.0% Branches (118/200)
f(x) Functions (45)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler_base const*, boost::corosio::detail::reactor_scheduler_context*) :65 6052x 100.0% 100.0% boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler_base const*) :82 3251868x 85.7% 75.0% 75.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&) :94 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::__1::atomic<long long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :111 4x 57.1% 50.0% 80.0% boost::corosio::detail::reactor_scheduler_base::~reactor_scheduler_base() :144 606x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::reactor_scheduler_base() :237 606x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::task_op() :273 1212x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::~task_op() :273 1212x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::operator()() :275 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::task_op::destroy() :276 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler_base const*) :321 6052x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :329 6052x 71.4% 16.7% 100.0% boost::corosio::detail::reactor_scheduler_base::reset_inline_budget() const :341 450889x 53.8% 20.0% 33.0% boost::corosio::detail::reactor_scheduler_base::try_consume_inline_budget() const :362 1719620x 100.0% 75.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__1::coroutine_handle<void>) const :376 15055x 100.0% 53.0% boost::corosio::detail::reactor_scheduler_base::post(std::__1::coroutine_handle<void>) 
const::post_handler::post_handler(std::__1::coroutine_handle<void>) :382 30105x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__1::coroutine_handle<void>) const::post_handler::~post_handler() :383 30110x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__1::coroutine_handle<void>) const::post_handler::operator()() :385 15046x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__1::coroutine_handle<void>) const::post_handler::destroy() :394 9x 100.0% 62.5% 100.0% boost::corosio::detail::reactor_scheduler_base::post(boost::corosio::detail::scheduler_op*) const :419 330631x 100.0% 75.0% 71.0% boost::corosio::detail::reactor_scheduler_base::running_in_this_thread() const :436 6491x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::stop() :442 2972x 100.0% 66.7% 71.0% boost::corosio::detail::reactor_scheduler_base::stopped() const :454 29x 100.0% 50.0% 66.0% boost::corosio::detail::reactor_scheduler_base::restart() :461 2615x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::run() :468 2965x 100.0% 71.4% 78.0% boost::corosio::detail::reactor_scheduler_base::run_one() :493 2x 75.0% 50.0% 50.0% boost::corosio::detail::reactor_scheduler_base::wait_one(long) :507 65x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler_base::poll() :521 8x 100.0% 64.3% 78.0% boost::corosio::detail::reactor_scheduler_base::poll_one() :546 4x 100.0% 66.7% 60.0% boost::corosio::detail::reactor_scheduler_base::work_started() :560 142768x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::work_finished() :566 157031x 100.0% 75.0% 80.0% boost::corosio::detail::reactor_scheduler_base::compensating_work_started() const :573 729183x 100.0% 50.0% 100.0% boost::corosio::detail::reactor_scheduler_base::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long long) const :581 0 0.0% 0.0% 0.0% 
boost::corosio::detail::reactor_scheduler_base::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :594 127813x 40.0% 16.7% 33.0% boost::corosio::detail::reactor_scheduler_base::shutdown_drain() :611 606x 100.0% 70.0% 81.0% boost::corosio::detail::reactor_scheduler_base::signal_all(std::__1::unique_lock<std::__1::mutex>&) const :628 3563x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::maybe_unlock_and_signal_one(std::__1::unique_lock<std::__1::mutex>&) const :635 7528x 62.5% 50.0% 75.0% boost::corosio::detail::reactor_scheduler_base::unlock_and_signal_one(std::__1::unique_lock<std::__1::mutex>&) const :649 1273606x 85.7% 50.0% 66.0% boost::corosio::detail::reactor_scheduler_base::clear_signal() const :661 3x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal(std::__1::unique_lock<std::__1::mutex>&) const :667 3x 100.0% 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal_for(std::__1::unique_lock<std::__1::mutex>&, long) const :679 0 0.0% 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wake_one_thread_and_unlock(std::__1::unique_lock<std::__1::mutex>&) const :691 7528x 90.0% 66.7% 85.0% boost::corosio::detail::reactor_scheduler_base::work_cleanup::~work_cleanup() :709 2405325x 94.1% 80.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_cleanup::~task_cleanup() :733 1515016x 86.7% 60.0% 100.0% boost::corosio::detail::reactor_scheduler_base::do_one(std::__1::unique_lock<std::__1::mutex>&, long, boost::corosio::detail::reactor_scheduler_context*) :754 1205634x 88.9% 77.8% 76.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <coroutine>
24 #include <cstddef>
25 #include <cstdint>
26 #include <limits>
27 #include <memory>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 // Forward declaration
33 class reactor_scheduler_base;
34
35 /** Per-thread state for a reactor scheduler.
36
37 Each thread running a scheduler's event loop has one of these
38 on a thread-local stack. It holds a private work queue and
39 inline completion budget for speculative I/O fast paths.
40 */
41 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 {
43 /// Scheduler this context belongs to.
44 reactor_scheduler_base const* key;
45
46 /// Next context frame on this thread's stack.
47 reactor_scheduler_context* next;
48
49 /// Private work queue for reduced contention.
50 op_queue private_queue;
51
52 /// Unflushed work count for the private queue.
53 std::int64_t private_outstanding_work;
54
55 /// Remaining inline completions allowed this cycle.
56 int inline_budget;
57
58 /// Maximum inline budget (adaptive, 2-16).
59 int inline_budget_max;
60
61 /// True if no other thread absorbed queued work last cycle.
62 bool unassisted;
63
64 /// Construct a context frame linked to @a n.
65 9078x reactor_scheduler_context(
66 reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 3026x : key(k)
68 3026x , next(n)
69 3026x , private_outstanding_work(0)
70 3026x , inline_budget(0)
71 3026x , inline_budget_max(2)
72 3026x , unassisted(false)
73 3026x {
74 6052x }
75 };
76
/// Thread-local context stack for reactor schedulers. Frames are pushed
/// and popped by reactor_thread_context_guard; `inline` gives a single
/// definition across translation units.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79
80 /// Find the context frame for a scheduler on this thread.
81 inline reactor_scheduler_context*
82 3251868x reactor_find_context(reactor_scheduler_base const* self) noexcept
83 {
84
2/2
✓ Branch 0 taken 3238271 times.
✓ Branch 1 taken 13597 times.
3251868x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 {
86
1/2
✓ Branch 0 taken 3238271 times.
✗ Branch 1 not taken.
3238271x if (c->key == self)
87 3238271x return c;
88 }
89 13597x return nullptr;
90 3251868x }
91
92 /// Flush private work count to global counter.
93 inline void
94 reactor_flush_private_work(
95 reactor_scheduler_context* ctx,
96 std::atomic<std::int64_t>& outstanding_work) noexcept
97 {
98 if (ctx && ctx->private_outstanding_work > 0)
99 {
100 outstanding_work.fetch_add(
101 ctx->private_outstanding_work, std::memory_order_relaxed);
102 ctx->private_outstanding_work = 0;
103 }
104 }
105
106 /** Drain private queue to global queue, flushing work count first.
107
108 @return True if any ops were drained.
109 */
110 inline bool
111 4x reactor_drain_private_queue(
112 reactor_scheduler_context* ctx,
113 std::atomic<std::int64_t>& outstanding_work,
114 op_queue& completed_ops) noexcept
115 {
116
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
4x if (!ctx || ctx->private_queue.empty())
117 4x return false;
118
119 reactor_flush_private_work(ctx, outstanding_work);
120 completed_ops.splice(ctx->private_queue);
121 return true;
122 4x }
123
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard used around `run_task`: its destructor flushes the
    /// context's private work count and splices the private queue into
    /// the global queue (the task sentinel itself is re-queued by
    /// do_one, not by this guard).
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    bool stopped_ = false;          // guarded by mutex_
    mutable std::atomic<bool> task_running_{false};
    mutable bool task_interrupted_ = false;  // guarded by mutex_

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;  // guarded by mutex_

    /// Sentinel op that triggers a reactor poll when dequeued.
    /// Never executed or destroyed as a real handler.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each handler: reconciles the handler's
    /// produced-vs-consumed work delta and publishes its private queue.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
308
309 /** RAII guard that pushes/pops a scheduler context frame.
310
311 On construction, pushes a new context frame onto the
312 thread-local stack. On destruction, drains any remaining
313 private queue items to the global queue and pops the frame.
314 */
315 struct reactor_thread_context_guard
316 {
317 /// The context frame managed by this guard.
318 reactor_scheduler_context frame_;
319
320 /// Construct the guard, pushing a frame for @a sched.
321 6052x explicit reactor_thread_context_guard(
322 reactor_scheduler_base const* sched) noexcept
323
1/2
✓ Branch 0 taken 3026 times.
✗ Branch 1 not taken.
3026x : frame_(sched, reactor_context_stack.get())
324 3026x {
325 3026x reactor_context_stack.set(&frame_);
326 6052x }
327
328 /// Destroy the guard, draining private work and popping the frame.
329 6052x ~reactor_thread_context_guard() noexcept
330 3026x {
331
1/2
✓ Branch 0 taken 3026 times.
✗ Branch 1 not taken.
3026x if (!frame_.private_queue.empty())
332 frame_.key->drain_thread_queue(
333 frame_.private_queue, frame_.private_outstanding_work);
334 3026x reactor_context_stack.set(frame_.next);
335 6052x }
336 };
337
338 // ---- Inline implementations ------------------------------------------------
339
340 inline void
341 450889x reactor_scheduler_base::reset_inline_budget() const noexcept
342 {
343
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 450889 times.
450889x if (auto* ctx = reactor_find_context(this))
344 {
345 // Cap when no other thread absorbed queued work
346
1/2
✓ Branch 0 taken 450889 times.
✗ Branch 1 not taken.
450889x if (ctx->unassisted)
347 {
348 450889x ctx->inline_budget_max = 4;
349 450889x ctx->inline_budget = 4;
350 450889x return;
351 }
352 // Ramp up when previous cycle fully consumed budget
353 if (ctx->inline_budget == 0)
354 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
355 else if (ctx->inline_budget < ctx->inline_budget_max)
356 ctx->inline_budget_max = 2;
357 ctx->inline_budget = ctx->inline_budget_max;
358 }
359 450889x }
360
361 inline bool
362 1719620x reactor_scheduler_base::try_consume_inline_budget() const noexcept
363 {
364
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1719620 times.
1719620x if (auto* ctx = reactor_find_context(this))
365 {
366
2/2
✓ Branch 0 taken 1397478 times.
✓ Branch 1 taken 322142 times.
1719620x if (ctx->inline_budget > 0)
367 {
368 1397478x --ctx->inline_budget;
369 1397478x return true;
370 }
371 322142x }
372 322142x return false;
373 1719620x }
374
375 inline void
376 15055x reactor_scheduler_base::post(std::coroutine_handle<> h) const
377 {
378 struct post_handler final : scheduler_op
379 {
380 std::coroutine_handle<> h_;
381
382 30105x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
383 30110x ~post_handler() override = default;
384
385 15046x void operator()() override
386 {
387 15046x auto saved = h_;
388
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15046 times.
15046x delete this;
389 // Ensure stores from the posting thread are visible
390 15046x std::atomic_thread_fence(std::memory_order_acquire);
391 15046x saved.resume();
392 15046x }
393
394 9x void destroy() override
395 {
396 9x auto saved = h_;
397
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9x delete this;
398 9x saved.destroy();
399 9x }
400 };
401
402 15055x auto ph = std::make_unique<post_handler>(h);
403
404
2/2
✓ Branch 0 taken 7563 times.
✓ Branch 1 taken 7492 times.
15055x if (auto* ctx = reactor_find_context(this))
405 {
406 7563x ++ctx->private_outstanding_work;
407 7563x ctx->private_queue.push(ph.release());
408 7563x return;
409 }
410
411 7492x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
412
413
1/2
✓ Branch 0 taken 7492 times.
✗ Branch 1 not taken.
7492x std::unique_lock lock(mutex_);
414 7492x completed_ops_.push(ph.release());
415
1/2
✓ Branch 0 taken 7492 times.
✗ Branch 1 not taken.
7492x wake_one_thread_and_unlock(lock);
416 15055x }
417
418 inline void
419 330631x reactor_scheduler_base::post(scheduler_op* h) const
420 {
421
2/2
✓ Branch 0 taken 330595 times.
✓ Branch 1 taken 36 times.
330631x if (auto* ctx = reactor_find_context(this))
422 {
423 330595x ++ctx->private_outstanding_work;
424 330595x ctx->private_queue.push(h);
425 330595x return;
426 }
427
428 36x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
429
430 36x std::unique_lock lock(mutex_);
431 36x completed_ops_.push(h);
432
1/2
✓ Branch 0 taken 36 times.
✗ Branch 1 not taken.
36x wake_one_thread_and_unlock(lock);
433 330631x }
434
435 inline bool
436 6491x reactor_scheduler_base::running_in_this_thread() const noexcept
437 {
438 6491x return reactor_find_context(this) != nullptr;
439 }
440
441 inline void
442 2972x reactor_scheduler_base::stop()
443 {
444 2972x std::unique_lock lock(mutex_);
445
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 2957 times.
2972x if (!stopped_)
446 {
447 2957x stopped_ = true;
448
1/2
✓ Branch 0 taken 2957 times.
✗ Branch 1 not taken.
2957x signal_all(lock);
449
1/2
✓ Branch 0 taken 2957 times.
✗ Branch 1 not taken.
2957x interrupt_reactor();
450 2957x }
451 2972x }
452
453 inline bool
454 29x reactor_scheduler_base::stopped() const noexcept
455 {
456
1/2
✓ Branch 0 taken 29 times.
✗ Branch 1 not taken.
29x std::unique_lock lock(mutex_);
457 29x return stopped_;
458 29x }
459
460 inline void
461 2615x reactor_scheduler_base::restart()
462 {
463 2615x std::unique_lock lock(mutex_);
464 2615x stopped_ = false;
465 2615x }
466
467 inline std::size_t
468 2965x reactor_scheduler_base::run()
469 {
470
2/2
✓ Branch 0 taken 2962 times.
✓ Branch 1 taken 3 times.
2965x if (outstanding_work_.load(std::memory_order_acquire) == 0)
471 {
472 3x stop();
473 3x return 0;
474 }
475
476 2962x reactor_thread_context_guard ctx(this);
477
1/2
✓ Branch 0 taken 2962 times.
✗ Branch 1 not taken.
2962x std::unique_lock lock(mutex_);
478
479 2962x std::size_t n = 0;
480 1205562x for (;;)
481 {
482
3/4
✓ Branch 0 taken 1205562 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1202600 times.
✓ Branch 3 taken 2962 times.
1205562x if (!do_one(lock, -1, &ctx.frame_))
483 2962x break;
484
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1202600 times.
1202600x if (n != (std::numeric_limits<std::size_t>::max)())
485 1202600x ++n;
486
2/2
✓ Branch 0 taken 871221 times.
✓ Branch 1 taken 331379 times.
1202600x if (!lock.owns_lock())
487
1/2
✓ Branch 0 taken 871221 times.
✗ Branch 1 not taken.
871221x lock.lock();
488 }
489 2962x return n;
490 2965x }
491
492 inline std::size_t
493 2x reactor_scheduler_base::run_one()
494 {
495
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x if (outstanding_work_.load(std::memory_order_acquire) == 0)
496 {
497 stop();
498 return 0;
499 }
500
501 2x reactor_thread_context_guard ctx(this);
502
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x std::unique_lock lock(mutex_);
503
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x return do_one(lock, -1, &ctx.frame_);
504 2x }
505
506 inline std::size_t
507 65x reactor_scheduler_base::wait_one(long usec)
508 {
509
2/2
✓ Branch 0 taken 53 times.
✓ Branch 1 taken 12 times.
65x if (outstanding_work_.load(std::memory_order_acquire) == 0)
510 {
511 12x stop();
512 12x return 0;
513 }
514
515 53x reactor_thread_context_guard ctx(this);
516
1/2
✓ Branch 0 taken 53 times.
✗ Branch 1 not taken.
53x std::unique_lock lock(mutex_);
517
1/2
✓ Branch 0 taken 53 times.
✗ Branch 1 not taken.
53x return do_one(lock, usec, &ctx.frame_);
518 65x }
519
520 inline std::size_t
521 8x reactor_scheduler_base::poll()
522 {
523
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 time.
8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
524 {
525 1x stop();
526 1x return 0;
527 }
528
529 7x reactor_thread_context_guard ctx(this);
530
1/2
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
7x std::unique_lock lock(mutex_);
531
532 7x std::size_t n = 0;
533 15x for (;;)
534 {
535
3/4
✓ Branch 0 taken 15 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✓ Branch 3 taken 7 times.
15x if (!do_one(lock, 0, &ctx.frame_))
536 7x break;
537
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x if (n != (std::numeric_limits<std::size_t>::max)())
538 8x ++n;
539
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x if (!lock.owns_lock())
540
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x lock.lock();
541 }
542 7x return n;
543 8x }
544
545 inline std::size_t
546 4x reactor_scheduler_base::poll_one()
547 {
548
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
549 {
550 2x stop();
551 2x return 0;
552 }
553
554 2x reactor_thread_context_guard ctx(this);
555
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x std::unique_lock lock(mutex_);
556
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x return do_one(lock, 0, &ctx.frame_);
557 4x }
558
inline void
reactor_scheduler_base::work_started() noexcept
{
    // Relaxed suffices: the counter only gates shutdown, and the matching
    // work_finished() uses acq_rel when observing the final decrement.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
564
565 inline void
566 157031x reactor_scheduler_base::work_finished() noexcept
567 {
568
2/2
✓ Branch 0 taken 154084 times.
✓ Branch 1 taken 2947 times.
157031x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
569
1/2
✓ Branch 0 taken 2947 times.
✗ Branch 1 not taken.
2947x stop();
570 157031x }
571
572 inline void
573 729183x reactor_scheduler_base::compensating_work_started() const noexcept
574 {
575 729183x auto* ctx = reactor_find_context(this);
576
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 729183 times.
729183x if (ctx)
577 729183x ++ctx->private_outstanding_work;
578 729183x }
579
580 inline void
581 reactor_scheduler_base::drain_thread_queue(
582 op_queue& queue, std::int64_t count) const
583 {
584 if (count > 0)
585 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
586
587 std::unique_lock lock(mutex_);
588 completed_ops_.splice(queue);
589 if (count > 0)
590 maybe_unlock_and_signal_one(lock);
591 }
592
593 inline void
594 127813x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
595 {
596
1/2
✓ Branch 0 taken 127813 times.
✗ Branch 1 not taken.
127813x if (ops.empty())
597 127813x return;
598
599 if (auto* ctx = reactor_find_context(this))
600 {
601 ctx->private_queue.splice(ops);
602 return;
603 }
604
605 std::unique_lock lock(mutex_);
606 completed_ops_.splice(ops);
607 wake_one_thread_and_unlock(lock);
608 127813x }
609
610 inline void
611 606x reactor_scheduler_base::shutdown_drain()
612 {
613 606x std::unique_lock lock(mutex_);
614
615
2/2
✓ Branch 0 taken 718 times.
✓ Branch 1 taken 606 times.
1324x while (auto* h = completed_ops_.pop())
616 {
617
2/2
✓ Branch 0 taken 112 times.
✓ Branch 1 taken 606 times.
718x if (h == &task_op_)
618 606x continue;
619
1/2
✓ Branch 0 taken 112 times.
✗ Branch 1 not taken.
112x lock.unlock();
620
1/2
✓ Branch 0 taken 112 times.
✗ Branch 1 not taken.
112x h->destroy();
621
1/2
✓ Branch 0 taken 112 times.
✗ Branch 1 not taken.
112x lock.lock();
622 }
623
624 606x signal_all(lock);
625 606x }
626
inline void
reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
{
    // Caller holds the mutex (the unused lock parameter documents this).
    // Set the signaled bit before notifying so woken waiters observe it.
    state_ |= signaled_bit;
    cond_.notify_all();
}
633
634 inline bool
635 7528x reactor_scheduler_base::maybe_unlock_and_signal_one(
636 std::unique_lock<std::mutex>& lock) const
637 {
638 7528x state_ |= signaled_bit;
639
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7528 times.
7528x if (state_ > signaled_bit)
640 {
641 lock.unlock();
642 cond_.notify_one();
643 return true;
644 }
645 7528x return false;
646 7528x }
647
648 inline bool
649 1273606x reactor_scheduler_base::unlock_and_signal_one(
650 std::unique_lock<std::mutex>& lock) const
651 {
652 1273606x state_ |= signaled_bit;
653 1273606x bool have_waiters = state_ > signaled_bit;
654 1273606x lock.unlock();
655
1/2
✓ Branch 0 taken 1273606 times.
✗ Branch 1 not taken.
1273606x if (have_waiters)
656 cond_.notify_one();
657 1273606x return have_waiters;
658 }
659
inline void
reactor_scheduler_base::clear_signal() const
{
    // Caller holds the mutex; only the signaled bit is cleared — the
    // waiter count encoded in the upper bits is preserved.
    state_ &= ~signaled_bit;
}
665
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    // Loop guards against spurious wakeups: only return once the
    // signaled bit is set.
    while ((state_ & signaled_bit) == 0)
    {
        // Register as a waiter so signalers know to notify.
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
677
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Single timed wait — deliberately no loop: a spurious wakeup or
    // timeout simply returns to the caller, which re-checks its state.
    if ((state_ & signaled_bit) == 0)
    {
        // Register as a waiter so signalers know to notify.
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
689
690 inline void
691 7528x reactor_scheduler_base::wake_one_thread_and_unlock(
692 std::unique_lock<std::mutex>& lock) const
693 {
694
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7528 times.
7528x if (maybe_unlock_and_signal_one(lock))
695 return;
696
697
3/4
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 7500 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 28 times.
7528x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
698 {
699 28x task_interrupted_ = true;
700 28x lock.unlock();
701 28x interrupt_reactor();
702 28x }
703 else
704 {
705 7500x lock.unlock();
706 }
707 7528x }
708
// Runs after each handler invoked by do_one. Reconciles the handler's
// work accounting: the handler consumed one unit of work, and any ops
// it posted on the fast path were counted privately, not globally.
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        // Net out the one unit this handler consumed against the units
        // it produced:
        //   produced > 1 : publish the surplus to the global counter
        //   produced == 1: exact offset — nothing to do
        //   produced < 1 : consumed more than produced — retire one unit
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        // Publish privately queued ops; do_one released the lock before
        // running the handler, so re-acquire it for the splice. The lock
        // is intentionally left held for the caller.
        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire the consumed unit of work.
        sched->work_finished();
    }
}
732
// Runs after a reactor poll (run_task). Publishes any work the poll
// produced on the thread's private context to the scheduler's global
// state. Note: the task sentinel itself is re-queued by do_one, not here.
inline reactor_scheduler_base::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    // Flush the private work count before the ops become globally
    // visible, so the counter never under-reports outstanding work.
    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    // Splice privately queued ops into the global queue under the mutex;
    // run_task may have returned with the lock released.
    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
752
/* Core event loop step: execute at most one handler, running the reactor
   poll as needed.

   Preconditions: `lock` is held on entry.
   Returns 1 after invoking a handler (lock released during the handler;
   work_cleanup re-acquires it only to splice the private queue — callers
   re-lock as needed). Returns 0 when stopped, when no work remains, or
   when a zero/elapsed timeout prevents blocking.

   timeout_us semantics: < 0 block indefinitely, == 0 never block,
   > 0 bound a single condvar wait. */
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            // Work still queued (globally or privately) means the poll
            // should not block: another handler is ready to run.
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle and nothing to wait for (or non-blocking call):
            // put the sentinel back and report no progress.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Pre-mark the poll as interrupted when it must not block,
            // so wakers skip the redundant interrupt_reactor() call.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Hand pending work to another thread before polling.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on a throwing poll.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-queue the sentinel so the next idle thread polls again.
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // If more work remains, wake a peer; `unassisted` records
            // whether anyone actually took it (feeds the inline-budget
            // heuristic in reset_inline_budget).
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work accounting even if the
            // handler throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the mutex released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // Nothing anywhere: give up if idle or non-blocking.
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Arm the signal state machine and sleep until work arrives.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
838