include/boost/corosio/detail/timer_service.hpp

88.1% Lines (312/354) 97.7% Functions (43/44) 72.3% Branches (125/173)
include/boost/corosio/detail/timer_service.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <vector>
34
35 namespace boost::corosio::detail {
36
37 struct scheduler;
38
39 /*
40 Timer Service
41 =============
42
43 Data Structures
44 ---------------
45 waiter_node holds per-waiter state: coroutine handle, executor,
46 error output, stop_token, embedded completion_op. Each concurrent
47 co_await t.wait() allocates one waiter_node.
48
49 timer_service::implementation holds per-timer state: expiry,
50 heap index, and an intrusive_list of waiter_nodes. Multiple
51 coroutines can wait on the same timer simultaneously.
52
53 timer_service owns a min-heap of active timers, a free list
54 of recycled impls, and a free list of recycled waiter_nodes. The
55 heap is ordered by expiry time; the scheduler queries
56 nearest_expiry() to set the epoll/timerfd timeout.
57
58 Optimization Strategy
59 ---------------------
60 1. Deferred heap insertion — expires_after() stores the expiry
61 but does not insert into the heap. Insertion happens in wait().
62 2. Thread-local impl cache — single-slot per-thread cache.
63 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 6. Thread-local waiter cache — single-slot per-thread cache.
67
68 Concurrency
69 -----------
70 stop_token callbacks can fire from any thread. The impl_
71 pointer on waiter_node is used as a "still in list" marker.
72 */
73
74 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75
76 inline void timer_service_invalidate_cache() noexcept;
77
78 // timer_service class body — member function definitions are
79 // out-of-class (after implementation and waiter_node are complete)
80 class BOOST_COROSIO_DECL timer_service final
81 : public capy::execution_context::service
82 , public io_object::io_service
83 {
84 public:
85 using clock_type = std::chrono::steady_clock;
86 using time_point = clock_type::time_point;
87
88 class callback
89 {
90 void* ctx_ = nullptr;
91 void (*fn_)(void*) = nullptr;
92
93 public:
94 290 callback() = default;
95 290 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
96
97 explicit operator bool() const noexcept
98 {
99 return fn_ != nullptr;
100 }
101 1764 void operator()() const
102 {
103
1/2
✓ Branch 2 → 3 taken 1764 times.
✗ Branch 2 → 4 not taken.
1764 if (fn_)
104 1764 fn_(ctx_);
105 1764 }
106 };
107
108 struct implementation;
109
110 private:
111 struct heap_entry
112 {
113 time_point time_;
114 implementation* timer_;
115 };
116
117 scheduler* sched_ = nullptr;
118 mutable std::mutex mutex_;
119 std::vector<heap_entry> heap_;
120 implementation* free_list_ = nullptr;
121 waiter_node* waiter_free_list_ = nullptr;
122 callback on_earliest_changed_;
123 // Avoids mutex in nearest_expiry() and empty()
124 mutable std::atomic<std::int64_t> cached_nearest_ns_{
125 (std::numeric_limits<std::int64_t>::max)()};
126
127 public:
128 290 inline timer_service(capy::execution_context&, scheduler& sched)
129 290 : sched_(&sched)
130 {
131 290 }
132
133 3556 inline scheduler& get_scheduler() noexcept
134 {
135 3556 return *sched_;
136 }
137
138 580 ~timer_service() override = default;
139
140 timer_service(timer_service const&) = delete;
141 timer_service& operator=(timer_service const&) = delete;
142
143 290 inline void set_on_earliest_changed(callback cb)
144 {
145 290 on_earliest_changed_ = cb;
146 290 }
147
148 inline bool empty() const noexcept
149 {
150 return cached_nearest_ns_.load(std::memory_order_acquire) ==
151 (std::numeric_limits<std::int64_t>::max)();
152 }
153
154 2800 inline time_point nearest_expiry() const noexcept
155 {
156 2800 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
157 2800 return time_point(time_point::duration(ns));
158 }
159
160 inline void shutdown() override;
161 inline io_object::implementation* construct() override;
162 inline void destroy(io_object::implementation* p) override;
163 inline void destroy_impl(implementation& impl);
164 inline waiter_node* create_waiter();
165 inline void destroy_waiter(waiter_node* w);
166 inline std::size_t update_timer(implementation& impl, time_point new_time);
167 inline void insert_waiter(implementation& impl, waiter_node* w);
168 inline std::size_t cancel_timer(implementation& impl);
169 inline void cancel_waiter(waiter_node* w);
170 inline std::size_t cancel_one_waiter(implementation& impl);
171 inline std::size_t process_expired();
172
173 private:
174 3540 inline void refresh_cached_nearest() noexcept
175 {
176
2/2
✓ Branch 3 → 4 taken 777 times.
✓ Branch 3 → 5 taken 2763 times.
3540 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
177 2763 : heap_[0].time_.time_since_epoch().count();
178 3540 cached_nearest_ns_.store(ns, std::memory_order_release);
179 3540 }
180
181 inline void remove_timer_impl(implementation& impl);
182 inline void up_heap(std::size_t index);
183 inline void down_heap(std::size_t index);
184 inline void swap_heap(std::size_t i1, std::size_t i2);
185 };
186
187 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
188 : intrusive_list<waiter_node>::node
189 {
190 // Embedded completion op — avoids heap allocation per fire/cancel
191 struct completion_op final : scheduler_op
192 {
193 waiter_node* waiter_ = nullptr;
194
195 static void do_complete(
196 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
197
198 92 completion_op() noexcept : scheduler_op(&do_complete) {}
199
200 void operator()() override;
201 // No-op — lifetime owned by waiter_node, not the scheduler queue
202 void destroy() override {}
203 };
204
205 // Per-waiter stop_token cancellation
206 struct canceller
207 {
208 waiter_node* waiter_;
209 void operator()() const;
210 };
211
212 // nullptr once removed from timer's waiter list (concurrency marker)
213 timer_service::implementation* impl_ = nullptr;
214 timer_service* svc_ = nullptr;
215 std::coroutine_handle<> h_;
216 capy::executor_ref d_;
217 std::error_code* ec_out_ = nullptr;
218 std::stop_token token_;
219 std::optional<std::stop_callback<canceller>> stop_cb_;
220 completion_op op_;
221 std::error_code ec_value_;
222 waiter_node* next_free_ = nullptr;
223
224 92 waiter_node() noexcept
225 92 {
226 92 op_.waiter_ = this;
227 92 }
228 };
229
230 struct timer_service::implementation final : timer::implementation
231 {
232 using clock_type = std::chrono::steady_clock;
233 using time_point = clock_type::time_point;
234 using duration = clock_type::duration;
235
236 timer_service* svc_ = nullptr;
237 intrusive_list<waiter_node> waiters_;
238
239 // Free list linkage (reused when impl is on free_list)
240 implementation* next_free_ = nullptr;
241
242 inline explicit implementation(timer_service& svc) noexcept;
243
244 inline std::coroutine_handle<> wait(
245 std::coroutine_handle<>,
246 capy::executor_ref,
247 std::stop_token,
248 std::error_code*) override;
249 };
250
251 // Thread-local caches avoid hot-path mutex acquisitions:
252 // 1. Impl cache — single-slot, validated by comparing svc_
253 // 2. Waiter cache — single-slot, no service affinity
254 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
255
// Single cached timer implementation; accessed via get()/set() by
// try_pop_tl_cache(), try_push_tl_cache() and timer_service_invalidate_cache().
256 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
// Single cached waiter_node; accessed via get()/set() by the
// try_*_waiter_tl_cache() helpers and timer_service_invalidate_cache().
257 inline thread_local_ptr<waiter_node> tl_cached_waiter;
258
259 inline timer_service::implementation*
260 2113 try_pop_tl_cache(timer_service* svc) noexcept
261 {
262 2113 auto* impl = tl_cached_impl.get();
263
2/2
✓ Branch 3 → 4 taken 2001 times.
✓ Branch 3 → 9 taken 112 times.
2113 if (impl)
264 {
265 2001 tl_cached_impl.set(nullptr);
266
1/2
✓ Branch 5 → 6 taken 2001 times.
✗ Branch 5 → 7 not taken.
2001 if (impl->svc_ == svc)
267 2001 return impl;
268 // Stale impl from a destroyed service
269 delete impl;
270 }
271 112 return nullptr;
272 }
273
274 inline bool
275 2113 try_push_tl_cache(timer_service::implementation* impl) noexcept
276 {
277
2/2
✓ Branch 3 → 4 taken 2089 times.
✓ Branch 3 → 6 taken 24 times.
2113 if (!tl_cached_impl.get())
278 {
279 2089 tl_cached_impl.set(impl);
280 2089 return true;
281 }
282 24 return false;
283 }
284
285 inline waiter_node*
286 1778 try_pop_waiter_tl_cache() noexcept
287 {
288 1778 auto* w = tl_cached_waiter.get();
289
2/2
✓ Branch 3 → 4 taken 1686 times.
✓ Branch 3 → 6 taken 92 times.
1778 if (w)
290 {
291 1686 tl_cached_waiter.set(nullptr);
292 1686 return w;
293 }
294 92 return nullptr;
295 }
296
297 inline bool
298 1778 try_push_waiter_tl_cache(waiter_node* w) noexcept
299 {
300
2/2
✓ Branch 3 → 4 taken 1749 times.
✓ Branch 3 → 6 taken 29 times.
1778 if (!tl_cached_waiter.get())
301 {
302 1749 tl_cached_waiter.set(w);
303 1749 return true;
304 }
305 29 return false;
306 }
307
308 inline void
309 290 timer_service_invalidate_cache() noexcept
310 {
311
2/2
✓ Branch 3 → 4 taken 88 times.
✓ Branch 3 → 5 taken 202 times.
290 delete tl_cached_impl.get();
312 290 tl_cached_impl.set(nullptr);
313
314
2/2
✓ Branch 7 → 8 taken 63 times.
✓ Branch 7 → 10 taken 227 times.
290 delete tl_cached_waiter.get();
315 290 tl_cached_waiter.set(nullptr);
316 290 }
317
318 // timer_service out-of-class member function definitions
319
320 112 inline timer_service::implementation::implementation(
321 112 timer_service& svc) noexcept
322 112 : svc_(&svc)
323 {
324 112 }
325
326 inline void
327 290 timer_service::shutdown()
328 {
329 290 timer_service_invalidate_cache();
330
331 // Cancel waiting timers still in the heap
332
1/2
✗ Branch 21 → 5 not taken.
✓ Branch 21 → 22 taken 290 times.
290 for (auto& entry : heap_)
333 {
334 auto* impl = entry.timer_;
335 while (auto* w = impl->waiters_.pop_front())
336 {
337 w->stop_cb_.reset();
338 w->h_ = {};
339 sched_->work_finished();
340 delete w;
341 }
342 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
343 delete impl;
344 }
345 290 heap_.clear();
346 290 cached_nearest_ns_.store(
347 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
348
349 // Delete free-listed impls
350
2/2
✓ Branch 48 → 45 taken 24 times.
✓ Branch 48 → 49 taken 290 times.
314 while (free_list_)
351 {
352 24 auto* next = free_list_->next_free_;
353
1/2
✓ Branch 45 → 46 taken 24 times.
✗ Branch 45 → 47 not taken.
24 delete free_list_;
354 24 free_list_ = next;
355 }
356
357 // Delete free-listed waiters
358
2/2
✓ Branch 54 → 50 taken 29 times.
✓ Branch 54 → 55 taken 290 times.
319 while (waiter_free_list_)
359 {
360 29 auto* next = waiter_free_list_->next_free_;
361
1/2
✓ Branch 50 → 51 taken 29 times.
✗ Branch 50 → 53 not taken.
29 delete waiter_free_list_;
362 29 waiter_free_list_ = next;
363 }
364 290 }
365
366 inline io_object::implementation*
367 2113 timer_service::construct()
368 {
369 2113 implementation* impl = try_pop_tl_cache(this);
370
2/2
✓ Branch 3 → 4 taken 2001 times.
✓ Branch 3 → 6 taken 112 times.
2113 if (impl)
371 {
372 2001 impl->svc_ = this;
373 2001 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
374 2001 impl->might_have_pending_waits_ = false;
375 2001 return impl;
376 }
377
378
1/1
✓ Branch 6 → 7 taken 112 times.
112 std::lock_guard lock(mutex_);
379
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 10 taken 112 times.
112 if (free_list_)
380 {
381 impl = free_list_;
382 free_list_ = impl->next_free_;
383 impl->next_free_ = nullptr;
384 impl->svc_ = this;
385 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 impl->might_have_pending_waits_ = false;
387 }
388 else
389 {
390
1/1
✓ Branch 10 → 11 taken 112 times.
112 impl = new implementation(*this);
391 }
392 112 return impl;
393 112 }
394
395 inline void
396 2113 timer_service::destroy(io_object::implementation* p)
397 {
398 2113 destroy_impl(static_cast<implementation&>(*p));
399 2113 }
400
401 inline void
402 2113 timer_service::destroy_impl(implementation& impl)
403 {
404
1/1
✓ Branch 2 → 3 taken 2113 times.
2113 cancel_timer(impl);
405
406
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 10 taken 2113 times.
2113 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
407 {
408 std::lock_guard lock(mutex_);
409 remove_timer_impl(impl);
410 refresh_cached_nearest();
411 }
412
413
2/2
✓ Branch 11 → 12 taken 2089 times.
✓ Branch 11 → 13 taken 24 times.
2113 if (try_push_tl_cache(&impl))
414 2089 return;
415
416
1/1
✓ Branch 13 → 14 taken 24 times.
24 std::lock_guard lock(mutex_);
417 24 impl.next_free_ = free_list_;
418 24 free_list_ = &impl;
419 24 }
420
421 inline waiter_node*
422 1778 timer_service::create_waiter()
423 {
424
2/2
✓ Branch 3 → 4 taken 1686 times.
✓ Branch 3 → 5 taken 92 times.
1778 if (auto* w = try_pop_waiter_tl_cache())
425 1686 return w;
426
427
1/1
✓ Branch 5 → 6 taken 92 times.
92 std::lock_guard lock(mutex_);
428
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 8 taken 92 times.
92 if (waiter_free_list_)
429 {
430 auto* w = waiter_free_list_;
431 waiter_free_list_ = w->next_free_;
432 w->next_free_ = nullptr;
433 return w;
434 }
435
436
1/1
✓ Branch 8 → 9 taken 92 times.
92 return new waiter_node();
437 92 }
438
439 inline void
440 1778 timer_service::destroy_waiter(waiter_node* w)
441 {
442
2/2
✓ Branch 3 → 4 taken 1749 times.
✓ Branch 3 → 5 taken 29 times.
1778 if (try_push_waiter_tl_cache(w))
443 1749 return;
444
445
1/1
✓ Branch 5 → 6 taken 29 times.
29 std::lock_guard lock(mutex_);
446 29 w->next_free_ = waiter_free_list_;
447 29 waiter_free_list_ = w;
448 29 }
449
450 inline std::size_t
451 3 timer_service::update_timer(implementation& impl, time_point new_time)
452 {
453 bool in_heap =
454 3 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
455
2/6
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 7 taken 3 times.
✗ Branch 5 → 6 not taken.
✗ Branch 5 → 7 not taken.
✗ Branch 8 → 9 not taken.
✓ Branch 8 → 10 taken 3 times.
3 if (!in_heap && impl.waiters_.empty())
456 return 0;
457
458 3 bool notify = false;
459 3 intrusive_list<waiter_node> canceled;
460
461 {
462
1/1
✓ Branch 10 → 11 taken 3 times.
3 std::lock_guard lock(mutex_);
463
464
2/2
✓ Branch 12 → 13 taken 5 times.
✓ Branch 12 → 15 taken 3 times.
8 while (auto* w = impl.waiters_.pop_front())
465 {
466 5 w->impl_ = nullptr;
467 5 canceled.push_back(w);
468 5 }
469
470
1/2
✓ Branch 16 → 17 taken 3 times.
✗ Branch 16 → 25 not taken.
3 if (impl.heap_index_ < heap_.size())
471 {
472 3 time_point old_time = heap_[impl.heap_index_].time_;
473 3 heap_[impl.heap_index_].time_ = new_time;
474
475
2/3
✓ Branch 19 → 20 taken 3 times.
✓ Branch 21 → 22 taken 3 times.
✗ Branch 21 → 23 not taken.
3 if (new_time < old_time)
476
1/1
✓ Branch 22 → 24 taken 3 times.
3 up_heap(impl.heap_index_);
477 else
478 down_heap(impl.heap_index_);
479
480 3 notify = (impl.heap_index_ == 0);
481 }
482
483 3 refresh_cached_nearest();
484 3 }
485
486 3 std::size_t count = 0;
487
2/2
✓ Branch 29 → 30 taken 5 times.
✓ Branch 29 → 33 taken 3 times.
8 while (auto* w = canceled.pop_front())
488 {
489 5 w->ec_value_ = make_error_code(capy::error::canceled);
490
1/1
✓ Branch 31 → 32 taken 5 times.
5 sched_->post(&w->op_);
491 5 ++count;
492 5 }
493
494
1/2
✓ Branch 33 → 34 taken 3 times.
✗ Branch 33 → 35 not taken.
3 if (notify)
495
1/1
✓ Branch 34 → 35 taken 3 times.
3 on_earliest_changed_();
496
497 3 return count;
498 }
499
500 inline void
501 1778 timer_service::insert_waiter(implementation& impl, waiter_node* w)
502 {
503 1778 bool notify = false;
504 {
505
1/1
✓ Branch 2 → 3 taken 1778 times.
1778 std::lock_guard lock(mutex_);
506
2/2
✓ Branch 4 → 5 taken 1767 times.
✓ Branch 4 → 10 taken 11 times.
1778 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
507 {
508 1767 impl.heap_index_ = heap_.size();
509
1/1
✓ Branch 6 → 7 taken 1767 times.
1767 heap_.push_back({impl.expiry_, &impl});
510
1/1
✓ Branch 8 → 9 taken 1767 times.
1767 up_heap(heap_.size() - 1);
511 1767 notify = (impl.heap_index_ == 0);
512 1767 refresh_cached_nearest();
513 }
514 1778 impl.waiters_.push_back(w);
515 1778 }
516
2/2
✓ Branch 12 → 13 taken 1761 times.
✓ Branch 12 → 14 taken 17 times.
1778 if (notify)
517 1761 on_earliest_changed_();
518 1778 }
519
520 inline std::size_t
521 2840 timer_service::cancel_timer(implementation& impl)
522 {
523
2/2
✓ Branch 2 → 3 taken 2109 times.
✓ Branch 2 → 4 taken 731 times.
2840 if (!impl.might_have_pending_waits_)
524 2109 return 0;
525
526 // Not in heap and no waiters — just clear the flag
527
2/6
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 9 taken 731 times.
✗ Branch 7 → 8 not taken.
✗ Branch 7 → 9 not taken.
✗ Branch 10 → 11 not taken.
✓ Branch 10 → 12 taken 731 times.
731 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
528 impl.waiters_.empty())
529 {
530 impl.might_have_pending_waits_ = false;
531 return 0;
532 }
533
534 731 intrusive_list<waiter_node> canceled;
535
536 {
537
1/1
✓ Branch 12 → 13 taken 731 times.
731 std::lock_guard lock(mutex_);
538
1/1
✓ Branch 13 → 14 taken 731 times.
731 remove_timer_impl(impl);
539
2/2
✓ Branch 15 → 16 taken 733 times.
✓ Branch 15 → 18 taken 731 times.
1464 while (auto* w = impl.waiters_.pop_front())
540 {
541 733 w->impl_ = nullptr;
542 733 canceled.push_back(w);
543 733 }
544 731 refresh_cached_nearest();
545 731 }
546
547 731 impl.might_have_pending_waits_ = false;
548
549 731 std::size_t count = 0;
550
2/2
✓ Branch 22 → 23 taken 733 times.
✓ Branch 22 → 26 taken 731 times.
1464 while (auto* w = canceled.pop_front())
551 {
552 733 w->ec_value_ = make_error_code(capy::error::canceled);
553
1/1
✓ Branch 24 → 25 taken 733 times.
733 sched_->post(&w->op_);
554 733 ++count;
555 733 }
556
557 731 return count;
558 }
559
560 inline void
561 2 timer_service::cancel_waiter(waiter_node* w)
562 {
563 {
564
1/1
✓ Branch 2 → 3 taken 2 times.
2 std::lock_guard lock(mutex_);
565 // Already removed by cancel_timer or process_expired
566
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 2 times.
2 if (!w->impl_)
567 return;
568 2 auto* impl = w->impl_;
569 2 w->impl_ = nullptr;
570 2 impl->waiters_.remove(w);
571
2/2
✓ Branch 7 → 8 taken 1 time.
✓ Branch 7 → 10 taken 1 time.
2 if (impl->waiters_.empty())
572 {
573
1/1
✓ Branch 8 → 9 taken 1 time.
1 remove_timer_impl(*impl);
574 1 impl->might_have_pending_waits_ = false;
575 }
576 2 refresh_cached_nearest();
577 2 }
578
579 2 w->ec_value_ = make_error_code(capy::error::canceled);
580 2 sched_->post(&w->op_);
581 }
582
583 inline std::size_t
584 1 timer_service::cancel_one_waiter(implementation& impl)
585 {
586
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1 time.
1 if (!impl.might_have_pending_waits_)
587 return 0;
588
589 1 waiter_node* w = nullptr;
590
591 {
592
1/1
✓ Branch 4 → 5 taken 1 time.
1 std::lock_guard lock(mutex_);
593 1 w = impl.waiters_.pop_front();
594
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 8 taken 1 time.
1 if (!w)
595 return 0;
596 1 w->impl_ = nullptr;
597
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 12 taken 1 time.
1 if (impl.waiters_.empty())
598 {
599 remove_timer_impl(impl);
600 impl.might_have_pending_waits_ = false;
601 }
602 1 refresh_cached_nearest();
603 1 }
604
605 1 w->ec_value_ = make_error_code(capy::error::canceled);
606 1 sched_->post(&w->op_);
607 1 return 1;
608 }
609
610 inline std::size_t
611 1036 timer_service::process_expired()
612 {
613 1036 intrusive_list<waiter_node> expired;
614
615 {
616
1/1
✓ Branch 2 → 3 taken 1036 times.
1036 std::lock_guard lock(mutex_);
617 1036 auto now = clock_type::now();
618
619
7/7
✓ Branch 14 → 15 taken 2025 times.
✓ Branch 14 → 20 taken 46 times.
✓ Branch 16 → 17 taken 2025 times.
✓ Branch 18 → 19 taken 1035 times.
✓ Branch 18 → 20 taken 990 times.
✓ Branch 21 → 5 taken 1035 times.
✓ Branch 21 → 22 taken 1036 times.
2071 while (!heap_.empty() && heap_[0].time_ <= now)
620 {
621 1035 implementation* t = heap_[0].timer_;
622
1/1
✓ Branch 6 → 7 taken 1035 times.
1035 remove_timer_impl(*t);
623
2/2
✓ Branch 8 → 9 taken 1037 times.
✓ Branch 8 → 12 taken 1035 times.
2072 while (auto* w = t->waiters_.pop_front())
624 {
625 1037 w->impl_ = nullptr;
626 1037 w->ec_value_ = {};
627 1037 expired.push_back(w);
628 1037 }
629 1035 t->might_have_pending_waits_ = false;
630 }
631
632 1036 refresh_cached_nearest();
633 1036 }
634
635 1036 std::size_t count = 0;
636
2/2
✓ Branch 26 → 27 taken 1037 times.
✓ Branch 26 → 29 taken 1036 times.
2073 while (auto* w = expired.pop_front())
637 {
638
1/1
✓ Branch 27 → 28 taken 1037 times.
1037 sched_->post(&w->op_);
639 1037 ++count;
640 1037 }
641
642 1036 return count;
643 }
644
645 inline void
646 1767 timer_service::remove_timer_impl(implementation& impl)
647 {
648 1767 std::size_t index = impl.heap_index_;
649
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 1767 times.
1767 if (index >= heap_.size())
650 return; // Not in heap
651
652
2/2
✓ Branch 6 → 7 taken 778 times.
✓ Branch 6 → 9 taken 989 times.
1767 if (index == heap_.size() - 1)
653 {
654 // Last element, just pop
655 778 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
656 778 heap_.pop_back();
657 }
658 else
659 {
660 // Swap with last and reheapify
661 989 swap_heap(index, heap_.size() - 1);
662 989 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663 989 heap_.pop_back();
664
665
2/6
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 989 times.
✗ Branch 18 → 19 not taken.
✗ Branch 18 → 20 not taken.
✗ Branch 21 → 22 not taken.
✓ Branch 21 → 23 taken 989 times.
989 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
666 up_heap(index);
667 else
668 989 down_heap(index);
669 }
670 }
671
672 inline void
673 1770 timer_service::up_heap(std::size_t index)
674 {
675
2/2
✓ Branch 11 → 3 taken 990 times.
✓ Branch 11 → 12 taken 1764 times.
2754 while (index > 0)
676 {
677 990 std::size_t parent = (index - 1) / 2;
678
2/2
✓ Branch 7 → 8 taken 6 times.
✓ Branch 7 → 9 taken 984 times.
990 if (!(heap_[index].time_ < heap_[parent].time_))
679 6 break;
680 984 swap_heap(index, parent);
681 984 index = parent;
682 }
683 1770 }
684
685 inline void
686 989 timer_service::down_heap(std::size_t index)
687 {
688 989 std::size_t child = index * 2 + 1;
689
2/2
✓ Branch 21 → 3 taken 2 times.
✓ Branch 21 → 22 taken 987 times.
989 while (child < heap_.size())
690 {
691 2 std::size_t min_child = (child + 1 == heap_.size() ||
692 heap_[child].time_ < heap_[child + 1].time_)
693
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 11 taken 2 times.
2 ? child
694 2 : child + 1;
695
696
1/2
✓ Branch 16 → 17 taken 2 times.
✗ Branch 16 → 18 not taken.
2 if (heap_[index].time_ < heap_[min_child].time_)
697 2 break;
698
699 swap_heap(index, min_child);
700 index = min_child;
701 child = index * 2 + 1;
702 }
703 989 }
704
705 inline void
706 1973 timer_service::swap_heap(std::size_t i1, std::size_t i2)
707 {
708 1973 heap_entry tmp = heap_[i1];
709 1973 heap_[i1] = heap_[i2];
710 1973 heap_[i2] = tmp;
711 1973 heap_[i1].timer_->heap_index_ = i1;
712 1973 heap_[i2].timer_->heap_index_ = i2;
713 1973 }
714
715 // waiter_node out-of-class member function definitions
716
717 inline void
718 2 waiter_node::canceller::operator()() const
719 {
720 2 waiter_->svc_->cancel_waiter(waiter_);
721 2 }
722
723 inline void
724 1778 waiter_node::completion_op::do_complete(
725 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
726 {
727
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1778 times.
1778 if (!owner)
728 return;
729 1778 static_cast<completion_op*>(base)->operator()();
730 }
731
732 inline void
733 1778 waiter_node::completion_op::operator()()
734 {
735 1778 auto* w = waiter_;
736 1778 w->stop_cb_.reset();
737
1/2
✓ Branch 3 → 4 taken 1778 times.
✗ Branch 3 → 5 not taken.
1778 if (w->ec_out_)
738 1778 *w->ec_out_ = w->ec_value_;
739
740 1778 auto h = w->h_;
741 1778 auto d = w->d_;
742 1778 auto* svc = w->svc_;
743 1778 auto& sched = svc->get_scheduler();
744
745
1/1
✓ Branch 6 → 7 taken 1778 times.
1778 svc->destroy_waiter(w);
746
747
1/1
✓ Branch 7 → 8 taken 1778 times.
1778 d.post(h);
748 1778 sched.work_finished();
749 1778 }
750
751 inline std::coroutine_handle<>
752 2022 timer_service::implementation::wait(
753 std::coroutine_handle<> h,
754 capy::executor_ref d,
755 std::stop_token token,
756 std::error_code* ec)
757 {
758 // Already-expired fast path — no waiter_node, no mutex.
759 // Post instead of dispatch so the coroutine yields to the
760 // scheduler, allowing other queued work to run.
761
2/2
✓ Branch 3 → 4 taken 2011 times.
✓ Branch 3 → 21 taken 11 times.
2022 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
762 {
763
7/8
✓ Branch 5 → 6 taken 2011 times.
✓ Branch 6 → 7 taken 2011 times.
✗ Branch 6 → 11 not taken.
✓ Branch 8 → 9 taken 2011 times.
✓ Branch 10 → 11 taken 244 times.
✓ Branch 10 → 12 taken 1767 times.
✓ Branch 13 → 14 taken 244 times.
✓ Branch 13 → 21 taken 1767 times.
2011 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
764 {
765
1/2
✓ Branch 14 → 15 taken 244 times.
✗ Branch 14 → 17 not taken.
244 if (ec)
766 244 *ec = {};
767 244 d.post(h);
768 244 return std::noop_coroutine();
769 }
770 }
771
772 1778 auto* w = svc_->create_waiter();
773 1778 w->impl_ = this;
774 1778 w->svc_ = svc_;
775 1778 w->h_ = h;
776 1778 w->d_ = d;
777 1778 w->token_ = std::move(token);
778 1778 w->ec_out_ = ec;
779
780 1778 svc_->insert_waiter(*this, w);
781 1778 might_have_pending_waits_ = true;
782 1778 svc_->get_scheduler().work_started();
783
784
2/2
✓ Branch 28 → 29 taken 2 times.
✓ Branch 28 → 31 taken 1776 times.
1778 if (w->token_.stop_possible())
785 2 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
786
787 1778 return std::noop_coroutine();
788 }
789
790 // Free functions
791
792 struct timer_service_access
793 {
794 2113 static native_scheduler& get_scheduler(io_context& ctx) noexcept
795 {
796 2113 return static_cast<native_scheduler&>(*ctx.sched_);
797 }
798 };
799
800 // Bypass find_service() mutex by reading the scheduler's cached pointer
801 inline io_object::io_service&
802 2113 timer_service_direct(capy::execution_context& ctx) noexcept
803 {
804 2113 return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
805 2113 .timer_svc_;
806 }
807
808 inline std::size_t
809 3 timer_service_update_expiry(timer::implementation& base)
810 {
811 3 auto& impl = static_cast<timer_service::implementation&>(base);
812 3 return impl.svc_->update_timer(impl, impl.expiry_);
813 }
814
815 inline std::size_t
816 727 timer_service_cancel(timer::implementation& base) noexcept
817 {
818 727 auto& impl = static_cast<timer_service::implementation&>(base);
819 727 return impl.svc_->cancel_timer(impl);
820 }
821
822 inline std::size_t
823 1 timer_service_cancel_one(timer::implementation& base) noexcept
824 {
825 1 auto& impl = static_cast<timer_service::implementation&>(base);
826 1 return impl.svc_->cancel_one_waiter(impl);
827 }
828
829 inline timer_service&
830 290 get_timer_service(capy::execution_context& ctx, scheduler& sched)
831 {
832 290 return ctx.make_service<timer_service>(sched);
833 }
834
835 } // namespace boost::corosio::detail
836
837 #endif
838