include/boost/corosio/detail/timer_service.hpp

92.7% Lines (344/371) 100.0% List of functions (44/44) 75.9% Branches (142/187)
f(x) Functions (44)
Function Calls Lines Branches Blocks
boost::corosio::detail::timer_service::callback::callback() :97 369x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :100 369x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :109 1927x 100.0% 50.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :138 369x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :144 3898x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :150 738x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :156 369x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :169 3088x 100.0% 70.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :212 3886x 100.0% 100.0% 66.7% boost::corosio::detail::waiter_node::completion_op::completion_op() :236 134x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :261 134x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :297 2207x 87.5% 50.0% 77.8% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :312 2203x 100.0% 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :323 1951x 100.0% 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :335 1943x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :346 738x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :357 160x 100.0% 100.0% boost::corosio::detail::timer_service::shutdown() :364 738x 100.0% 78.3% 81.1% boost::corosio::detail::timer_service::construct() :421 2207x 66.7% 83.3% 76.5% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :450 2206x 100.0% 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :456 2206x 73.3% 70.0% 63.6% boost::corosio::detail::timer_service::create_waiter() :483 1951x 100.0% 100.0% 86.7% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :501 1943x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long long, std::ratio<1ll, 1000000000ll> > >) :512 3x 93.1% 63.6% 78.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :562 1951x 100.0% 100.0% 82.4% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :582 2958x 87.5% 73.3% 77.4% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :622 18x 92.9% 83.3% 80.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :645 1x 76.5% 50.0% 69.6% boost::corosio::detail::timer_service::process_expired() :672 1161x 100.0% 100.0% 90.9% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :707 1936x 84.6% 50.0% 65.2% boost::corosio::detail::timer_service::up_heap(unsigned long long) :734 1943x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long long) :747 1098x 69.2% 50.0% 61.9% boost::corosio::detail::timer_service::swap_heap(unsigned long long, unsigned long long) :767 2185x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :779 18x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :785 1943x 100.0% 50.0% 80.0% boost::corosio::detail::waiter_node::completion_op::operator()() :799 1943x 100.0% 75.0% 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :824 4x 100.0% 60.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*) :851 2150x 100.0% 85.7% 97.1% boost::corosio::detail::timer_service_access::get_scheduler(boost::corosio::io_context&) :893 2207x 100.0% 100.0% boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :901 2207x 100.0% 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :908 3x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :915 755x 100.0% 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :922 1x 100.0% 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :929 369x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
75 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76
77 inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
81 class BOOST_COROSIO_DECL timer_service final
82 : public capy::execution_context::service
83 , public io_object::io_service
84 {
85 public:
86 using clock_type = std::chrono::steady_clock;
87 using time_point = clock_type::time_point;
88
89 /// Type-erased callback for earliest-expiry-changed notifications.
90 class callback
91 {
92 void* ctx_ = nullptr;
93 void (*fn_)(void*) = nullptr;
94
95 public:
96 /// Construct an empty callback.
97 369x callback() = default;
98
99 /// Construct a callback with the given context and function.
100 369x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101
102 /// Return true if the callback is non-empty.
103 explicit operator bool() const noexcept
104 {
105 return fn_ != nullptr;
106 }
107
108 /// Invoke the callback.
109 1927x void operator()() const
110 {
111
1/2
✓ Branch 2 → 3 taken 1927 times.
✗ Branch 2 → 4 not taken.
1927x if (fn_)
112 1927x fn_(ctx_);
113 1927x }
114 };
115
116 struct implementation;
117
118 private:
119 struct heap_entry
120 {
121 time_point time_;
122 implementation* timer_;
123 };
124
125 scheduler* sched_ = nullptr;
126 mutable std::mutex mutex_;
127 std::vector<heap_entry> heap_;
128 implementation* free_list_ = nullptr;
129 waiter_node* waiter_free_list_ = nullptr;
130 callback on_earliest_changed_;
131 bool shutting_down_ = false;
132 // Avoids mutex in nearest_expiry() and empty()
133 mutable std::atomic<std::int64_t> cached_nearest_ns_{
134 (std::numeric_limits<std::int64_t>::max)()};
135
136 public:
137 /// Construct the timer service bound to a scheduler.
138 369x inline timer_service(capy::execution_context&, scheduler& sched)
139 369x : sched_(&sched)
140 {
141 369x }
142
143 /// Return the associated scheduler.
144 3898x inline scheduler& get_scheduler() noexcept
145 {
146 3898x return *sched_;
147 }
148
149 /// Destroy the timer service.
150 738x ~timer_service() override = default;
151
152 timer_service(timer_service const&) = delete;
153 timer_service& operator=(timer_service const&) = delete;
154
155 /// Register a callback invoked when the earliest expiry changes.
156 369x inline void set_on_earliest_changed(callback cb)
157 {
158 369x on_earliest_changed_ = cb;
159 369x }
160
161 /// Return true if no timers are in the heap.
162 inline bool empty() const noexcept
163 {
164 return cached_nearest_ns_.load(std::memory_order_acquire) ==
165 (std::numeric_limits<std::int64_t>::max)();
166 }
167
168 /// Return the nearest timer expiry without acquiring the mutex.
169 3088x inline time_point nearest_expiry() const noexcept
170 {
171 3088x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
172 3088x return time_point(time_point::duration(ns));
173 }
174
175 /// Cancel all pending timers and free cached resources.
176 inline void shutdown() override;
177
178 /// Construct a new timer implementation.
179 inline io_object::implementation* construct() override;
180
181 /// Destroy a timer implementation, cancelling pending waiters.
182 inline void destroy(io_object::implementation* p) override;
183
184 /// Cancel and recycle a timer implementation.
185 inline void destroy_impl(implementation& impl);
186
187 /// Create or recycle a waiter node.
188 inline waiter_node* create_waiter();
189
190 /// Return a waiter node to the cache or free list.
191 inline void destroy_waiter(waiter_node* w);
192
193 /// Update the timer expiry, cancelling existing waiters.
194 inline std::size_t update_timer(implementation& impl, time_point new_time);
195
196 /// Insert a waiter into the timer's waiter list and the heap.
197 inline void insert_waiter(implementation& impl, waiter_node* w);
198
199 /// Cancel all waiters on a timer.
200 inline std::size_t cancel_timer(implementation& impl);
201
202 /// Cancel a single waiter ( stop_token callback path ).
203 inline void cancel_waiter(waiter_node* w);
204
205 /// Cancel one waiter on a timer.
206 inline std::size_t cancel_one_waiter(implementation& impl);
207
208 /// Complete all waiters whose timers have expired.
209 inline std::size_t process_expired();
210
211 private:
212 3886x inline void refresh_cached_nearest() noexcept
213 {
214
2/2
✓ Branch 3 → 4 taken 835 times.
✓ Branch 3 → 5 taken 3051 times.
3886x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
215 3051x : heap_[0].time_.time_since_epoch().count();
216 3886x cached_nearest_ns_.store(ns, std::memory_order_release);
217 3886x }
218
219 inline void remove_timer_impl(implementation& impl);
220 inline void up_heap(std::size_t index);
221 inline void down_heap(std::size_t index);
222 inline void swap_heap(std::size_t i1, std::size_t i2);
223 };
224
225 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
226 : intrusive_list<waiter_node>::node
227 {
228 // Embedded completion op — avoids heap allocation per fire/cancel
229 struct completion_op final : scheduler_op
230 {
231 waiter_node* waiter_ = nullptr;
232
233 static void do_complete(
234 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
235
236 134x completion_op() noexcept : scheduler_op(&do_complete) {}
237
238 void operator()() override;
239 void destroy() override;
240 };
241
242 // Per-waiter stop_token cancellation
243 struct canceller
244 {
245 waiter_node* waiter_;
246 void operator()() const;
247 };
248
249 // nullptr once removed from timer's waiter list (concurrency marker)
250 timer_service::implementation* impl_ = nullptr;
251 timer_service* svc_ = nullptr;
252 std::coroutine_handle<> h_;
253 capy::executor_ref d_;
254 std::error_code* ec_out_ = nullptr;
255 std::stop_token token_;
256 std::optional<std::stop_callback<canceller>> stop_cb_;
257 completion_op op_;
258 std::error_code ec_value_;
259 waiter_node* next_free_ = nullptr;
260
261 134x waiter_node() noexcept
262 134x {
263 134x op_.waiter_ = this;
264 134x }
265 };
266
267 struct timer_service::implementation final : timer::implementation
268 {
269 using clock_type = std::chrono::steady_clock;
270 using time_point = clock_type::time_point;
271 using duration = clock_type::duration;
272
273 timer_service* svc_ = nullptr;
274 intrusive_list<waiter_node> waiters_;
275
276 // Free list linkage (reused when impl is on free_list)
277 implementation* next_free_ = nullptr;
278
279 inline explicit implementation(timer_service& svc) noexcept;
280
281 inline std::coroutine_handle<> wait(
282 std::coroutine_handle<>,
283 capy::executor_ref,
284 std::stop_token,
285 std::error_code*) override;
286 };
287
288 // Thread-local caches avoid hot-path mutex acquisitions:
289 // 1. Impl cache — single-slot, validated by comparing svc_
290 // 2. Waiter cache — single-slot, no service affinity
291 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
292
293 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
294 inline thread_local_ptr<waiter_node> tl_cached_waiter;
295
296 inline timer_service::implementation*
297 2207x try_pop_tl_cache(timer_service* svc) noexcept
298 {
299 2207x auto* impl = tl_cached_impl.get();
300
2/2
✓ Branch 3 → 4 taken 2047 times.
✓ Branch 3 → 9 taken 160 times.
2207x if (impl)
301 {
302 2047x tl_cached_impl.set(nullptr);
303
1/2
✓ Branch 5 → 6 taken 2047 times.
✗ Branch 5 → 7 not taken.
2047x if (impl->svc_ == svc)
304 2047x return impl;
305 // Stale impl from a destroyed service
306 delete impl;
307 }
308 160x return nullptr;
309 }
310
311 inline bool
312 2203x try_push_tl_cache(timer_service::implementation* impl) noexcept
313 {
314
2/2
✓ Branch 3 → 4 taken 2158 times.
✓ Branch 3 → 6 taken 45 times.
2203x if (!tl_cached_impl.get())
315 {
316 2158x tl_cached_impl.set(impl);
317 2158x return true;
318 }
319 45x return false;
320 }
321
322 inline waiter_node*
323 1951x try_pop_waiter_tl_cache() noexcept
324 {
325 1951x auto* w = tl_cached_waiter.get();
326
2/2
✓ Branch 3 → 4 taken 1816 times.
✓ Branch 3 → 6 taken 135 times.
1951x if (w)
327 {
328 1816x tl_cached_waiter.set(nullptr);
329 1816x return w;
330 }
331 135x return nullptr;
332 }
333
334 inline bool
335 1943x try_push_waiter_tl_cache(waiter_node* w) noexcept
336 {
337
2/2
✓ Branch 3 → 4 taken 1900 times.
✓ Branch 3 → 6 taken 43 times.
1943x if (!tl_cached_waiter.get())
338 {
339 1900x tl_cached_waiter.set(w);
340 1900x return true;
341 }
342 43x return false;
343 }
344
345 inline void
346 738x timer_service_invalidate_cache() noexcept
347 {
348
2/2
✓ Branch 3 → 4 taken 111 times.
✓ Branch 3 → 5 taken 627 times.
738x delete tl_cached_impl.get();
349 738x tl_cached_impl.set(nullptr);
350
351
2/2
✓ Branch 7 → 8 taken 84 times.
✓ Branch 7 → 10 taken 654 times.
738x delete tl_cached_waiter.get();
352 738x tl_cached_waiter.set(nullptr);
353 738x }
354
355 // timer_service out-of-class member function definitions
356
357 160x inline timer_service::implementation::implementation(
358 160x timer_service& svc) noexcept
359 160x : svc_(&svc)
360 {
361 160x }
362
363 inline void
364 738x timer_service::shutdown()
365 {
366 738x timer_service_invalidate_cache();
367 738x shutting_down_ = true;
368
369 // Snapshot impls and detach them from the heap so that
370 // coroutine-owned timer destructors (triggered by h.destroy()
371 // below) cannot re-enter remove_timer_impl() and mutate the
372 // vector during iteration.
373 738x std::vector<implementation*> impls;
374
1/1
✓ Branch 4 → 5 taken 738 times.
738x impls.reserve(heap_.size());
375
2/2
✓ Branch 12 → 7 taken 4 times.
✓ Branch 12 → 13 taken 738 times.
742x for (auto& entry : heap_)
376 {
377 4x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
378
1/1
✓ Branch 9 → 10 taken 4 times.
4x impls.push_back(entry.timer_);
379 }
380 738x heap_.clear();
381 738x cached_nearest_ns_.store(
382 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
383
384 // Cancel waiting timers. Each waiter called work_started()
385 // in implementation::wait(). On IOCP the scheduler shutdown
386 // loop exits when outstanding_work_ reaches zero, so we must
387 // call work_finished() here to balance it. On other backends
388 // this is harmless.
389
2/2
✓ Branch 56 → 37 taken 4 times.
✓ Branch 56 → 57 taken 738 times.
742x for (auto* impl : impls)
390 {
391
2/2
✓ Branch 40 → 41 taken 4 times.
✓ Branch 40 → 52 taken 4 times.
8x while (auto* w = impl->waiters_.pop_front())
392 {
393 4x w->stop_cb_.reset();
394 4x auto h = std::exchange(w->h_, {});
395 4x sched_->work_finished();
396
1/2
✓ Branch 46 → 47 taken 4 times.
✗ Branch 46 → 48 not taken.
4x if (h)
397
1/1
✓ Branch 47 → 48 taken 4 times.
4x h.destroy();
398
1/2
✓ Branch 48 → 49 taken 4 times.
✗ Branch 48 → 51 not taken.
4x delete w;
399 4x }
400
1/2
✓ Branch 52 → 53 taken 4 times.
✗ Branch 52 → 54 not taken.
4x delete impl;
401 }
402
403 // Delete free-listed impls
404
2/2
✓ Branch 61 → 58 taken 45 times.
✓ Branch 61 → 62 taken 738 times.
783x while (free_list_)
405 {
406 45x auto* next = free_list_->next_free_;
407
1/2
✓ Branch 58 → 59 taken 45 times.
✗ Branch 58 → 60 not taken.
45x delete free_list_;
408 45x free_list_ = next;
409 }
410
411 // Delete free-listed waiters
412
2/2
✓ Branch 67 → 63 taken 42 times.
✓ Branch 67 → 68 taken 738 times.
780x while (waiter_free_list_)
413 {
414 42x auto* next = waiter_free_list_->next_free_;
415
1/2
✓ Branch 63 → 64 taken 42 times.
✗ Branch 63 → 66 not taken.
42x delete waiter_free_list_;
416 42x waiter_free_list_ = next;
417 }
418 738x }
419
420 inline io_object::implementation*
421 2207x timer_service::construct()
422 {
423 2207x implementation* impl = try_pop_tl_cache(this);
424
2/2
✓ Branch 3 → 4 taken 2047 times.
✓ Branch 3 → 6 taken 160 times.
2207x if (impl)
425 {
426 2047x impl->svc_ = this;
427 2047x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
428 2047x impl->might_have_pending_waits_ = false;
429 2047x return impl;
430 }
431
432
1/1
✓ Branch 6 → 7 taken 160 times.
160x std::lock_guard lock(mutex_);
433
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 10 taken 160 times.
160x if (free_list_)
434 {
435 impl = free_list_;
436 free_list_ = impl->next_free_;
437 impl->next_free_ = nullptr;
438 impl->svc_ = this;
439 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
440 impl->might_have_pending_waits_ = false;
441 }
442 else
443 {
444
1/1
✓ Branch 10 → 11 taken 160 times.
160x impl = new implementation(*this);
445 }
446 160x return impl;
447 160x }
448
449 inline void
450 2206x timer_service::destroy(io_object::implementation* p)
451 {
452 2206x destroy_impl(static_cast<implementation&>(*p));
453 2206x }
454
455 inline void
456 2206x timer_service::destroy_impl(implementation& impl)
457 {
458 // During shutdown the impl is owned by the shutdown loop.
459 // Re-entering here (from a coroutine-owned timer destructor
460 // triggered by h.destroy()) must not modify the heap or
461 // recycle the impl — shutdown deletes it directly.
462
2/2
✓ Branch 2 → 3 taken 3 times.
✓ Branch 2 → 4 taken 2203 times.
2206x if (shutting_down_)
463 2161x return;
464
465
1/1
✓ Branch 4 → 5 taken 2203 times.
2203x cancel_timer(impl);
466
467
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 12 taken 2203 times.
2203x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
468 {
469 std::lock_guard lock(mutex_);
470 remove_timer_impl(impl);
471 refresh_cached_nearest();
472 }
473
474
2/2
✓ Branch 13 → 14 taken 2158 times.
✓ Branch 13 → 15 taken 45 times.
2203x if (try_push_tl_cache(&impl))
475 2158x return;
476
477
1/1
✓ Branch 15 → 16 taken 45 times.
45x std::lock_guard lock(mutex_);
478 45x impl.next_free_ = free_list_;
479 45x free_list_ = &impl;
480 45x }
481
482 inline waiter_node*
483 1951x timer_service::create_waiter()
484 {
485
2/2
✓ Branch 3 → 4 taken 1816 times.
✓ Branch 3 → 5 taken 135 times.
1951x if (auto* w = try_pop_waiter_tl_cache())
486 1816x return w;
487
488
1/1
✓ Branch 5 → 6 taken 135 times.
135x std::lock_guard lock(mutex_);
489
2/2
✓ Branch 6 → 7 taken 1 time.
✓ Branch 6 → 8 taken 134 times.
135x if (waiter_free_list_)
490 {
491 1x auto* w = waiter_free_list_;
492 1x waiter_free_list_ = w->next_free_;
493 1x w->next_free_ = nullptr;
494 1x return w;
495 }
496
497
1/1
✓ Branch 8 → 9 taken 134 times.
134x return new waiter_node();
498 135x }
499
500 inline void
501 1943x timer_service::destroy_waiter(waiter_node* w)
502 {
503
2/2
✓ Branch 3 → 4 taken 1900 times.
✓ Branch 3 → 5 taken 43 times.
1943x if (try_push_waiter_tl_cache(w))
504 1900x return;
505
506
1/1
✓ Branch 5 → 6 taken 43 times.
43x std::lock_guard lock(mutex_);
507 43x w->next_free_ = waiter_free_list_;
508 43x waiter_free_list_ = w;
509 43x }
510
511 inline std::size_t
512 3x timer_service::update_timer(implementation& impl, time_point new_time)
513 {
514 bool in_heap =
515 3x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
516
2/6
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 7 taken 3 times.
✗ Branch 5 → 6 not taken.
✗ Branch 5 → 7 not taken.
✗ Branch 8 → 9 not taken.
✓ Branch 8 → 10 taken 3 times.
3x if (!in_heap && impl.waiters_.empty())
517 return 0;
518
519 3x bool notify = false;
520 3x intrusive_list<waiter_node> canceled;
521
522 {
523
1/1
✓ Branch 10 → 11 taken 3 times.
3x std::lock_guard lock(mutex_);
524
525
2/2
✓ Branch 12 → 13 taken 5 times.
✓ Branch 12 → 15 taken 3 times.
8x while (auto* w = impl.waiters_.pop_front())
526 {
527 5x w->impl_ = nullptr;
528 5x canceled.push_back(w);
529 5x }
530
531
1/2
✓ Branch 16 → 17 taken 3 times.
✗ Branch 16 → 25 not taken.
3x if (impl.heap_index_ < heap_.size())
532 {
533 3x time_point old_time = heap_[impl.heap_index_].time_;
534 3x heap_[impl.heap_index_].time_ = new_time;
535
536
2/3
✓ Branch 19 → 20 taken 3 times.
✓ Branch 21 → 22 taken 3 times.
✗ Branch 21 → 23 not taken.
3x if (new_time < old_time)
537
1/1
✓ Branch 22 → 24 taken 3 times.
3x up_heap(impl.heap_index_);
538 else
539 down_heap(impl.heap_index_);
540
541 3x notify = (impl.heap_index_ == 0);
542 }
543
544 3x refresh_cached_nearest();
545 3x }
546
547 3x std::size_t count = 0;
548
2/2
✓ Branch 29 → 30 taken 5 times.
✓ Branch 29 → 33 taken 3 times.
8x while (auto* w = canceled.pop_front())
549 {
550 5x w->ec_value_ = make_error_code(capy::error::canceled);
551
1/1
✓ Branch 31 → 32 taken 5 times.
5x sched_->post(&w->op_);
552 5x ++count;
553 5x }
554
555
1/2
✓ Branch 33 → 34 taken 3 times.
✗ Branch 33 → 35 not taken.
3x if (notify)
556
1/1
✓ Branch 34 → 35 taken 3 times.
3x on_earliest_changed_();
557
558 3x return count;
559 }
560
561 inline void
562 1951x timer_service::insert_waiter(implementation& impl, waiter_node* w)
563 {
564 1951x bool notify = false;
565 {
566
1/1
✓ Branch 2 → 3 taken 1951 times.
1951x std::lock_guard lock(mutex_);
567
2/2
✓ Branch 4 → 5 taken 1940 times.
✓ Branch 4 → 10 taken 11 times.
1951x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
568 {
569 1940x impl.heap_index_ = heap_.size();
570
1/1
✓ Branch 6 → 7 taken 1940 times.
1940x heap_.push_back({impl.expiry_, &impl});
571
1/1
✓ Branch 8 → 9 taken 1940 times.
1940x up_heap(heap_.size() - 1);
572 1940x notify = (impl.heap_index_ == 0);
573 1940x refresh_cached_nearest();
574 }
575 1951x impl.waiters_.push_back(w);
576 1951x }
577
2/2
✓ Branch 12 → 13 taken 1924 times.
✓ Branch 12 → 14 taken 27 times.
1951x if (notify)
578 1924x on_earliest_changed_();
579 1951x }
580
581 inline std::size_t
582 2958x timer_service::cancel_timer(implementation& impl)
583 {
584
2/2
✓ Branch 2 → 3 taken 2195 times.
✓ Branch 2 → 4 taken 763 times.
2958x if (!impl.might_have_pending_waits_)
585 2195x return 0;
586
587 // Not in heap and no waiters — just clear the flag
588
2/6
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 9 taken 763 times.
✗ Branch 7 → 8 not taken.
✗ Branch 7 → 9 not taken.
✗ Branch 10 → 11 not taken.
✓ Branch 10 → 12 taken 763 times.
763x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
589 impl.waiters_.empty())
590 {
591 impl.might_have_pending_waits_ = false;
592 return 0;
593 }
594
595 763x intrusive_list<waiter_node> canceled;
596
597 {
598
1/1
✓ Branch 12 → 13 taken 763 times.
763x std::lock_guard lock(mutex_);
599
1/1
✓ Branch 13 → 14 taken 763 times.
763x remove_timer_impl(impl);
600
2/2
✓ Branch 15 → 16 taken 765 times.
✓ Branch 15 → 18 taken 763 times.
1528x while (auto* w = impl.waiters_.pop_front())
601 {
602 765x w->impl_ = nullptr;
603 765x canceled.push_back(w);
604 765x }
605 763x refresh_cached_nearest();
606 763x }
607
608 763x impl.might_have_pending_waits_ = false;
609
610 763x std::size_t count = 0;
611
2/2
✓ Branch 22 → 23 taken 765 times.
✓ Branch 22 → 26 taken 763 times.
1528x while (auto* w = canceled.pop_front())
612 {
613 765x w->ec_value_ = make_error_code(capy::error::canceled);
614
1/1
✓ Branch 24 → 25 taken 765 times.
765x sched_->post(&w->op_);
615 765x ++count;
616 765x }
617
618 763x return count;
619 }
620
621 inline void
622 18x timer_service::cancel_waiter(waiter_node* w)
623 {
624 {
625
1/1
✓ Branch 2 → 3 taken 18 times.
18x std::lock_guard lock(mutex_);
626 // Already removed by cancel_timer or process_expired
627
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 18 times.
18x if (!w->impl_)
628 return;
629 18x auto* impl = w->impl_;
630 18x w->impl_ = nullptr;
631 18x impl->waiters_.remove(w);
632
2/2
✓ Branch 7 → 8 taken 17 times.
✓ Branch 7 → 10 taken 1 time.
18x if (impl->waiters_.empty())
633 {
634
1/1
✓ Branch 8 → 9 taken 17 times.
17x remove_timer_impl(*impl);
635 17x impl->might_have_pending_waits_ = false;
636 }
637 18x refresh_cached_nearest();
638 18x }
639
640 18x w->ec_value_ = make_error_code(capy::error::canceled);
641 18x sched_->post(&w->op_);
642 }
643
644 inline std::size_t
645 1x timer_service::cancel_one_waiter(implementation& impl)
646 {
647
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1 time.
1x if (!impl.might_have_pending_waits_)
648 return 0;
649
650 1x waiter_node* w = nullptr;
651
652 {
653
1/1
✓ Branch 4 → 5 taken 1 time.
1x std::lock_guard lock(mutex_);
654 1x w = impl.waiters_.pop_front();
655
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 8 taken 1 time.
1x if (!w)
656 return 0;
657 1x w->impl_ = nullptr;
658
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 12 taken 1 time.
1x if (impl.waiters_.empty())
659 {
660 remove_timer_impl(impl);
661 impl.might_have_pending_waits_ = false;
662 }
663 1x refresh_cached_nearest();
664 1x }
665
666 1x w->ec_value_ = make_error_code(capy::error::canceled);
667 1x sched_->post(&w->op_);
668 1x return 1;
669 }
670
671 inline std::size_t
672 1161x timer_service::process_expired()
673 {
674 1161x intrusive_list<waiter_node> expired;
675
676 {
677
1/1
✓ Branch 2 → 3 taken 1161 times.
1161x std::lock_guard lock(mutex_);
678 1161x auto now = clock_type::now();
679
680
7/7
✓ Branch 14 → 15 taken 2258 times.
✓ Branch 14 → 20 taken 59 times.
✓ Branch 16 → 17 taken 2258 times.
✓ Branch 18 → 19 taken 1156 times.
✓ Branch 18 → 20 taken 1102 times.
✓ Branch 21 → 5 taken 1156 times.
✓ Branch 21 → 22 taken 1161 times.
2317x while (!heap_.empty() && heap_[0].time_ <= now)
681 {
682 1156x implementation* t = heap_[0].timer_;
683
1/1
✓ Branch 6 → 7 taken 1156 times.
1156x remove_timer_impl(*t);
684
2/2
✓ Branch 8 → 9 taken 1158 times.
✓ Branch 8 → 12 taken 1156 times.
2314x while (auto* w = t->waiters_.pop_front())
685 {
686 1158x w->impl_ = nullptr;
687 1158x w->ec_value_ = {};
688 1158x expired.push_back(w);
689 1158x }
690 1156x t->might_have_pending_waits_ = false;
691 }
692
693 1161x refresh_cached_nearest();
694 1161x }
695
696 1161x std::size_t count = 0;
697
2/2
✓ Branch 26 → 27 taken 1158 times.
✓ Branch 26 → 29 taken 1161 times.
2319x while (auto* w = expired.pop_front())
698 {
699
1/1
✓ Branch 27 → 28 taken 1158 times.
1158x sched_->post(&w->op_);
700 1158x ++count;
701 1158x }
702
703 1161x return count;
704 }
705
706 inline void
707 1936x timer_service::remove_timer_impl(implementation& impl)
708 {
709 1936x std::size_t index = impl.heap_index_;
710
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 1936 times.
1936x if (index >= heap_.size())
711 return; // Not in heap
712
713
2/2
✓ Branch 6 → 7 taken 838 times.
✓ Branch 6 → 9 taken 1098 times.
1936x if (index == heap_.size() - 1)
714 {
715 // Last element, just pop
716 838x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
717 838x heap_.pop_back();
718 }
719 else
720 {
721 // Swap with last and reheapify
722 1098x swap_heap(index, heap_.size() - 1);
723 1098x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
724 1098x heap_.pop_back();
725
726
2/6
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 1098 times.
✗ Branch 18 → 19 not taken.
✗ Branch 18 → 20 not taken.
✗ Branch 21 → 22 not taken.
✓ Branch 21 → 23 taken 1098 times.
1098x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
727 up_heap(index);
728 else
729 1098x down_heap(index);
730 }
731 }
732
733 inline void
734 1943x timer_service::up_heap(std::size_t index)
735 {
736
2/2
✓ Branch 11 → 3 taken 1103 times.
✓ Branch 11 → 12 taken 1927 times.
3030x while (index > 0)
737 {
738 1103x std::size_t parent = (index - 1) / 2;
739
2/2
✓ Branch 7 → 8 taken 16 times.
✓ Branch 7 → 9 taken 1087 times.
1103x if (!(heap_[index].time_ < heap_[parent].time_))
740 16x break;
741 1087x swap_heap(index, parent);
742 1087x index = parent;
743 }
744 1943x }
745
746 inline void
747 1098x timer_service::down_heap(std::size_t index)
748 {
749 1098x std::size_t child = index * 2 + 1;
750
2/2
✓ Branch 21 → 3 taken 3 times.
✓ Branch 21 → 22 taken 1095 times.
1098x while (child < heap_.size())
751 {
752 3x std::size_t min_child = (child + 1 == heap_.size() ||
753 heap_[child].time_ < heap_[child + 1].time_)
754
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 11 taken 3 times.
3x ? child
755 3x : child + 1;
756
757
1/2
✓ Branch 16 → 17 taken 3 times.
✗ Branch 16 → 18 not taken.
3x if (heap_[index].time_ < heap_[min_child].time_)
758 3x break;
759
760 swap_heap(index, min_child);
761 index = min_child;
762 child = index * 2 + 1;
763 }
764 1098x }
765
766 inline void
767 2185x timer_service::swap_heap(std::size_t i1, std::size_t i2)
768 {
769 2185x heap_entry tmp = heap_[i1];
770 2185x heap_[i1] = heap_[i2];
771 2185x heap_[i2] = tmp;
772 2185x heap_[i1].timer_->heap_index_ = i1;
773 2185x heap_[i2].timer_->heap_index_ = i2;
774 2185x }
775
776 // waiter_node out-of-class member function definitions
777
778 inline void
779 18x waiter_node::canceller::operator()() const
780 {
781 18x waiter_->svc_->cancel_waiter(waiter_);
782 18x }
783
784 inline void
785 1943x waiter_node::completion_op::do_complete(
786 [[maybe_unused]] void* owner,
787 scheduler_op* base,
788 std::uint32_t,
789 std::uint32_t)
790 {
791 // owner is always non-null here. The destroy path (owner == nullptr)
792 // is unreachable because completion_op overrides destroy() directly,
793 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
794
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1943 times.
1943x BOOST_COROSIO_ASSERT(owner);
795 1943x static_cast<completion_op*>(base)->operator()();
796 1943x }
797
798 inline void
799 1943x waiter_node::completion_op::operator()()
800 {
801 1943x auto* w = waiter_;
802 1943x w->stop_cb_.reset();
803
1/2
✓ Branch 3 → 4 taken 1943 times.
✗ Branch 3 → 5 not taken.
1943x if (w->ec_out_)
804 1943x *w->ec_out_ = w->ec_value_;
805
806 1943x auto h = w->h_;
807 1943x auto d = w->d_;
808 1943x auto* svc = w->svc_;
809 1943x auto& sched = svc->get_scheduler();
810
811
1/1
✓ Branch 6 → 7 taken 1943 times.
1943x svc->destroy_waiter(w);
812
813
1/1
✓ Branch 7 → 8 taken 1943 times.
1943x d.post(h);
814 1943x sched.work_finished();
815 1943x }
816
817 // GCC 14 false-positive: inlining ~optional<stop_callback> through
818 // delete loses track that stop_cb_ was already .reset() above.
819 #if defined(__GNUC__) && !defined(__clang__)
820 #pragma GCC diagnostic push
821 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
822 #endif
823 inline void
824 4x waiter_node::completion_op::destroy()
825 {
826 // Called during scheduler shutdown drain when this completion_op is
827 // in the scheduler's ready queue (posted by cancel_timer() or
828 // process_expired()). Balances the work_started() from
829 // implementation::wait(). The scheduler drain loop separately
830 // balances the work_started() from post(). On IOCP both decrements
831 // are required for outstanding_work_ to reach zero; on other
832 // backends this is harmless.
833 //
834 // This override also prevents scheduler_op::destroy() from calling
835 // do_complete(nullptr, ...). See also: timer_service::shutdown()
836 // which drains waiters still in the timer heap (the other path).
837 4x auto* w = waiter_;
838 4x w->stop_cb_.reset();
839 4x auto h = std::exchange(w->h_, {});
840 4x auto& sched = w->svc_->get_scheduler();
841
1/2
✓ Branch 6 → 7 taken 4 times.
✗ Branch 6 → 9 not taken.
4x delete w;
842 4x sched.work_finished();
843
1/2
✓ Branch 11 → 12 taken 4 times.
✗ Branch 11 → 13 not taken.
4x if (h)
844
1/1
✓ Branch 12 → 13 taken 4 times.
4x h.destroy();
845 4x }
846 #if defined(__GNUC__) && !defined(__clang__)
847 #pragma GCC diagnostic pop
848 #endif
849
850 inline std::coroutine_handle<>
851 2150x timer_service::implementation::wait(
852 std::coroutine_handle<> h,
853 capy::executor_ref d,
854 std::stop_token token,
855 std::error_code* ec)
856 {
857 // Already-expired fast path — no waiter_node, no mutex.
858 // Post instead of dispatch so the coroutine yields to the
859 // scheduler, allowing other queued work to run.
860
2/2
✓ Branch 3 → 4 taken 2139 times.
✓ Branch 3 → 21 taken 11 times.
2150x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
861 {
862
7/8
✓ Branch 5 → 6 taken 2139 times.
✓ Branch 6 → 7 taken 2139 times.
✗ Branch 6 → 11 not taken.
✓ Branch 8 → 9 taken 2139 times.
✓ Branch 10 → 11 taken 199 times.
✓ Branch 10 → 12 taken 1940 times.
✓ Branch 13 → 14 taken 199 times.
✓ Branch 13 → 21 taken 1940 times.
2139x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
863 {
864
1/2
✓ Branch 14 → 15 taken 199 times.
✗ Branch 14 → 17 not taken.
199x if (ec)
865 199x *ec = {};
866 199x d.post(h);
867 199x return std::noop_coroutine();
868 }
869 }
870
871 1951x auto* w = svc_->create_waiter();
872 1951x w->impl_ = this;
873 1951x w->svc_ = svc_;
874 1951x w->h_ = h;
875 1951x w->d_ = d;
876 1951x w->token_ = std::move(token);
877 1951x w->ec_out_ = ec;
878
879 1951x svc_->insert_waiter(*this, w);
880 1951x might_have_pending_waits_ = true;
881 1951x svc_->get_scheduler().work_started();
882
883
2/2
✓ Branch 28 → 29 taken 30 times.
✓ Branch 28 → 31 taken 1921 times.
1951x if (w->token_.stop_possible())
884 30x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
885
886 1951x return std::noop_coroutine();
887 }
888
889 // Free functions
890
891 struct timer_service_access
892 {
893 2207x static native_scheduler& get_scheduler(io_context& ctx) noexcept
894 {
895 2207x return static_cast<native_scheduler&>(*ctx.sched_);
896 }
897 };
898
899 // Bypass find_service() mutex by reading the scheduler's cached pointer
900 inline io_object::io_service&
901 2207x timer_service_direct(capy::execution_context& ctx) noexcept
902 {
903 2207x return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
904 2207x .timer_svc_;
905 }
906
907 inline std::size_t
908 3x timer_service_update_expiry(timer::implementation& base)
909 {
910 3x auto& impl = static_cast<timer_service::implementation&>(base);
911 3x return impl.svc_->update_timer(impl, impl.expiry_);
912 }
913
914 inline std::size_t
915 755x timer_service_cancel(timer::implementation& base) noexcept
916 {
917 755x auto& impl = static_cast<timer_service::implementation&>(base);
918 755x return impl.svc_->cancel_timer(impl);
919 }
920
921 inline std::size_t
922 1x timer_service_cancel_one(timer::implementation& base) noexcept
923 {
924 1x auto& impl = static_cast<timer_service::implementation&>(base);
925 1x return impl.svc_->cancel_one_waiter(impl);
926 }
927
928 inline timer_service&
929 369x get_timer_service(capy::execution_context& ctx, scheduler& sched)
930 {
931 369x return ctx.make_service<timer_service>(sched);
932 }
933
934 } // namespace boost::corosio::detail
935
936 #endif
937