include/boost/corosio/detail/timer_service.hpp

92.8% Lines (349/376) 100.0% List of functions (45/45) 75.9% Branches (142/187)
timer_service.hpp
f(x) Functions (45)
Function Calls Lines Branches Blocks
boost::corosio::detail::timer_service::callback::callback() :96 431x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :99 431x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :108 1679x 100.0% 50.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :137 431x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :143 3404x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :149 862x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :155 431x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :168 2653x 100.0% 70.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :211 3396x 100.0% 100.0% 66.7% boost::corosio::detail::waiter_node::completion_op::completion_op() :235 134x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :261 134x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :298 2136x 87.5% 50.0% 77.8% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :313 2132x 100.0% 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :324 1704x 100.0% 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :336 1696x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :347 862x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :358 160x 100.0% 100.0% boost::corosio::detail::timer_service::shutdown() :365 862x 100.0% 78.3% 81.1% boost::corosio::detail::timer_service::construct() :422 2136x 66.7% 83.3% 76.5% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :451 2135x 100.0% 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :457 2135x 73.3% 70.0% 63.6% boost::corosio::detail::timer_service::create_waiter() :484 1704x 100.0% 100.0% 86.7% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :502 1696x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long long, std::ratio<1ll, 1000000000ll> > >) :513 3x 93.1% 63.6% 78.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :563 1704x 100.0% 100.0% 82.4% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :583 2831x 87.5% 73.3% 77.4% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :623 18x 92.9% 83.3% 80.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :646 1x 76.5% 50.0% 69.6% boost::corosio::detail::timer_service::process_expired() :673 974x 100.0% 100.0% 90.9% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :708 1689x 84.6% 50.0% 65.2% boost::corosio::detail::timer_service::up_heap(unsigned long long) :735 1696x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long long) :748 907x 69.2% 50.0% 61.9% boost::corosio::detail::timer_service::swap_heap(unsigned long long, unsigned long long) :768 1802x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :780 18x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :786 1696x 100.0% 50.0% 80.0% boost::corosio::detail::waiter_node::completion_op::operator()() :800 1696x 100.0% 75.0% 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :825 4x 100.0% 60.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::capy::continuation*) :852 1987x 100.0% 85.7% 97.1% boost::corosio::detail::timer_service_access::get_timer(boost::corosio::io_context&) :896 2136x 100.0% 100.0% boost::corosio::detail::timer_service_access::set_timer(boost::corosio::io_context&, boost::corosio::detail::timer_service&) :901 431x 100.0% 100.0% boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :909 2136x 100.0% 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :915 3x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :922 699x 100.0% 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :929 1x 100.0% 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :936 431x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/detail/intrusive.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19 #include <boost/capy/error.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <system_error>
23
24 #include <atomic>
25 #include <chrono>
26 #include <coroutine>
27 #include <cstddef>
28 #include <limits>
29 #include <mutex>
30 #include <optional>
31 #include <stop_token>
32 #include <utility>
33 #include <vector>
34
35 namespace boost::corosio::detail {
36
37 struct scheduler;
38
39 /*
40 Timer Service
41 =============
42
43 Data Structures
44 ---------------
45 waiter_node holds per-waiter state: coroutine handle, executor,
46 error output, stop_token, embedded completion_op. Each concurrent
47 co_await t.wait() allocates one waiter_node.
48
49 timer_service::implementation holds per-timer state: expiry,
50 heap index, and an intrusive_list of waiter_nodes. Multiple
51 coroutines can wait on the same timer simultaneously.
52
53 timer_service owns a min-heap of active timers, a free list
54 of recycled impls, and a free list of recycled waiter_nodes. The
55 heap is ordered by expiry time; the scheduler queries
56 nearest_expiry() to set the epoll/timerfd timeout.
57
58 Optimization Strategy
59 ---------------------
60 1. Deferred heap insertion — expires_after() stores the expiry
61 but does not insert into the heap. Insertion happens in wait().
62 2. Thread-local impl cache — single-slot per-thread cache.
63 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65 5. might_have_pending_waits_ flag — skips lock when no wait issued.
66 6. Thread-local waiter cache — single-slot per-thread cache.
67
68 Concurrency
69 -----------
70 stop_token callbacks can fire from any thread. The impl_
71 pointer on waiter_node is used as a "still in list" marker.
72 */
73
74 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75
76 inline void timer_service_invalidate_cache() noexcept;
77
78 // timer_service class body — member function definitions are
79 // out-of-class (after implementation and waiter_node are complete)
80 class BOOST_COROSIO_DECL timer_service final
81 : public capy::execution_context::service
82 , public io_object::io_service
83 {
84 public:
85 using clock_type = std::chrono::steady_clock;
86 using time_point = clock_type::time_point;
87
88 /// Type-erased callback for earliest-expiry-changed notifications.
89 class callback
90 {
91 void* ctx_ = nullptr;
92 void (*fn_)(void*) = nullptr;
93
94 public:
95 /// Construct an empty callback.
96 431x callback() = default;
97
98 /// Construct a callback with the given context and function.
99 431x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100
101 /// Return true if the callback is non-empty.
102 explicit operator bool() const noexcept
103 {
104 return fn_ != nullptr;
105 }
106
107 /// Invoke the callback.
108 1679x void operator()() const
109 {
110
1/2
✓ Branch 2 → 3 taken 1679 times.
✗ Branch 2 → 4 not taken.
1679x if (fn_)
111 1679x fn_(ctx_);
112 1679x }
113 };
114
115 struct implementation;
116
117 private:
118 struct heap_entry
119 {
120 time_point time_;
121 implementation* timer_;
122 };
123
124 scheduler* sched_ = nullptr;
125 mutable std::mutex mutex_;
126 std::vector<heap_entry> heap_;
127 implementation* free_list_ = nullptr;
128 waiter_node* waiter_free_list_ = nullptr;
129 callback on_earliest_changed_;
130 bool shutting_down_ = false;
131 // Avoids mutex in nearest_expiry() and empty()
132 mutable std::atomic<std::int64_t> cached_nearest_ns_{
133 (std::numeric_limits<std::int64_t>::max)()};
134
135 public:
136 /// Construct the timer service bound to a scheduler.
137 431x inline timer_service(capy::execution_context&, scheduler& sched)
138 431x : sched_(&sched)
139 {
140 431x }
141
142 /// Return the associated scheduler.
143 3404x inline scheduler& get_scheduler() noexcept
144 {
145 3404x return *sched_;
146 }
147
148 /// Destroy the timer service.
149 862x ~timer_service() override = default;
150
151 timer_service(timer_service const&) = delete;
152 timer_service& operator=(timer_service const&) = delete;
153
154 /// Register a callback invoked when the earliest expiry changes.
155 431x inline void set_on_earliest_changed(callback cb)
156 {
157 431x on_earliest_changed_ = cb;
158 431x }
159
160 /// Return true if no timers are in the heap.
161 inline bool empty() const noexcept
162 {
163 return cached_nearest_ns_.load(std::memory_order_acquire) ==
164 (std::numeric_limits<std::int64_t>::max)();
165 }
166
167 /// Return the nearest timer expiry without acquiring the mutex.
168 2653x inline time_point nearest_expiry() const noexcept
169 {
170 2653x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171 2653x return time_point(time_point::duration(ns));
172 }
173
174 /// Cancel all pending timers and free cached resources.
175 inline void shutdown() override;
176
177 /// Construct a new timer implementation.
178 inline io_object::implementation* construct() override;
179
180 /// Destroy a timer implementation, cancelling pending waiters.
181 inline void destroy(io_object::implementation* p) override;
182
183 /// Cancel and recycle a timer implementation.
184 inline void destroy_impl(implementation& impl);
185
186 /// Create or recycle a waiter node.
187 inline waiter_node* create_waiter();
188
189 /// Return a waiter node to the cache or free list.
190 inline void destroy_waiter(waiter_node* w);
191
192 /// Update the timer expiry, cancelling existing waiters.
193 inline std::size_t update_timer(implementation& impl, time_point new_time);
194
195 /// Insert a waiter into the timer's waiter list and the heap.
196 inline void insert_waiter(implementation& impl, waiter_node* w);
197
198 /// Cancel all waiters on a timer.
199 inline std::size_t cancel_timer(implementation& impl);
200
201 /// Cancel a single waiter ( stop_token callback path ).
202 inline void cancel_waiter(waiter_node* w);
203
204 /// Cancel one waiter on a timer.
205 inline std::size_t cancel_one_waiter(implementation& impl);
206
207 /// Complete all waiters whose timers have expired.
208 inline std::size_t process_expired();
209
210 private:
211 3396x inline void refresh_cached_nearest() noexcept
212 {
213
2/2
✓ Branch 3 → 4 taken 779 times.
✓ Branch 3 → 5 taken 2617 times.
3396x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214 2617x : heap_[0].time_.time_since_epoch().count();
215 3396x cached_nearest_ns_.store(ns, std::memory_order_release);
216 3396x }
217
218 inline void remove_timer_impl(implementation& impl);
219 inline void up_heap(std::size_t index);
220 inline void down_heap(std::size_t index);
221 inline void swap_heap(std::size_t i1, std::size_t i2);
222 };
223
224 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225 : intrusive_list<waiter_node>::node
226 {
227 // Embedded completion op — avoids heap allocation per fire/cancel
228 struct completion_op final : scheduler_op
229 {
230 waiter_node* waiter_ = nullptr;
231
232 static void do_complete(
233 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234
235 134x completion_op() noexcept : scheduler_op(&do_complete) {}
236
237 void operator()() override;
238 void destroy() override;
239 };
240
241 // Per-waiter stop_token cancellation
242 struct canceller
243 {
244 waiter_node* waiter_;
245 void operator()() const;
246 };
247
248 // nullptr once removed from timer's waiter list (concurrency marker)
249 timer_service::implementation* impl_ = nullptr;
250 timer_service* svc_ = nullptr;
251 std::coroutine_handle<> h_;
252 capy::continuation* cont_ = nullptr;
253 capy::executor_ref d_;
254 std::error_code* ec_out_ = nullptr;
255 std::stop_token token_;
256 std::optional<std::stop_callback<canceller>> stop_cb_;
257 completion_op op_;
258 std::error_code ec_value_;
259 waiter_node* next_free_ = nullptr;
260
261 134x waiter_node() noexcept
262 134x {
263 134x op_.waiter_ = this;
264 134x }
265 };
266
267 struct timer_service::implementation final : timer::implementation
268 {
269 using clock_type = std::chrono::steady_clock;
270 using time_point = clock_type::time_point;
271 using duration = clock_type::duration;
272
273 timer_service* svc_ = nullptr;
274 intrusive_list<waiter_node> waiters_;
275
276 // Free list linkage (reused when impl is on free_list)
277 implementation* next_free_ = nullptr;
278
279 inline explicit implementation(timer_service& svc) noexcept;
280
281 inline std::coroutine_handle<> wait(
282 std::coroutine_handle<>,
283 capy::executor_ref,
284 std::stop_token,
285 std::error_code*,
286 capy::continuation*) override;
287 };
288
289 // Thread-local caches avoid hot-path mutex acquisitions:
290 // 1. Impl cache — single-slot, validated by comparing svc_
291 // 2. Waiter cache — single-slot, no service affinity
292 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
293
294 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
295 inline thread_local_ptr<waiter_node> tl_cached_waiter;
296
297 inline timer_service::implementation*
298 2136x try_pop_tl_cache(timer_service* svc) noexcept
299 {
300 2136x auto* impl = tl_cached_impl.get();
301
2/2
✓ Branch 3 → 4 taken 1976 times.
✓ Branch 3 → 9 taken 160 times.
2136x if (impl)
302 {
303 1976x tl_cached_impl.set(nullptr);
304
1/2
✓ Branch 5 → 6 taken 1976 times.
✗ Branch 5 → 7 not taken.
1976x if (impl->svc_ == svc)
305 1976x return impl;
306 // Stale impl from a destroyed service
307 delete impl;
308 }
309 160x return nullptr;
310 }
311
312 inline bool
313 2132x try_push_tl_cache(timer_service::implementation* impl) noexcept
314 {
315
2/2
✓ Branch 3 → 4 taken 2087 times.
✓ Branch 3 → 6 taken 45 times.
2132x if (!tl_cached_impl.get())
316 {
317 2087x tl_cached_impl.set(impl);
318 2087x return true;
319 }
320 45x return false;
321 }
322
323 inline waiter_node*
324 1704x try_pop_waiter_tl_cache() noexcept
325 {
326 1704x auto* w = tl_cached_waiter.get();
327
2/2
✓ Branch 3 → 4 taken 1569 times.
✓ Branch 3 → 6 taken 135 times.
1704x if (w)
328 {
329 1569x tl_cached_waiter.set(nullptr);
330 1569x return w;
331 }
332 135x return nullptr;
333 }
334
335 inline bool
336 1696x try_push_waiter_tl_cache(waiter_node* w) noexcept
337 {
338
2/2
✓ Branch 3 → 4 taken 1653 times.
✓ Branch 3 → 6 taken 43 times.
1696x if (!tl_cached_waiter.get())
339 {
340 1653x tl_cached_waiter.set(w);
341 1653x return true;
342 }
343 43x return false;
344 }
345
346 inline void
347 862x timer_service_invalidate_cache() noexcept
348 {
349
2/2
✓ Branch 3 → 4 taken 111 times.
✓ Branch 3 → 5 taken 751 times.
862x delete tl_cached_impl.get();
350 862x tl_cached_impl.set(nullptr);
351
352
2/2
✓ Branch 7 → 8 taken 84 times.
✓ Branch 7 → 10 taken 778 times.
862x delete tl_cached_waiter.get();
353 862x tl_cached_waiter.set(nullptr);
354 862x }
355
356 // timer_service out-of-class member function definitions
357
358 160x inline timer_service::implementation::implementation(
359 160x timer_service& svc) noexcept
360 160x : svc_(&svc)
361 {
362 160x }
363
364 inline void
365 862x timer_service::shutdown()
366 {
367 862x timer_service_invalidate_cache();
368 862x shutting_down_ = true;
369
370 // Snapshot impls and detach them from the heap so that
371 // coroutine-owned timer destructors (triggered by h.destroy()
372 // below) cannot re-enter remove_timer_impl() and mutate the
373 // vector during iteration.
374 862x std::vector<implementation*> impls;
375
1/1
✓ Branch 4 → 5 taken 862 times.
862x impls.reserve(heap_.size());
376
2/2
✓ Branch 12 → 7 taken 4 times.
✓ Branch 12 → 13 taken 862 times.
866x for (auto& entry : heap_)
377 {
378 4x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
379
1/1
✓ Branch 9 → 10 taken 4 times.
4x impls.push_back(entry.timer_);
380 }
381 862x heap_.clear();
382 862x cached_nearest_ns_.store(
383 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
384
385 // Cancel waiting timers. Each waiter called work_started()
386 // in implementation::wait(). On IOCP the scheduler shutdown
387 // loop exits when outstanding_work_ reaches zero, so we must
388 // call work_finished() here to balance it. On other backends
389 // this is harmless.
390
2/2
✓ Branch 56 → 37 taken 4 times.
✓ Branch 56 → 57 taken 862 times.
866x for (auto* impl : impls)
391 {
392
2/2
✓ Branch 40 → 41 taken 4 times.
✓ Branch 40 → 52 taken 4 times.
8x while (auto* w = impl->waiters_.pop_front())
393 {
394 4x w->stop_cb_.reset();
395 4x auto h = std::exchange(w->h_, {});
396 4x sched_->work_finished();
397
1/2
✓ Branch 46 → 47 taken 4 times.
✗ Branch 46 → 48 not taken.
4x if (h)
398
1/1
✓ Branch 47 → 48 taken 4 times.
4x h.destroy();
399
1/2
✓ Branch 48 → 49 taken 4 times.
✗ Branch 48 → 51 not taken.
4x delete w;
400 4x }
401
1/2
✓ Branch 52 → 53 taken 4 times.
✗ Branch 52 → 54 not taken.
4x delete impl;
402 }
403
404 // Delete free-listed impls
405
2/2
✓ Branch 61 → 58 taken 45 times.
✓ Branch 61 → 62 taken 862 times.
907x while (free_list_)
406 {
407 45x auto* next = free_list_->next_free_;
408
1/2
✓ Branch 58 → 59 taken 45 times.
✗ Branch 58 → 60 not taken.
45x delete free_list_;
409 45x free_list_ = next;
410 }
411
412 // Delete free-listed waiters
413
2/2
✓ Branch 67 → 63 taken 42 times.
✓ Branch 67 → 68 taken 862 times.
904x while (waiter_free_list_)
414 {
415 42x auto* next = waiter_free_list_->next_free_;
416
1/2
✓ Branch 63 → 64 taken 42 times.
✗ Branch 63 → 66 not taken.
42x delete waiter_free_list_;
417 42x waiter_free_list_ = next;
418 }
419 862x }
420
421 inline io_object::implementation*
422 2136x timer_service::construct()
423 {
424 2136x implementation* impl = try_pop_tl_cache(this);
425
2/2
✓ Branch 3 → 4 taken 1976 times.
✓ Branch 3 → 6 taken 160 times.
2136x if (impl)
426 {
427 1976x impl->svc_ = this;
428 1976x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
429 1976x impl->might_have_pending_waits_ = false;
430 1976x return impl;
431 }
432
433
1/1
✓ Branch 6 → 7 taken 160 times.
160x std::lock_guard lock(mutex_);
434
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 10 taken 160 times.
160x if (free_list_)
435 {
436 impl = free_list_;
437 free_list_ = impl->next_free_;
438 impl->next_free_ = nullptr;
439 impl->svc_ = this;
440 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
441 impl->might_have_pending_waits_ = false;
442 }
443 else
444 {
445
1/1
✓ Branch 10 → 11 taken 160 times.
160x impl = new implementation(*this);
446 }
447 160x return impl;
448 160x }
449
450 inline void
451 2135x timer_service::destroy(io_object::implementation* p)
452 {
453 2135x destroy_impl(static_cast<implementation&>(*p));
454 2135x }
455
456 inline void
457 2135x timer_service::destroy_impl(implementation& impl)
458 {
459 // During shutdown the impl is owned by the shutdown loop.
460 // Re-entering here (from a coroutine-owned timer destructor
461 // triggered by h.destroy()) must not modify the heap or
462 // recycle the impl — shutdown deletes it directly.
463
2/2
✓ Branch 2 → 3 taken 3 times.
✓ Branch 2 → 4 taken 2132 times.
2135x if (shutting_down_)
464 2090x return;
465
466
1/1
✓ Branch 4 → 5 taken 2132 times.
2132x cancel_timer(impl);
467
468
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 12 taken 2132 times.
2132x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
469 {
470 std::lock_guard lock(mutex_);
471 remove_timer_impl(impl);
472 refresh_cached_nearest();
473 }
474
475
2/2
✓ Branch 13 → 14 taken 2087 times.
✓ Branch 13 → 15 taken 45 times.
2132x if (try_push_tl_cache(&impl))
476 2087x return;
477
478
1/1
✓ Branch 15 → 16 taken 45 times.
45x std::lock_guard lock(mutex_);
479 45x impl.next_free_ = free_list_;
480 45x free_list_ = &impl;
481 45x }
482
483 inline waiter_node*
484 1704x timer_service::create_waiter()
485 {
486
2/2
✓ Branch 3 → 4 taken 1569 times.
✓ Branch 3 → 5 taken 135 times.
1704x if (auto* w = try_pop_waiter_tl_cache())
487 1569x return w;
488
489
1/1
✓ Branch 5 → 6 taken 135 times.
135x std::lock_guard lock(mutex_);
490
2/2
✓ Branch 6 → 7 taken 1 time.
✓ Branch 6 → 8 taken 134 times.
135x if (waiter_free_list_)
491 {
492 1x auto* w = waiter_free_list_;
493 1x waiter_free_list_ = w->next_free_;
494 1x w->next_free_ = nullptr;
495 1x return w;
496 }
497
498
1/1
✓ Branch 8 → 9 taken 134 times.
134x return new waiter_node();
499 135x }
500
501 inline void
502 1696x timer_service::destroy_waiter(waiter_node* w)
503 {
504
2/2
✓ Branch 3 → 4 taken 1653 times.
✓ Branch 3 → 5 taken 43 times.
1696x if (try_push_waiter_tl_cache(w))
505 1653x return;
506
507
1/1
✓ Branch 5 → 6 taken 43 times.
43x std::lock_guard lock(mutex_);
508 43x w->next_free_ = waiter_free_list_;
509 43x waiter_free_list_ = w;
510 43x }
511
512 inline std::size_t
513 3x timer_service::update_timer(implementation& impl, time_point new_time)
514 {
515 bool in_heap =
516 3x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
517
2/6
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 7 taken 3 times.
✗ Branch 5 → 6 not taken.
✗ Branch 5 → 7 not taken.
✗ Branch 8 → 9 not taken.
✓ Branch 8 → 10 taken 3 times.
3x if (!in_heap && impl.waiters_.empty())
518 return 0;
519
520 3x bool notify = false;
521 3x intrusive_list<waiter_node> canceled;
522
523 {
524
1/1
✓ Branch 10 → 11 taken 3 times.
3x std::lock_guard lock(mutex_);
525
526
2/2
✓ Branch 12 → 13 taken 5 times.
✓ Branch 12 → 15 taken 3 times.
8x while (auto* w = impl.waiters_.pop_front())
527 {
528 5x w->impl_ = nullptr;
529 5x canceled.push_back(w);
530 5x }
531
532
1/2
✓ Branch 16 → 17 taken 3 times.
✗ Branch 16 → 25 not taken.
3x if (impl.heap_index_ < heap_.size())
533 {
534 3x time_point old_time = heap_[impl.heap_index_].time_;
535 3x heap_[impl.heap_index_].time_ = new_time;
536
537
2/3
✓ Branch 19 → 20 taken 3 times.
✓ Branch 21 → 22 taken 3 times.
✗ Branch 21 → 23 not taken.
3x if (new_time < old_time)
538
1/1
✓ Branch 22 → 24 taken 3 times.
3x up_heap(impl.heap_index_);
539 else
540 down_heap(impl.heap_index_);
541
542 3x notify = (impl.heap_index_ == 0);
543 }
544
545 3x refresh_cached_nearest();
546 3x }
547
548 3x std::size_t count = 0;
549
2/2
✓ Branch 29 → 30 taken 5 times.
✓ Branch 29 → 33 taken 3 times.
8x while (auto* w = canceled.pop_front())
550 {
551 5x w->ec_value_ = make_error_code(capy::error::canceled);
552
1/1
✓ Branch 31 → 32 taken 5 times.
5x sched_->post(&w->op_);
553 5x ++count;
554 5x }
555
556
1/2
✓ Branch 33 → 34 taken 3 times.
✗ Branch 33 → 35 not taken.
3x if (notify)
557
1/1
✓ Branch 34 → 35 taken 3 times.
3x on_earliest_changed_();
558
559 3x return count;
560 }
561
562 inline void
563 1704x timer_service::insert_waiter(implementation& impl, waiter_node* w)
564 {
565 1704x bool notify = false;
566 {
567
1/1
✓ Branch 2 → 3 taken 1704 times.
1704x std::lock_guard lock(mutex_);
568
2/2
✓ Branch 4 → 5 taken 1693 times.
✓ Branch 4 → 10 taken 11 times.
1704x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
569 {
570 1693x impl.heap_index_ = heap_.size();
571
1/1
✓ Branch 6 → 7 taken 1693 times.
1693x heap_.push_back({impl.expiry_, &impl});
572
1/1
✓ Branch 8 → 9 taken 1693 times.
1693x up_heap(heap_.size() - 1);
573 1693x notify = (impl.heap_index_ == 0);
574 1693x refresh_cached_nearest();
575 }
576 1704x impl.waiters_.push_back(w);
577 1704x }
578
2/2
✓ Branch 12 → 13 taken 1676 times.
✓ Branch 12 → 14 taken 28 times.
1704x if (notify)
579 1676x on_earliest_changed_();
580 1704x }
581
582 inline std::size_t
583 2831x timer_service::cancel_timer(implementation& impl)
584 {
585
2/2
✓ Branch 2 → 3 taken 2124 times.
✓ Branch 2 → 4 taken 707 times.
2831x if (!impl.might_have_pending_waits_)
586 2124x return 0;
587
588 // Not in heap and no waiters — just clear the flag
589
2/6
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 9 taken 707 times.
✗ Branch 7 → 8 not taken.
✗ Branch 7 → 9 not taken.
✗ Branch 10 → 11 not taken.
✓ Branch 10 → 12 taken 707 times.
707x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
590 impl.waiters_.empty())
591 {
592 impl.might_have_pending_waits_ = false;
593 return 0;
594 }
595
596 707x intrusive_list<waiter_node> canceled;
597
598 {
599
1/1
✓ Branch 12 → 13 taken 707 times.
707x std::lock_guard lock(mutex_);
600
1/1
✓ Branch 13 → 14 taken 707 times.
707x remove_timer_impl(impl);
601
2/2
✓ Branch 15 → 16 taken 709 times.
✓ Branch 15 → 18 taken 707 times.
1416x while (auto* w = impl.waiters_.pop_front())
602 {
603 709x w->impl_ = nullptr;
604 709x canceled.push_back(w);
605 709x }
606 707x refresh_cached_nearest();
607 707x }
608
609 707x impl.might_have_pending_waits_ = false;
610
611 707x std::size_t count = 0;
612
2/2
✓ Branch 22 → 23 taken 709 times.
✓ Branch 22 → 26 taken 707 times.
1416x while (auto* w = canceled.pop_front())
613 {
614 709x w->ec_value_ = make_error_code(capy::error::canceled);
615
1/1
✓ Branch 24 → 25 taken 709 times.
709x sched_->post(&w->op_);
616 709x ++count;
617 709x }
618
619 707x return count;
620 }
621
622 inline void
623 18x timer_service::cancel_waiter(waiter_node* w)
624 {
625 {
626
1/1
✓ Branch 2 → 3 taken 18 times.
18x std::lock_guard lock(mutex_);
627 // Already removed by cancel_timer or process_expired
628
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 18 times.
18x if (!w->impl_)
629 return;
630 18x auto* impl = w->impl_;
631 18x w->impl_ = nullptr;
632 18x impl->waiters_.remove(w);
633
2/2
✓ Branch 7 → 8 taken 17 times.
✓ Branch 7 → 10 taken 1 time.
18x if (impl->waiters_.empty())
634 {
635
1/1
✓ Branch 8 → 9 taken 17 times.
17x remove_timer_impl(*impl);
636 17x impl->might_have_pending_waits_ = false;
637 }
638 18x refresh_cached_nearest();
639 18x }
640
641 18x w->ec_value_ = make_error_code(capy::error::canceled);
642 18x sched_->post(&w->op_);
643 }
644
645 inline std::size_t
646 1x timer_service::cancel_one_waiter(implementation& impl)
647 {
648
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1 time.
1x if (!impl.might_have_pending_waits_)
649 return 0;
650
651 1x waiter_node* w = nullptr;
652
653 {
654
1/1
✓ Branch 4 → 5 taken 1 time.
1x std::lock_guard lock(mutex_);
655 1x w = impl.waiters_.pop_front();
656
1/2
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 8 taken 1 time.
1x if (!w)
657 return 0;
658 1x w->impl_ = nullptr;
659
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 12 taken 1 time.
1x if (impl.waiters_.empty())
660 {
661 remove_timer_impl(impl);
662 impl.might_have_pending_waits_ = false;
663 }
664 1x refresh_cached_nearest();
665 1x }
666
667 1x w->ec_value_ = make_error_code(capy::error::canceled);
668 1x sched_->post(&w->op_);
669 1x return 1;
670 }
671
672 inline std::size_t
673 974x timer_service::process_expired()
674 {
675 974x intrusive_list<waiter_node> expired;
676
677 {
678
1/1
✓ Branch 2 → 3 taken 974 times.
974x std::lock_guard lock(mutex_);
679 974x auto now = clock_type::now();
680
681
7/7
✓ Branch 14 → 15 taken 1880 times.
✓ Branch 14 → 20 taken 59 times.
✓ Branch 16 → 17 taken 1880 times.
✓ Branch 18 → 19 taken 965 times.
✓ Branch 18 → 20 taken 915 times.
✓ Branch 21 → 5 taken 965 times.
✓ Branch 21 → 22 taken 974 times.
1939x while (!heap_.empty() && heap_[0].time_ <= now)
682 {
683 965x implementation* t = heap_[0].timer_;
684
1/1
✓ Branch 6 → 7 taken 965 times.
965x remove_timer_impl(*t);
685
2/2
✓ Branch 8 → 9 taken 967 times.
✓ Branch 8 → 12 taken 965 times.
1932x while (auto* w = t->waiters_.pop_front())
686 {
687 967x w->impl_ = nullptr;
688 967x w->ec_value_ = {};
689 967x expired.push_back(w);
690 967x }
691 965x t->might_have_pending_waits_ = false;
692 }
693
694 974x refresh_cached_nearest();
695 974x }
696
697 974x std::size_t count = 0;
698
2/2
✓ Branch 26 → 27 taken 967 times.
✓ Branch 26 → 29 taken 974 times.
1941x while (auto* w = expired.pop_front())
699 {
700
1/1
✓ Branch 27 → 28 taken 967 times.
967x sched_->post(&w->op_);
701 967x ++count;
702 967x }
703
704 974x return count;
705 }
706
707 inline void
708 1689x timer_service::remove_timer_impl(implementation& impl)
709 {
710 1689x std::size_t index = impl.heap_index_;
711
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 1689 times.
1689x if (index >= heap_.size())
712 return; // Not in heap
713
714
2/2
✓ Branch 6 → 7 taken 782 times.
✓ Branch 6 → 9 taken 907 times.
1689x if (index == heap_.size() - 1)
715 {
716 // Last element, just pop
717 782x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
718 782x heap_.pop_back();
719 }
720 else
721 {
722 // Swap with last and reheapify
723 907x swap_heap(index, heap_.size() - 1);
724 907x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
725 907x heap_.pop_back();
726
727
2/6
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 907 times.
✗ Branch 18 → 19 not taken.
✗ Branch 18 → 20 not taken.
✗ Branch 21 → 22 not taken.
✓ Branch 21 → 23 taken 907 times.
907x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
728 up_heap(index);
729 else
730 907x down_heap(index);
731 }
732 }
733
734 inline void
735 1696x timer_service::up_heap(std::size_t index)
736 {
737
2/2
✓ Branch 11 → 3 taken 912 times.
✓ Branch 11 → 12 taken 1679 times.
2591x while (index > 0)
738 {
739 912x std::size_t parent = (index - 1) / 2;
740
2/2
✓ Branch 7 → 8 taken 17 times.
✓ Branch 7 → 9 taken 895 times.
912x if (!(heap_[index].time_ < heap_[parent].time_))
741 17x break;
742 895x swap_heap(index, parent);
743 895x index = parent;
744 }
745 1696x }
746
747 inline void
748 907x timer_service::down_heap(std::size_t index)
749 {
750 907x std::size_t child = index * 2 + 1;
751
2/2
✓ Branch 21 → 3 taken 3 times.
✓ Branch 21 → 22 taken 904 times.
907x while (child < heap_.size())
752 {
753 3x std::size_t min_child = (child + 1 == heap_.size() ||
754 heap_[child].time_ < heap_[child + 1].time_)
755
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 11 taken 3 times.
3x ? child
756 3x : child + 1;
757
758
1/2
✓ Branch 16 → 17 taken 3 times.
✗ Branch 16 → 18 not taken.
3x if (heap_[index].time_ < heap_[min_child].time_)
759 3x break;
760
761 swap_heap(index, min_child);
762 index = min_child;
763 child = index * 2 + 1;
764 }
765 907x }
766
767 inline void
768 1802x timer_service::swap_heap(std::size_t i1, std::size_t i2)
769 {
770 1802x heap_entry tmp = heap_[i1];
771 1802x heap_[i1] = heap_[i2];
772 1802x heap_[i2] = tmp;
773 1802x heap_[i1].timer_->heap_index_ = i1;
774 1802x heap_[i2].timer_->heap_index_ = i2;
775 1802x }
776
777 // waiter_node out-of-class member function definitions
778
779 inline void
780 18x waiter_node::canceller::operator()() const
781 {
782 18x waiter_->svc_->cancel_waiter(waiter_);
783 18x }
784
785 inline void
786 1696x waiter_node::completion_op::do_complete(
787 [[maybe_unused]] void* owner,
788 scheduler_op* base,
789 std::uint32_t,
790 std::uint32_t)
791 {
792 // owner is always non-null here. The destroy path (owner == nullptr)
793 // is unreachable because completion_op overrides destroy() directly,
794 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
795
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 4 taken 1696 times.
1696x BOOST_COROSIO_ASSERT(owner);
796 1696x static_cast<completion_op*>(base)->operator()();
797 1696x }
798
799 inline void
800 1696x waiter_node::completion_op::operator()()
801 {
802 1696x auto* w = waiter_;
803 1696x w->stop_cb_.reset();
804
1/2
✓ Branch 3 → 4 taken 1696 times.
✗ Branch 3 → 5 not taken.
1696x if (w->ec_out_)
805 1696x *w->ec_out_ = w->ec_value_;
806
807 1696x auto* cont = w->cont_;
808 1696x auto d = w->d_;
809 1696x auto* svc = w->svc_;
810 1696x auto& sched = svc->get_scheduler();
811
812
1/1
✓ Branch 6 → 7 taken 1696 times.
1696x svc->destroy_waiter(w);
813
814
1/1
✓ Branch 7 → 8 taken 1696 times.
1696x d.post(*cont);
815 1696x sched.work_finished();
816 1696x }
817
818 // GCC 14 false-positive: inlining ~optional<stop_callback> through
819 // delete loses track that stop_cb_ was already .reset() above.
820 #if defined(__GNUC__) && !defined(__clang__)
821 #pragma GCC diagnostic push
822 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
823 #endif
824 inline void
825 4x waiter_node::completion_op::destroy()
826 {
827 // Called during scheduler shutdown drain when this completion_op is
828 // in the scheduler's ready queue (posted by cancel_timer() or
829 // process_expired()). Balances the work_started() from
830 // implementation::wait(). The scheduler drain loop separately
831 // balances the work_started() from post(). On IOCP both decrements
832 // are required for outstanding_work_ to reach zero; on other
833 // backends this is harmless.
834 //
835 // This override also prevents scheduler_op::destroy() from calling
836 // do_complete(nullptr, ...). See also: timer_service::shutdown()
837 // which drains waiters still in the timer heap (the other path).
838 4x auto* w = waiter_;
839 4x w->stop_cb_.reset();
840 4x auto h = std::exchange(w->h_, {});
841 4x auto& sched = w->svc_->get_scheduler();
842
1/2
✓ Branch 6 → 7 taken 4 times.
✗ Branch 6 → 9 not taken.
4x delete w;
843 4x sched.work_finished();
844
1/2
✓ Branch 11 → 12 taken 4 times.
✗ Branch 11 → 13 not taken.
4x if (h)
845
1/1
✓ Branch 12 → 13 taken 4 times.
4x h.destroy();
846 4x }
847 #if defined(__GNUC__) && !defined(__clang__)
848 #pragma GCC diagnostic pop
849 #endif
850
851 inline std::coroutine_handle<>
852 1987x timer_service::implementation::wait(
853 std::coroutine_handle<> h,
854 capy::executor_ref d,
855 std::stop_token token,
856 std::error_code* ec,
857 capy::continuation* cont)
858 {
859 // Already-expired fast path — no waiter_node, no mutex.
860 // Post instead of dispatch so the coroutine yields to the
861 // scheduler, allowing other queued work to run.
862
2/2
✓ Branch 3 → 4 taken 1976 times.
✓ Branch 3 → 21 taken 11 times.
1987x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
863 {
864
7/8
✓ Branch 5 → 6 taken 1976 times.
✓ Branch 6 → 7 taken 1976 times.
✗ Branch 6 → 11 not taken.
✓ Branch 8 → 9 taken 1976 times.
✓ Branch 10 → 11 taken 283 times.
✓ Branch 10 → 12 taken 1693 times.
✓ Branch 13 → 14 taken 283 times.
✓ Branch 13 → 21 taken 1693 times.
1976x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
865 {
866
1/2
✓ Branch 14 → 15 taken 283 times.
✗ Branch 14 → 17 not taken.
283x if (ec)
867 283x *ec = {};
868 283x d.post(*cont);
869 283x return std::noop_coroutine();
870 }
871 }
872
873 1704x auto* w = svc_->create_waiter();
874 1704x w->impl_ = this;
875 1704x w->svc_ = svc_;
876 1704x w->h_ = h;
877 1704x w->cont_ = cont;
878 1704x w->d_ = d;
879 1704x w->token_ = std::move(token);
880 1704x w->ec_out_ = ec;
881
882 1704x svc_->insert_waiter(*this, w);
883 1704x might_have_pending_waits_ = true;
884 1704x svc_->get_scheduler().work_started();
885
886
2/2
✓ Branch 28 → 29 taken 30 times.
✓ Branch 28 → 31 taken 1674 times.
1704x if (w->token_.stop_possible())
887 30x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
888
889 1704x return std::noop_coroutine();
890 }
891
892 // Free functions
893
894 struct timer_service_access
895 {
896 2136x static timer_service& get_timer(io_context& ctx) noexcept
897 {
898 2136x return *ctx.timer_svc_;
899 }
900
901 431x static void set_timer(io_context& ctx, timer_service& svc) noexcept
902 {
903 431x ctx.timer_svc_ = &svc;
904 431x }
905 };
906
907 // Bypass find_service() mutex by reading io_context's cached pointer
908 inline io_object::io_service&
909 2136x timer_service_direct(capy::execution_context& ctx) noexcept
910 {
911 2136x return timer_service_access::get_timer(static_cast<io_context&>(ctx));
912 }
913
914 inline std::size_t
915 3x timer_service_update_expiry(timer::implementation& base)
916 {
917 3x auto& impl = static_cast<timer_service::implementation&>(base);
918 3x return impl.svc_->update_timer(impl, impl.expiry_);
919 }
920
921 inline std::size_t
922 699x timer_service_cancel(timer::implementation& base) noexcept
923 {
924 699x auto& impl = static_cast<timer_service::implementation&>(base);
925 699x return impl.svc_->cancel_timer(impl);
926 }
927
928 inline std::size_t
929 1x timer_service_cancel_one(timer::implementation& base) noexcept
930 {
931 1x auto& impl = static_cast<timer_service::implementation&>(base);
932 1x return impl.svc_->cancel_one_waiter(impl);
933 }
934
935 inline timer_service&
936 431x get_timer_service(capy::execution_context& ctx, scheduler& sched)
937 {
938 431x auto& svc = ctx.make_service<timer_service>(sched);
939 431x timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
940 431x return svc;
941 }
942
943 } // namespace boost::corosio::detail
944
945 #endif
946