include/boost/corosio/detail/timer_service.hpp

86.9% Lines (332/382) 95.7% Functions (45/48) 61.3% Branches (103/168)
include/boost/corosio/detail/timer_service.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <vector>
34
35 namespace boost::corosio::detail {
36
37 struct scheduler;
38
39 /*
40 Timer Service
41 =============
42
43 Data Structures
44 ---------------
45 waiter_node holds per-waiter state: coroutine handle, executor,
46 error output, stop_token, embedded completion_op. Each concurrent
47 co_await t.wait() allocates one waiter_node.
48
49 timer_service::implementation holds per-timer state: expiry,
50 heap index, and an intrusive_list of waiter_nodes. Multiple
51 coroutines can wait on the same timer simultaneously.
52
53 timer_service owns a min-heap of active timers, a free list
54 of recycled impls, and a free list of recycled waiter_nodes. The
55 heap is ordered by expiry time; the scheduler queries
56 nearest_expiry() to set the epoll/timerfd timeout.
57
58 Optimization Strategy
59 ---------------------
60 1. Deferred heap insertion — expires_after() stores the expiry
61 but does not insert into the heap. Insertion happens in wait().
62 2. Thread-local impl cache — single-slot per-thread cache.
63 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 6. Thread-local waiter cache — single-slot per-thread cache.
67
68 Concurrency
69 -----------
70 stop_token callbacks can fire from any thread. The impl_
71 pointer on waiter_node is used as a "still in list" marker.
72 */
73
74 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75
76 inline void timer_service_invalidate_cache() noexcept;
77
78 // timer_service class body — member function definitions are
79 // out-of-class (after implementation and waiter_node are complete)
80 class BOOST_COROSIO_DECL timer_service final
81 : public capy::execution_context::service
82 , public io_object::io_service
83 {
84 public:
85 using clock_type = std::chrono::steady_clock;
86 using time_point = clock_type::time_point;
87
88 class callback
89 {
90 460 void* ctx_ = nullptr;
91 460 void (*fn_)(void*) = nullptr;
92
93 public:
94 1380 callback() = default;
95 920 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
96
97 explicit operator bool() const noexcept
98 {
99 return fn_ != nullptr;
100 }
101 7237 void operator()() const
102 {
103
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7237 times.
7237 if (fn_)
104 7237 fn_(ctx_);
105 7237 }
106 };
107
108 struct implementation;
109
110 private:
111 struct heap_entry
112 {
113 time_point time_;
114 implementation* timer_;
115 };
116
117 scheduler* sched_ = nullptr;
118 mutable std::mutex mutex_;
119 std::vector<heap_entry> heap_;
120 460 implementation* free_list_ = nullptr;
121 460 waiter_node* waiter_free_list_ = nullptr;
122 callback on_earliest_changed_;
123 // Avoids mutex in nearest_expiry() and empty()
124 920 mutable std::atomic<std::int64_t> cached_nearest_ns_{
125 460 (std::numeric_limits<std::int64_t>::max)()};
126
127 public:
128 2300 inline timer_service(capy::execution_context&, scheduler& sched)
129 460 : sched_(&sched)
130 1380 {
131 920 }
132
133 14532 inline scheduler& get_scheduler() noexcept
134 {
135 14532 return *sched_;
136 }
137
138 1380 ~timer_service() override = default;
139
140 timer_service(timer_service const&) = delete;
141 timer_service& operator=(timer_service const&) = delete;
142
143 460 inline void set_on_earliest_changed(callback cb)
144 {
145 460 on_earliest_changed_ = cb;
146 460 }
147
148 inline bool empty() const noexcept
149 {
150 return cached_nearest_ns_.load(std::memory_order_acquire) ==
151 (std::numeric_limits<std::int64_t>::max)();
152 }
153
154 120015 inline time_point nearest_expiry() const noexcept
155 {
156 120015 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
157
2/4
✓ Branch 0 taken 120015 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 120015 times.
✗ Branch 3 not taken.
120015 return time_point(time_point::duration(ns));
158 }
159
160 inline void shutdown() override;
161 inline io_object::implementation* construct() override;
162 inline void destroy(io_object::implementation* p) override;
163 inline void destroy_impl(implementation& impl);
164 inline waiter_node* create_waiter();
165 inline void destroy_waiter(waiter_node* w);
166 inline std::size_t update_timer(implementation& impl, time_point new_time);
167 inline void insert_waiter(implementation& impl, waiter_node* w);
168 inline std::size_t cancel_timer(implementation& impl);
169 inline void cancel_waiter(waiter_node* w);
170 inline std::size_t cancel_one_waiter(implementation& impl);
171 inline std::size_t process_expired();
172
173 private:
174 298912 inline void refresh_cached_nearest() noexcept
175 {
176
2/2
✓ Branch 0 taken 7011 times.
✓ Branch 1 taken 291901 times.
298912 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
177 291901 : heap_[0].time_.time_since_epoch().count();
178 298912 cached_nearest_ns_.store(ns, std::memory_order_release);
179 298912 }
180
181 inline void remove_timer_impl(implementation& impl);
182 inline void up_heap(std::size_t index);
183 inline void down_heap(std::size_t index);
184 inline void swap_heap(std::size_t i1, std::size_t i2);
185 };
186
187 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
188 : intrusive_list<waiter_node>::node
189 {
190 // Embedded completion op — avoids heap allocation per fire/cancel
191 struct completion_op final : scheduler_op
192 {
193 163 waiter_node* waiter_ = nullptr;
194
195 static void do_complete(
196 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
197
198 489 completion_op() noexcept : scheduler_op(&do_complete) {}
199
200 void operator()() override;
201 // No-op — lifetime owned by waiter_node, not the scheduler queue
202 void destroy() override {}
203 };
204
205 // Per-waiter stop_token cancellation
206 struct canceller
207 {
208 waiter_node* waiter_;
209 void operator()() const;
210 };
211
212 // nullptr once removed from timer's waiter list (concurrency marker)
213 163 timer_service::implementation* impl_ = nullptr;
214 163 timer_service* svc_ = nullptr;
215 std::coroutine_handle<> h_;
216 capy::executor_ref d_;
217 163 std::error_code* ec_out_ = nullptr;
218 std::stop_token token_;
219 std::optional<std::stop_callback<canceller>> stop_cb_;
220 completion_op op_;
221 std::error_code ec_value_;
222 163 waiter_node* next_free_ = nullptr;
223
224 652 waiter_node() noexcept
225 163 {
226 163 op_.waiter_ = this;
227 326 }
228 };
229
230 struct timer_service::implementation final : timer::implementation
231 {
232 using clock_type = std::chrono::steady_clock;
233 using time_point = clock_type::time_point;
234 using duration = clock_type::duration;
235
236 timer_service* svc_ = nullptr;
237 intrusive_list<waiter_node> waiters_;
238
239 // Free list linkage (reused when impl is on free_list)
240 202 implementation* next_free_ = nullptr;
241
242 inline explicit implementation(timer_service& svc) noexcept;
243
244 inline std::coroutine_handle<> wait(
245 std::coroutine_handle<>,
246 capy::executor_ref,
247 std::stop_token,
248 std::error_code*) override;
249 };
250
251 // Thread-local caches avoid hot-path mutex acquisitions:
252 // 1. Impl cache — single-slot, validated by comparing svc_
253 // 2. Waiter cache — single-slot, no service affinity
254 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
255
256 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
257 inline thread_local_ptr<waiter_node> tl_cached_waiter;
258
259 inline timer_service::implementation*
260 7305 try_pop_tl_cache(timer_service* svc) noexcept
261 {
262 7305 auto* impl = tl_cached_impl.get();
263
2/2
✓ Branch 0 taken 202 times.
✓ Branch 1 taken 7103 times.
7305 if (impl)
264 {
265 7103 tl_cached_impl.set(nullptr);
266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7103 times.
7103 if (impl->svc_ == svc)
267 7103 return impl;
268 // Stale impl from a destroyed service
269 delete impl;
270 }
271 202 return nullptr;
272 7305 }
273
274 inline bool
275 7305 try_push_tl_cache(timer_service::implementation* impl) noexcept
276 {
277
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 7257 times.
7305 if (!tl_cached_impl.get())
278 {
279 7257 tl_cached_impl.set(impl);
280 7257 return true;
281 }
282 48 return false;
283 7305 }
284
285 inline waiter_node*
286 7266 try_pop_waiter_tl_cache() noexcept
287 {
288 7266 auto* w = tl_cached_waiter.get();
289
2/2
✓ Branch 0 taken 7103 times.
✓ Branch 1 taken 163 times.
7266 if (w)
290 {
291 7103 tl_cached_waiter.set(nullptr);
292 7103 return w;
293 }
294 163 return nullptr;
295 7266 }
296
297 inline bool
298 7266 try_push_waiter_tl_cache(waiter_node* w) noexcept
299 {
300
2/2
✓ Branch 0 taken 58 times.
✓ Branch 1 taken 7208 times.
7266 if (!tl_cached_waiter.get())
301 {
302 7208 tl_cached_waiter.set(w);
303 7208 return true;
304 }
305 58 return false;
306 7266 }
307
308 inline void
309 460 timer_service_invalidate_cache() noexcept
310 {
311
2/2
✓ Branch 0 taken 306 times.
✓ Branch 1 taken 154 times.
460 delete tl_cached_impl.get();
312 460 tl_cached_impl.set(nullptr);
313
314
2/2
✓ Branch 0 taken 355 times.
✓ Branch 1 taken 105 times.
460 delete tl_cached_waiter.get();
315 460 tl_cached_waiter.set(nullptr);
316 460 }
317
318 // timer_service out-of-class member function definitions
319
320
1/2
✓ Branch 0 taken 202 times.
✗ Branch 1 not taken.
404 inline timer_service::implementation::implementation(
321 timer_service& svc) noexcept
322 202 : svc_(&svc)
323 404 {
324 404 }
325
326 inline void
327 460 timer_service::shutdown()
328 {
329 460 timer_service_invalidate_cache();
330
331 // Cancel waiting timers still in the heap
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 460 times.
460 for (auto& entry : heap_)
333 {
334 auto* impl = entry.timer_;
335 while (auto* w = impl->waiters_.pop_front())
336 {
337 w->stop_cb_.reset();
338 w->h_ = {};
339 sched_->work_finished();
340 delete w;
341 }
342 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
343 delete impl;
344 }
345 460 heap_.clear();
346 920 cached_nearest_ns_.store(
347 460 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
348
349 // Delete free-listed impls
350
2/2
✓ Branch 0 taken 48 times.
✓ Branch 1 taken 460 times.
508 while (free_list_)
351 {
352 48 auto* next = free_list_->next_free_;
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48 times.
48 delete free_list_;
354 48 free_list_ = next;
355 }
356
357 // Delete free-listed waiters
358
2/2
✓ Branch 0 taken 58 times.
✓ Branch 1 taken 460 times.
518 while (waiter_free_list_)
359 {
360 58 auto* next = waiter_free_list_->next_free_;
361
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 58 times.
58 delete waiter_free_list_;
362 58 waiter_free_list_ = next;
363 }
364 460 }
365
366 inline io_object::implementation*
367 7305 timer_service::construct()
368 {
369 7305 implementation* impl = try_pop_tl_cache(this);
370
2/2
✓ Branch 0 taken 7103 times.
✓ Branch 1 taken 202 times.
7305 if (impl)
371 {
372 7103 impl->svc_ = this;
373 7103 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
374 7103 impl->might_have_pending_waits_ = false;
375 7103 return impl;
376 }
377
378 202 std::lock_guard lock(mutex_);
379
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 202 times.
202 if (free_list_)
380 {
381 impl = free_list_;
382 free_list_ = impl->next_free_;
383 impl->next_free_ = nullptr;
384 impl->svc_ = this;
385 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 impl->might_have_pending_waits_ = false;
387 }
388 else
389 {
390
1/2
✓ Branch 0 taken 202 times.
✗ Branch 1 not taken.
202 impl = new implementation(*this);
391 }
392 202 return impl;
393 7305 }
394
395 inline void
396 7305 timer_service::destroy(io_object::implementation* p)
397 {
398 7305 destroy_impl(static_cast<implementation&>(*p));
399 7305 }
400
401 inline void
402 7305 timer_service::destroy_impl(implementation& impl)
403 {
404 7305 cancel_timer(impl);
405
406
1/2
✓ Branch 0 taken 7305 times.
✗ Branch 1 not taken.
7305 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
407 {
408 std::lock_guard lock(mutex_);
409 remove_timer_impl(impl);
410 refresh_cached_nearest();
411 }
412
413
2/2
✓ Branch 0 taken 7257 times.
✓ Branch 1 taken 48 times.
7305 if (try_push_tl_cache(&impl))
414 7257 return;
415
416 48 std::lock_guard lock(mutex_);
417 48 impl.next_free_ = free_list_;
418 48 free_list_ = &impl;
419 7305 }
420
421 inline waiter_node*
422 7266 timer_service::create_waiter()
423 {
424
2/2
✓ Branch 0 taken 7103 times.
✓ Branch 1 taken 163 times.
7266 if (auto* w = try_pop_waiter_tl_cache())
425 7103 return w;
426
427 163 std::lock_guard lock(mutex_);
428
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 163 times.
163 if (waiter_free_list_)
429 {
430 auto* w = waiter_free_list_;
431 waiter_free_list_ = w->next_free_;
432 w->next_free_ = nullptr;
433 return w;
434 }
435
436
1/2
✓ Branch 0 taken 163 times.
✗ Branch 1 not taken.
163 return new waiter_node();
437 7266 }
438
439 inline void
440 7266 timer_service::destroy_waiter(waiter_node* w)
441 {
442
2/2
✓ Branch 0 taken 7208 times.
✓ Branch 1 taken 58 times.
7266 if (try_push_waiter_tl_cache(w))
443 7208 return;
444
445 58 std::lock_guard lock(mutex_);
446 58 w->next_free_ = waiter_free_list_;
447 58 waiter_free_list_ = w;
448 7266 }
449
450 inline std::size_t
451 6 timer_service::update_timer(implementation& impl, time_point new_time)
452 {
453 6 bool in_heap =
454 6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
455
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if (!in_heap && impl.waiters_.empty())
456 return 0;
457
458 6 bool notify = false;
459 6 intrusive_list<waiter_node> canceled;
460
461 {
462 6 std::lock_guard lock(mutex_);
463
464
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6 times.
16 while (auto* w = impl.waiters_.pop_front())
465 {
466 10 w->impl_ = nullptr;
467 10 canceled.push_back(w);
468 }
469
470
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (impl.heap_index_ < heap_.size())
471 {
472 6 time_point old_time = heap_[impl.heap_index_].time_;
473 6 heap_[impl.heap_index_].time_ = new_time;
474
475
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6 if (new_time < old_time)
476
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 up_heap(impl.heap_index_);
477 else
478 down_heap(impl.heap_index_);
479
480 6 notify = (impl.heap_index_ == 0);
481 6 }
482
483 6 refresh_cached_nearest();
484 6 }
485
486 6 std::size_t count = 0;
487
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6 times.
16 while (auto* w = canceled.pop_front())
488 {
489 10 w->ec_value_ = make_error_code(capy::error::canceled);
490 10 sched_->post(&w->op_);
491 10 ++count;
492 }
493
494
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6 if (notify)
495 6 on_earliest_changed_();
496
497 6 return count;
498 6 }
499
500 inline void
501 7266 timer_service::insert_waiter(implementation& impl, waiter_node* w)
502 {
503 7266 bool notify = false;
504 {
505 7266 std::lock_guard lock(mutex_);
506
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 7244 times.
7266 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
507 {
508 7244 impl.heap_index_ = heap_.size();
509
1/2
✓ Branch 0 taken 7244 times.
✗ Branch 1 not taken.
7244 heap_.push_back({impl.expiry_, &impl});
510
1/2
✓ Branch 0 taken 7244 times.
✗ Branch 1 not taken.
7244 up_heap(heap_.size() - 1);
511 7244 notify = (impl.heap_index_ == 0);
512 7244 refresh_cached_nearest();
513 7244 }
514 7266 impl.waiters_.push_back(w);
515 7266 }
516
2/2
✓ Branch 0 taken 35 times.
✓ Branch 1 taken 7231 times.
7266 if (notify)
517 7231 on_earliest_changed_();
518 7266 }
519
520 inline std::size_t
521 7896 timer_service::cancel_timer(implementation& impl)
522 {
523
2/2
✓ Branch 0 taken 7297 times.
✓ Branch 1 taken 599 times.
7896 if (!impl.might_have_pending_waits_)
524 7297 return 0;
525
526 // Not in heap and no waiters — just clear the flag
527
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 599 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
599 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
528 impl.waiters_.empty())
529 {
530 impl.might_have_pending_waits_ = false;
531 return 0;
532 }
533
534 599 intrusive_list<waiter_node> canceled;
535
536 {
537 599 std::lock_guard lock(mutex_);
538
1/2
✓ Branch 0 taken 599 times.
✗ Branch 1 not taken.
599 remove_timer_impl(impl);
539
2/2
✓ Branch 0 taken 603 times.
✓ Branch 1 taken 599 times.
1202 while (auto* w = impl.waiters_.pop_front())
540 {
541 603 w->impl_ = nullptr;
542 603 canceled.push_back(w);
543 }
544 599 refresh_cached_nearest();
545 599 }
546
547 599 impl.might_have_pending_waits_ = false;
548
549 599 std::size_t count = 0;
550
2/2
✓ Branch 0 taken 603 times.
✓ Branch 1 taken 599 times.
1202 while (auto* w = canceled.pop_front())
551 {
552 603 w->ec_value_ = make_error_code(capy::error::canceled);
553 603 sched_->post(&w->op_);
554 603 ++count;
555 }
556
557 599 return count;
558 7896 }
559
560 inline void
561 4 timer_service::cancel_waiter(waiter_node* w)
562 {
563 {
564 4 std::lock_guard lock(mutex_);
565 // Already removed by cancel_timer or process_expired
566
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (!w->impl_)
567 return;
568 4 auto* impl = w->impl_;
569 4 w->impl_ = nullptr;
570 4 impl->waiters_.remove(w);
571
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (impl->waiters_.empty())
572 {
573
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 remove_timer_impl(*impl);
574 2 impl->might_have_pending_waits_ = false;
575 2 }
576 4 refresh_cached_nearest();
577 4 }
578
579 4 w->ec_value_ = make_error_code(capy::error::canceled);
580 4 sched_->post(&w->op_);
581 4 }
582
583 inline std::size_t
584 2 timer_service::cancel_one_waiter(implementation& impl)
585 {
586
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (!impl.might_have_pending_waits_)
587 return 0;
588
589 2 waiter_node* w = nullptr;
590
591 {
592 2 std::lock_guard lock(mutex_);
593 2 w = impl.waiters_.pop_front();
594
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (!w)
595 return 0;
596 2 w->impl_ = nullptr;
597
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (impl.waiters_.empty())
598 {
599 remove_timer_impl(impl);
600 impl.might_have_pending_waits_ = false;
601 }
602 2 refresh_cached_nearest();
603 2 }
604
605 2 w->ec_value_ = make_error_code(capy::error::canceled);
606 2 sched_->post(&w->op_);
607 2 return 1;
608 2 }
609
610 inline std::size_t
611 291057 timer_service::process_expired()
612 {
613 291057 intrusive_list<waiter_node> expired;
614
615 {
616 291057 std::lock_guard lock(mutex_);
617 291057 auto now = clock_type::now();
618
619
5/8
✓ Branch 0 taken 291288 times.
✓ Branch 1 taken 6412 times.
✓ Branch 2 taken 291288 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 6643 times.
✓ Branch 5 taken 291057 times.
297700 while (!heap_.empty() && heap_[0].time_ <= now)
620 {
621 6643 implementation* t = heap_[0].timer_;
622
1/2
✓ Branch 0 taken 6643 times.
✗ Branch 1 not taken.
6643 remove_timer_impl(*t);
623
2/2
✓ Branch 0 taken 6647 times.
✓ Branch 1 taken 6643 times.
13290 while (auto* w = t->waiters_.pop_front())
624 {
625 6647 w->impl_ = nullptr;
626 6647 w->ec_value_ = {};
627 6647 expired.push_back(w);
628 }
629 6643 t->might_have_pending_waits_ = false;
630 }
631
632 291057 refresh_cached_nearest();
633 291057 }
634
635 291057 std::size_t count = 0;
636
2/2
✓ Branch 0 taken 6647 times.
✓ Branch 1 taken 291057 times.
297704 while (auto* w = expired.pop_front())
637 {
638 6647 sched_->post(&w->op_);
639 6647 ++count;
640 }
641
642 291057 return count;
643 }
644
645 inline void
646 7244 timer_service::remove_timer_impl(implementation& impl)
647 {
648 7244 std::size_t index = impl.heap_index_;
649
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7244 times.
7244 if (index >= heap_.size())
650 return; // Not in heap
651
652
2/2
✓ Branch 0 taken 692 times.
✓ Branch 1 taken 6552 times.
7244 if (index == heap_.size() - 1)
653 {
654 // Last element, just pop
655 692 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
656 692 heap_.pop_back();
657 692 }
658 else
659 {
660 // Swap with last and reheapify
661 6552 swap_heap(index, heap_.size() - 1);
662 6552 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663 6552 heap_.pop_back();
664
665
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6552 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6552 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
666 up_heap(index);
667 else
668 6552 down_heap(index);
669 }
670 7244 }
671
672 inline void
673 7250 timer_service::up_heap(std::size_t index)
674 {
675
2/2
✓ Branch 0 taken 7237 times.
✓ Branch 1 taken 6554 times.
13791 while (index > 0)
676 {
677 6554 std::size_t parent = (index - 1) / 2;
678
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 6541 times.
6554 if (!(heap_[index].time_ < heap_[parent].time_))
679 13 break;
680 6541 swap_heap(index, parent);
681 6541 index = parent;
682 }
683 7250 }
684
685 inline void
686 6552 timer_service::down_heap(std::size_t index)
687 {
688 6552 std::size_t child = index * 2 + 1;
689
2/2
✓ Branch 0 taken 6548 times.
✓ Branch 1 taken 4 times.
6552 while (child < heap_.size())
690 {
691
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
4 std::size_t min_child = (child + 1 == heap_.size() ||
692 heap_[child].time_ < heap_[child + 1].time_)
693 4 ? child
694 : child + 1;
695
696
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if (heap_[index].time_ < heap_[min_child].time_)
697 4 break;
698
699 swap_heap(index, min_child);
700 index = min_child;
701 child = index * 2 + 1;
702 }
703 6552 }
704
705 inline void
706 13093 timer_service::swap_heap(std::size_t i1, std::size_t i2)
707 {
708 13093 heap_entry tmp = heap_[i1];
709 13093 heap_[i1] = heap_[i2];
710 13093 heap_[i2] = tmp;
711 13093 heap_[i1].timer_->heap_index_ = i1;
712 13093 heap_[i2].timer_->heap_index_ = i2;
713 13093 }
714
715 // waiter_node out-of-class member function definitions
716
717 inline void
718 4 waiter_node::canceller::operator()() const
719 {
720 4 waiter_->svc_->cancel_waiter(waiter_);
721 4 }
722
723 inline void
724 waiter_node::completion_op::do_complete(
725 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
726 {
727 if (!owner)
728 return;
729 static_cast<completion_op*>(base)->operator()();
730 }
731
732 inline void
733 7266 waiter_node::completion_op::operator()()
734 {
735 7266 auto* w = waiter_;
736 7266 w->stop_cb_.reset();
737
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7266 times.
7266 if (w->ec_out_)
738 7266 *w->ec_out_ = w->ec_value_;
739
740 7266 auto h = w->h_;
741 7266 auto d = w->d_;
742 7266 auto* svc = w->svc_;
743 7266 auto& sched = svc->get_scheduler();
744
745 7266 svc->destroy_waiter(w);
746
747 7266 d.post(h);
748 7266 sched.work_finished();
749 7266 }
750
751 inline std::coroutine_handle<>
752 7277 timer_service::implementation::wait(
753 std::coroutine_handle<> h,
754 capy::executor_ref d,
755 std::stop_token token,
756 std::error_code* ec)
757 {
758 // Already-expired fast path — no waiter_node, no mutex.
759 // Post instead of dispatch so the coroutine yields to the
760 // scheduler, allowing other queued work to run.
761
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 7255 times.
7277 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
762 {
763
3/4
✗ Branch 0 not taken.
✓ Branch 1 taken 7255 times.
✓ Branch 2 taken 11 times.
✓ Branch 3 taken 7244 times.
7255 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
764 {
765
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
11 if (ec)
766 11 *ec = {};
767 11 d.post(h);
768 11 return std::noop_coroutine();
769 }
770 7244 }
771
772 7266 auto* w = svc_->create_waiter();
773 7266 w->impl_ = this;
774 7266 w->svc_ = svc_;
775 7266 w->h_ = h;
776 7266 w->d_ = d;
777 7266 w->token_ = std::move(token);
778 7266 w->ec_out_ = ec;
779
780 7266 svc_->insert_waiter(*this, w);
781 7266 might_have_pending_waits_ = true;
782 7266 svc_->get_scheduler().work_started();
783
784
2/2
✓ Branch 0 taken 7262 times.
✓ Branch 1 taken 4 times.
7266 if (w->token_.stop_possible())
785 4 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
786
787 7266 return std::noop_coroutine();
788 7277 }
789
790 // Free functions
791
792 struct timer_service_access
793 {
794 7305 static native_scheduler& get_scheduler(io_context& ctx) noexcept
795 {
796 7305 return static_cast<native_scheduler&>(*ctx.sched_);
797 }
798 };
799
800 // Bypass find_service() mutex by reading the scheduler's cached pointer
801 inline io_object::io_service&
802 7305 timer_service_direct(capy::execution_context& ctx) noexcept
803 {
804 14610 return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
805 7305 .timer_svc_;
806 }
807
808 inline std::size_t
809 6 timer_service_update_expiry(timer::implementation& base)
810 {
811 6 auto& impl = static_cast<timer_service::implementation&>(base);
812 6 return impl.svc_->update_timer(impl, impl.expiry_);
813 }
814
815 inline std::size_t
816 591 timer_service_cancel(timer::implementation& base) noexcept
817 {
818 591 auto& impl = static_cast<timer_service::implementation&>(base);
819
1/2
✓ Branch 0 taken 591 times.
✗ Branch 1 not taken.
591 return impl.svc_->cancel_timer(impl);
820 }
821
822 inline std::size_t
823 2 timer_service_cancel_one(timer::implementation& base) noexcept
824 {
825 2 auto& impl = static_cast<timer_service::implementation&>(base);
826
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 return impl.svc_->cancel_one_waiter(impl);
827 }
828
829 inline timer_service&
830 460 get_timer_service(capy::execution_context& ctx, scheduler& sched)
831 {
832 460 return ctx.make_service<timer_service>(sched);
833 }
834
835 } // namespace boost::corosio::detail
836
837 #endif
838