include/boost/corosio/detail/timer_service.hpp

91.0% Lines (363/399) 97.9% List of functions (46/48) 65.6% Branches (118/180)
f(x) Functions (48)
Function Calls Lines Branches Blocks
<unknown function 92> :92 boost::corosio::detail::timer_service::callback::callback() :97 1212x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :100 1212x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :109 7463x 100.0% 50.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :138 1212x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :144 15018x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :150 1818x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :156 606x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :169 686557x 100.0% 50.0% 75.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :212 765759x 100.0% 100.0% 100.0% boost::corosio::detail::waiter_node::~waiter_node() :225 494x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::~completion_op() :229 494x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::completion_op() :236 494x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :261 494x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::~implementation() :267 596x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :297 7556x 80.0% 50.0% 62.0% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :312 7548x 100.0% 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :323 7513x 100.0% 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :335 7497x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :346 606x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :357 596x 100.0% 50.0% 100.0% boost::corosio::detail::timer_service::shutdown() :364 606x 100.0% 69.2% 93.0% boost::corosio::detail::timer_service::construct() :421 7556x 63.2% 66.7% 70.0% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :450 7554x 100.0% 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :456 7554x 73.3% 62.5% 63.0% boost::corosio::detail::timer_service::create_waiter() :483 7513x 100.0% 83.3% 80.0% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :501 7497x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l>>>) :512 6x 93.3% 50.0% 73.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :562 7513x 100.0% 75.0% 77.0% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :582 8248x 87.0% 66.7% 75.0% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :622 36x 93.8% 66.7% 66.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :645 2x 73.7% 37.5% 50.0% boost::corosio::detail::timer_service::process_expired() :672 757508x 95.0% 90.0% 86.0% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :707 7483x 86.7% 50.0% 70.0% boost::corosio::detail::timer_service::up_heap(unsigned long) :734 7497x 100.0% 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long) :747 6621x 61.5% 50.0% 70.0% boost::corosio::detail::timer_service::swap_heap(unsigned long, unsigned long) :767 13218x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :779 36x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :785 0 0.0% 0.0% 0.0% boost::corosio::detail::waiter_node::completion_op::operator()() :799 7497x 100.0% 50.0% 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :824 8x 100.0% 50.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__1::coroutine_handle<void>, boost::capy::executor_ref, std::__1::stop_token, std::__1::error_code*) :851 7530x 100.0% 80.0% 100.0% boost::corosio::detail::timer_service_access::get_scheduler(boost::corosio::io_context&) :893 7556x 100.0% 100.0% boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :901 7556x 100.0% 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :908 6x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :915 700x 100.0% 50.0% 66.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :922 2x 100.0% 50.0% 66.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :929 606x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
75 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76
77 inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
81 class BOOST_COROSIO_DECL timer_service final
82 : public capy::execution_context::service
83 , public io_object::io_service
84 {
85 public:
86 using clock_type = std::chrono::steady_clock;
87 using time_point = clock_type::time_point;
88
89 /// Type-erased callback for earliest-expiry-changed notifications.
90 class callback
91 {
92 606x void* ctx_ = nullptr;
93 606x void (*fn_)(void*) = nullptr;
94
95 public:
96 /// Construct an empty callback.
97 1818x callback() = default;
98
99 /// Construct a callback with the given context and function.
100 1212x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101
102 /// Return true if the callback is non-empty.
103 explicit operator bool() const noexcept
104 {
105 return fn_ != nullptr;
106 }
107
108 /// Invoke the callback.
109 7463x void operator()() const
110 {
111
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7463 times.
7463x if (fn_)
112 7463x fn_(ctx_);
113 7463x }
114 };
115
116 struct implementation;
117
118 private:
119 struct heap_entry
120 {
121 time_point time_;
122 implementation* timer_;
123 };
124
125 scheduler* sched_ = nullptr;
126 mutable std::mutex mutex_;
127 std::vector<heap_entry> heap_;
128 606x implementation* free_list_ = nullptr;
129 606x waiter_node* waiter_free_list_ = nullptr;
130 callback on_earliest_changed_;
131 606x bool shutting_down_ = false;
132 // Avoids mutex in nearest_expiry() and empty()
133 1212x mutable std::atomic<std::int64_t> cached_nearest_ns_{
134 606x (std::numeric_limits<std::int64_t>::max)()};
135
136 public:
137 /// Construct the timer service bound to a scheduler.
138 3030x inline timer_service(capy::execution_context&, scheduler& sched)
139 606x : sched_(&sched)
140 1818x {
141 1212x }
142
143 /// Return the associated scheduler.
144 15018x inline scheduler& get_scheduler() noexcept
145 {
146 15018x return *sched_;
147 }
148
149 /// Destroy the timer service.
150 1818x ~timer_service() override = default;
151
152 timer_service(timer_service const&) = delete;
153 timer_service& operator=(timer_service const&) = delete;
154
155 /// Register a callback invoked when the earliest expiry changes.
156 606x inline void set_on_earliest_changed(callback cb)
157 {
158 606x on_earliest_changed_ = cb;
159 606x }
160
161 /// Return true if no timers are in the heap.
162 inline bool empty() const noexcept
163 {
164 return cached_nearest_ns_.load(std::memory_order_acquire) ==
165 (std::numeric_limits<std::int64_t>::max)();
166 }
167
168 /// Return the nearest timer expiry without acquiring the mutex.
169 686557x inline time_point nearest_expiry() const noexcept
170 {
171 686557x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
172
2/4
✓ Branch 0 taken 686557 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 686557 times.
✗ Branch 3 not taken.
686557x return time_point(time_point::duration(ns));
173 }
174
175 /// Cancel all pending timers and free cached resources.
176 inline void shutdown() override;
177
178 /// Construct a new timer implementation.
179 inline io_object::implementation* construct() override;
180
181 /// Destroy a timer implementation, cancelling pending waiters.
182 inline void destroy(io_object::implementation* p) override;
183
184 /// Cancel and recycle a timer implementation.
185 inline void destroy_impl(implementation& impl);
186
187 /// Create or recycle a waiter node.
188 inline waiter_node* create_waiter();
189
190 /// Return a waiter node to the cache or free list.
191 inline void destroy_waiter(waiter_node* w);
192
193 /// Update the timer expiry, cancelling existing waiters.
194 inline std::size_t update_timer(implementation& impl, time_point new_time);
195
196 /// Insert a waiter into the timer's waiter list and the heap.
197 inline void insert_waiter(implementation& impl, waiter_node* w);
198
199 /// Cancel all waiters on a timer.
200 inline std::size_t cancel_timer(implementation& impl);
201
202 /// Cancel a single waiter ( stop_token callback path ).
203 inline void cancel_waiter(waiter_node* w);
204
205 /// Cancel one waiter on a timer.
206 inline std::size_t cancel_one_waiter(implementation& impl);
207
208 /// Complete all waiters whose timers have expired.
209 inline std::size_t process_expired();
210
211 private:
212 765759x inline void refresh_cached_nearest() noexcept
213 {
214
2/2
✓ Branch 0 taken 8559 times.
✓ Branch 1 taken 757200 times.
765759x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
215 757200x : heap_[0].time_.time_since_epoch().count();
216 765759x cached_nearest_ns_.store(ns, std::memory_order_release);
217 765759x }
218
219 inline void remove_timer_impl(implementation& impl);
220 inline void up_heap(std::size_t index);
221 inline void down_heap(std::size_t index);
222 inline void swap_heap(std::size_t i1, std::size_t i2);
223 };
224
225 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
226 : intrusive_list<waiter_node>::node
227 {
228 // Embedded completion op — avoids heap allocation per fire/cancel
229 struct completion_op final : scheduler_op
230 {
231 247x waiter_node* waiter_ = nullptr;
232
233 static void do_complete(
234 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
235
236 741x completion_op() noexcept : scheduler_op(&do_complete) {}
237
238 void operator()() override;
239 void destroy() override;
240 };
241
242 // Per-waiter stop_token cancellation
243 struct canceller
244 {
245 waiter_node* waiter_;
246 void operator()() const;
247 };
248
249 // nullptr once removed from timer's waiter list (concurrency marker)
250 247x timer_service::implementation* impl_ = nullptr;
251 247x timer_service* svc_ = nullptr;
252 std::coroutine_handle<> h_;
253 capy::executor_ref d_;
254 247x std::error_code* ec_out_ = nullptr;
255 std::stop_token token_;
256 std::optional<std::stop_callback<canceller>> stop_cb_;
257 completion_op op_;
258 std::error_code ec_value_;
259 247x waiter_node* next_free_ = nullptr;
260
261 988x waiter_node() noexcept
262 247x {
263 247x op_.waiter_ = this;
264 494x }
265 };
266
267 struct timer_service::implementation final : timer::implementation
268 {
269 using clock_type = std::chrono::steady_clock;
270 using time_point = clock_type::time_point;
271 using duration = clock_type::duration;
272
273 timer_service* svc_ = nullptr;
274 intrusive_list<waiter_node> waiters_;
275
276 // Free list linkage (reused when impl is on free_list)
277 298x implementation* next_free_ = nullptr;
278
279 inline explicit implementation(timer_service& svc) noexcept;
280
281 inline std::coroutine_handle<> wait(
282 std::coroutine_handle<>,
283 capy::executor_ref,
284 std::stop_token,
285 std::error_code*) override;
286 };
287
288 // Thread-local caches avoid hot-path mutex acquisitions:
289 // 1. Impl cache — single-slot, validated by comparing svc_
290 // 2. Waiter cache — single-slot, no service affinity
291 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
292
293 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
294 inline thread_local_ptr<waiter_node> tl_cached_waiter;
295
296 inline timer_service::implementation*
297 7556x try_pop_tl_cache(timer_service* svc) noexcept
298 {
299 7556x auto* impl = tl_cached_impl.get();
300
2/2
✓ Branch 0 taken 298 times.
✓ Branch 1 taken 7258 times.
7556x if (impl)
301 {
302 7258x tl_cached_impl.set(nullptr);
303
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7258 times.
7258x if (impl->svc_ == svc)
304 7258x return impl;
305 // Stale impl from a destroyed service
306 delete impl;
307 }
308 298x return nullptr;
309 7556x }
310
311 inline bool
312 7548x try_push_tl_cache(timer_service::implementation* impl) noexcept
313 {
314
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 7458 times.
7548x if (!tl_cached_impl.get())
315 {
316 7458x tl_cached_impl.set(impl);
317 7458x return true;
318 }
319 90x return false;
320 7548x }
321
322 inline waiter_node*
323 7513x try_pop_waiter_tl_cache() noexcept
324 {
325 7513x auto* w = tl_cached_waiter.get();
326
2/2
✓ Branch 0 taken 7264 times.
✓ Branch 1 taken 249 times.
7513x if (w)
327 {
328 7264x tl_cached_waiter.set(nullptr);
329 7264x return w;
330 }
331 249x return nullptr;
332 7513x }
333
334 inline bool
335 7497x try_push_waiter_tl_cache(waiter_node* w) noexcept
336 {
337
2/2
✓ Branch 0 taken 86 times.
✓ Branch 1 taken 7411 times.
7497x if (!tl_cached_waiter.get())
338 {
339 7411x tl_cached_waiter.set(w);
340 7411x return true;
341 }
342 86x return false;
343 7497x }
344
345 inline void
346 606x timer_service_invalidate_cache() noexcept
347 {
348
2/2
✓ Branch 0 taken 406 times.
✓ Branch 1 taken 200 times.
606x delete tl_cached_impl.get();
349 606x tl_cached_impl.set(nullptr);
350
351
2/2
✓ Branch 0 taken 459 times.
✓ Branch 1 taken 147 times.
606x delete tl_cached_waiter.get();
352 606x tl_cached_waiter.set(nullptr);
353 606x }
354
355 // timer_service out-of-class member function definitions
356
357
1/2
✓ Branch 0 taken 298 times.
✗ Branch 1 not taken.
596x inline timer_service::implementation::implementation(
358 timer_service& svc) noexcept
359 298x : svc_(&svc)
360 596x {
361 596x }
362
363 inline void
364 606x timer_service::shutdown()
365 {
366 606x timer_service_invalidate_cache();
367 606x shutting_down_ = true;
368
369 // Snapshot impls and detach them from the heap so that
370 // coroutine-owned timer destructors (triggered by h.destroy()
371 // below) cannot re-enter remove_timer_impl() and mutate the
372 // vector during iteration.
373 606x std::vector<implementation*> impls;
374
1/2
✓ Branch 0 taken 606 times.
✗ Branch 1 not taken.
606x impls.reserve(heap_.size());
375
2/2
✓ Branch 0 taken 606 times.
✓ Branch 1 taken 8 times.
614x for (auto& entry : heap_)
376 {
377 8x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
378
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x impls.push_back(entry.timer_);
379 }
380 606x heap_.clear();
381 1212x cached_nearest_ns_.store(
382 606x (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
383
384 // Cancel waiting timers. Each waiter called work_started()
385 // in implementation::wait(). On IOCP the scheduler shutdown
386 // loop exits when outstanding_work_ reaches zero, so we must
387 // call work_finished() here to balance it. On other backends
388 // this is harmless.
389
2/2
✓ Branch 0 taken 606 times.
✓ Branch 1 taken 8 times.
614x for (auto* impl : impls)
390 {
391
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 8 times.
16x while (auto* w = impl->waiters_.pop_front())
392 {
393 8x w->stop_cb_.reset();
394 8x auto h = std::exchange(w->h_, {});
395 8x sched_->work_finished();
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x if (h)
397
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8x h.destroy();
398
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x delete w;
399 }
400
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x delete impl;
401 }
402
403 // Delete free-listed impls
404
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 606 times.
696x while (free_list_)
405 {
406 90x auto* next = free_list_->next_free_;
407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
90x delete free_list_;
408 90x free_list_ = next;
409 }
410
411 // Delete free-listed waiters
412
2/2
✓ Branch 0 taken 84 times.
✓ Branch 1 taken 606 times.
690x while (waiter_free_list_)
413 {
414 84x auto* next = waiter_free_list_->next_free_;
415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 84 times.
84x delete waiter_free_list_;
416 84x waiter_free_list_ = next;
417 }
418 606x }
419
420 inline io_object::implementation*
421 7556x timer_service::construct()
422 {
423 7556x implementation* impl = try_pop_tl_cache(this);
424
2/2
✓ Branch 0 taken 7258 times.
✓ Branch 1 taken 298 times.
7556x if (impl)
425 {
426 7258x impl->svc_ = this;
427 7258x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
428 7258x impl->might_have_pending_waits_ = false;
429 7258x return impl;
430 }
431
432 298x std::lock_guard lock(mutex_);
433
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 298 times.
298x if (free_list_)
434 {
435 impl = free_list_;
436 free_list_ = impl->next_free_;
437 impl->next_free_ = nullptr;
438 impl->svc_ = this;
439 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
440 impl->might_have_pending_waits_ = false;
441 }
442 else
443 {
444
1/2
✓ Branch 0 taken 298 times.
✗ Branch 1 not taken.
298x impl = new implementation(*this);
445 }
446 298x return impl;
447 7556x }
448
449 inline void
450 7554x timer_service::destroy(io_object::implementation* p)
451 {
452 7554x destroy_impl(static_cast<implementation&>(*p));
453 7554x }
454
455 inline void
456 7554x timer_service::destroy_impl(implementation& impl)
457 {
458 // During shutdown the impl is owned by the shutdown loop.
459 // Re-entering here (from a coroutine-owned timer destructor
460 // triggered by h.destroy()) must not modify the heap or
461 // recycle the impl — shutdown deletes it directly.
462
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 7548 times.
7554x if (shutting_down_)
463 6x return;
464
465 7548x cancel_timer(impl);
466
467
1/2
✓ Branch 0 taken 7548 times.
✗ Branch 1 not taken.
7548x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
468 {
469 std::lock_guard lock(mutex_);
470 remove_timer_impl(impl);
471 refresh_cached_nearest();
472 }
473
474
2/2
✓ Branch 0 taken 7458 times.
✓ Branch 1 taken 90 times.
7548x if (try_push_tl_cache(&impl))
475 7458x return;
476
477 90x std::lock_guard lock(mutex_);
478 90x impl.next_free_ = free_list_;
479 90x free_list_ = &impl;
480 7554x }
481
482 inline waiter_node*
483 7513x timer_service::create_waiter()
484 {
485
2/2
✓ Branch 0 taken 7264 times.
✓ Branch 1 taken 249 times.
7513x if (auto* w = try_pop_waiter_tl_cache())
486 7264x return w;
487
488 249x std::lock_guard lock(mutex_);
489
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 247 times.
249x if (waiter_free_list_)
490 {
491 2x auto* w = waiter_free_list_;
492 2x waiter_free_list_ = w->next_free_;
493 2x w->next_free_ = nullptr;
494 2x return w;
495 }
496
497
1/2
✓ Branch 0 taken 247 times.
✗ Branch 1 not taken.
247x return new waiter_node();
498 7513x }
499
500 inline void
501 7497x timer_service::destroy_waiter(waiter_node* w)
502 {
503
2/2
✓ Branch 0 taken 7411 times.
✓ Branch 1 taken 86 times.
7497x if (try_push_waiter_tl_cache(w))
504 7411x return;
505
506 86x std::lock_guard lock(mutex_);
507 86x w->next_free_ = waiter_free_list_;
508 86x waiter_free_list_ = w;
509 7497x }
510
511 inline std::size_t
512 6x timer_service::update_timer(implementation& impl, time_point new_time)
513 {
514 6x bool in_heap =
515 6x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
516
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6x if (!in_heap && impl.waiters_.empty())
517 return 0;
518
519 6x bool notify = false;
520 6x intrusive_list<waiter_node> canceled;
521
522 {
523 6x std::lock_guard lock(mutex_);
524
525
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6 times.
16x while (auto* w = impl.waiters_.pop_front())
526 {
527 10x w->impl_ = nullptr;
528 10x canceled.push_back(w);
529 }
530
531
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6x if (impl.heap_index_ < heap_.size())
532 {
533 6x time_point old_time = heap_[impl.heap_index_].time_;
534 6x heap_[impl.heap_index_].time_ = new_time;
535
536
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
6x if (new_time < old_time)
537
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6x up_heap(impl.heap_index_);
538 else
539 down_heap(impl.heap_index_);
540
541 6x notify = (impl.heap_index_ == 0);
542 6x }
543
544 6x refresh_cached_nearest();
545 6x }
546
547 6x std::size_t count = 0;
548
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 6 times.
16x while (auto* w = canceled.pop_front())
549 {
550 10x w->ec_value_ = make_error_code(capy::error::canceled);
551 10x sched_->post(&w->op_);
552 10x ++count;
553 }
554
555
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
6x if (notify)
556 6x on_earliest_changed_();
557
558 6x return count;
559 6x }
560
561 inline void
562 7513x timer_service::insert_waiter(implementation& impl, waiter_node* w)
563 {
564 7513x bool notify = false;
565 {
566 7513x std::lock_guard lock(mutex_);
567
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 7491 times.
7513x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
568 {
569 7491x impl.heap_index_ = heap_.size();
570
1/2
✓ Branch 0 taken 7491 times.
✗ Branch 1 not taken.
7491x heap_.push_back({impl.expiry_, &impl});
571
1/2
✓ Branch 0 taken 7491 times.
✗ Branch 1 not taken.
7491x up_heap(heap_.size() - 1);
572 7491x notify = (impl.heap_index_ == 0);
573 7491x refresh_cached_nearest();
574 7491x }
575 7513x impl.waiters_.push_back(w);
576 7513x }
577
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 7457 times.
7513x if (notify)
578 7457x on_earliest_changed_();
579 7513x }
580
581 inline std::size_t
582 8248x timer_service::cancel_timer(implementation& impl)
583 {
584
2/2
✓ Branch 0 taken 7532 times.
✓ Branch 1 taken 716 times.
8248x if (!impl.might_have_pending_waits_)
585 7532x return 0;
586
587 // Not in heap and no waiters — just clear the flag
588
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 716 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
716x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
589 impl.waiters_.empty())
590 {
591 impl.might_have_pending_waits_ = false;
592 return 0;
593 }
594
595 716x intrusive_list<waiter_node> canceled;
596
597 {
598 716x std::lock_guard lock(mutex_);
599
1/2
✓ Branch 0 taken 716 times.
✗ Branch 1 not taken.
716x remove_timer_impl(impl);
600
2/2
✓ Branch 0 taken 720 times.
✓ Branch 1 taken 716 times.
1436x while (auto* w = impl.waiters_.pop_front())
601 {
602 720x w->impl_ = nullptr;
603 720x canceled.push_back(w);
604 }
605 716x refresh_cached_nearest();
606 716x }
607
608 716x impl.might_have_pending_waits_ = false;
609
610 716x std::size_t count = 0;
611
2/2
✓ Branch 0 taken 720 times.
✓ Branch 1 taken 716 times.
1436x while (auto* w = canceled.pop_front())
612 {
613 720x w->ec_value_ = make_error_code(capy::error::canceled);
614 720x sched_->post(&w->op_);
615 720x ++count;
616 }
617
618 716x return count;
619 8248x }
620
621 inline void
622 36x timer_service::cancel_waiter(waiter_node* w)
623 {
624 {
625 36x std::lock_guard lock(mutex_);
626 // Already removed by cancel_timer or process_expired
627
1/2
✓ Branch 0 taken 36 times.
✗ Branch 1 not taken.
36x if (!w->impl_)
628 return;
629 36x auto* impl = w->impl_;
630 36x w->impl_ = nullptr;
631 36x impl->waiters_.remove(w);
632
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 34 times.
36x if (impl->waiters_.empty())
633 {
634
1/2
✓ Branch 0 taken 34 times.
✗ Branch 1 not taken.
34x remove_timer_impl(*impl);
635 34x impl->might_have_pending_waits_ = false;
636 34x }
637 36x refresh_cached_nearest();
638 36x }
639
640 36x w->ec_value_ = make_error_code(capy::error::canceled);
641 36x sched_->post(&w->op_);
642 36x }
643
644 inline std::size_t
645 2x timer_service::cancel_one_waiter(implementation& impl)
646 {
647
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x if (!impl.might_have_pending_waits_)
648 return 0;
649
650 2x waiter_node* w = nullptr;
651
652 {
653 2x std::lock_guard lock(mutex_);
654 2x w = impl.waiters_.pop_front();
655
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x if (!w)
656 return 0;
657 2x w->impl_ = nullptr;
658
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x if (impl.waiters_.empty())
659 {
660 remove_timer_impl(impl);
661 impl.might_have_pending_waits_ = false;
662 }
663 2x refresh_cached_nearest();
664 2x }
665
666 2x w->ec_value_ = make_error_code(capy::error::canceled);
667 2x sched_->post(&w->op_);
668 2x return 1;
669 2x }
670
671 inline std::size_t
672 757508x timer_service::process_expired()
673 {
674 757508x intrusive_list<waiter_node> expired;
675
676 {
677 757508x std::lock_guard lock(mutex_);
678 757508x auto now = clock_type::now();
679
680
4/4
✓ Branch 0 taken 7817 times.
✓ Branch 1 taken 756424 times.
✓ Branch 2 taken 757508 times.
✓ Branch 3 taken 6733 times.
764241x while (!heap_.empty() && heap_[0].time_ <= now)
681 {
682 6733x implementation* t = heap_[0].timer_;
683
1/2
✓ Branch 0 taken 6733 times.
✗ Branch 1 not taken.
6733x remove_timer_impl(*t);
684
2/2
✓ Branch 0 taken 6737 times.
✓ Branch 1 taken 6733 times.
13470x while (auto* w = t->waiters_.pop_front())
685 {
686 6737x w->impl_ = nullptr;
687 6737x w->ec_value_ = {};
688 6737x expired.push_back(w);
689 }
690 6733x t->might_have_pending_waits_ = false;
691 }
692
693 757508x refresh_cached_nearest();
694 757508x }
695
696 757508x std::size_t count = 0;
697
2/2
✓ Branch 0 taken 6737 times.
✓ Branch 1 taken 757508 times.
764245x while (auto* w = expired.pop_front())
698 {
699 6737x sched_->post(&w->op_);
700 6737x ++count;
701 }
702
703 757508x return count;
704 }
705
706 inline void
707 7483x timer_service::remove_timer_impl(implementation& impl)
708 {
709 7483x std::size_t index = impl.heap_index_;
710
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7483 times.
7483x if (index >= heap_.size())
711 return; // Not in heap
712
713
2/2
✓ Branch 0 taken 862 times.
✓ Branch 1 taken 6621 times.
7483x if (index == heap_.size() - 1)
714 {
715 // Last element, just pop
716 862x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
717 862x heap_.pop_back();
718 862x }
719 else
720 {
721 // Swap with last and reheapify
722 6621x swap_heap(index, heap_.size() - 1);
723 6621x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
724 6621x heap_.pop_back();
725
726
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6621 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6621x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
727 up_heap(index);
728 else
729 6621x down_heap(index);
730 }
731 7483x }
732
733 inline void
734 7497x timer_service::up_heap(std::size_t index)
735 {
736
2/2
✓ Branch 0 taken 7463 times.
✓ Branch 1 taken 6631 times.
14094x while (index > 0)
737 {
738 6631x std::size_t parent = (index - 1) / 2;
739
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 6597 times.
6631x if (!(heap_[index].time_ < heap_[parent].time_))
740 34x break;
741 6597x swap_heap(index, parent);
742 6597x index = parent;
743 }
744 7497x }
745
746 inline void
747 6621x timer_service::down_heap(std::size_t index)
748 {
749 6621x std::size_t child = index * 2 + 1;
750
2/2
✓ Branch 0 taken 6615 times.
✓ Branch 1 taken 6 times.
6621x while (child < heap_.size())
751 {
752
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6x std::size_t min_child = (child + 1 == heap_.size() ||
753 heap_[child].time_ < heap_[child + 1].time_)
754 6x ? child
755 : child + 1;
756
757
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6x if (heap_[index].time_ < heap_[min_child].time_)
758 6x break;
759
760 swap_heap(index, min_child);
761 index = min_child;
762 child = index * 2 + 1;
763 }
764 6621x }
765
766 inline void
767 13218x timer_service::swap_heap(std::size_t i1, std::size_t i2)
768 {
769 13218x heap_entry tmp = heap_[i1];
770 13218x heap_[i1] = heap_[i2];
771 13218x heap_[i2] = tmp;
772 13218x heap_[i1].timer_->heap_index_ = i1;
773 13218x heap_[i2].timer_->heap_index_ = i2;
774 13218x }
775
776 // waiter_node out-of-class member function definitions
777
778 inline void
779 36x waiter_node::canceller::operator()() const
780 {
781 36x waiter_->svc_->cancel_waiter(waiter_);
782 36x }
783
784 inline void
785 waiter_node::completion_op::do_complete(
786 [[maybe_unused]] void* owner,
787 scheduler_op* base,
788 std::uint32_t,
789 std::uint32_t)
790 {
791 // owner is always non-null here. The destroy path (owner == nullptr)
792 // is unreachable because completion_op overrides destroy() directly,
793 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
794 BOOST_COROSIO_ASSERT(owner);
795 static_cast<completion_op*>(base)->operator()();
796 }
797
798 inline void
799 7497x waiter_node::completion_op::operator()()
800 {
801 7497x auto* w = waiter_;
802 7497x w->stop_cb_.reset();
803
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7497 times.
7497x if (w->ec_out_)
804 7497x *w->ec_out_ = w->ec_value_;
805
806 7497x auto h = w->h_;
807 7497x auto d = w->d_;
808 7497x auto* svc = w->svc_;
809 7497x auto& sched = svc->get_scheduler();
810
811 7497x svc->destroy_waiter(w);
812
813 7497x d.post(h);
814 7497x sched.work_finished();
815 7497x }
816
817 // GCC 14 false-positive: inlining ~optional<stop_callback> through
818 // delete loses track that stop_cb_ was already .reset() above.
819 #if defined(__GNUC__) && !defined(__clang__)
820 #pragma GCC diagnostic push
821 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
822 #endif
823 inline void
824 8x waiter_node::completion_op::destroy()
825 {
826 // Called during scheduler shutdown drain when this completion_op is
827 // in the scheduler's ready queue (posted by cancel_timer() or
828 // process_expired()). Balances the work_started() from
829 // implementation::wait(). The scheduler drain loop separately
830 // balances the work_started() from post(). On IOCP both decrements
831 // are required for outstanding_work_ to reach zero; on other
832 // backends this is harmless.
833 //
834 // This override also prevents scheduler_op::destroy() from calling
835 // do_complete(nullptr, ...). See also: timer_service::shutdown()
836 // which drains waiters still in the timer heap (the other path).
837 8x auto* w = waiter_;
838 8x w->stop_cb_.reset();
839 8x auto h = std::exchange(w->h_, {});
840 8x auto& sched = w->svc_->get_scheduler();
841
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x delete w;
842 8x sched.work_finished();
843
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8x if (h)
844 8x h.destroy();
845 8x }
846 #if defined(__GNUC__) && !defined(__clang__)
847 #pragma GCC diagnostic pop
848 #endif
849
850 inline std::coroutine_handle<>
851 7530x timer_service::implementation::wait(
852 std::coroutine_handle<> h,
853 capy::executor_ref d,
854 std::stop_token token,
855 std::error_code* ec)
856 {
857 // Already-expired fast path — no waiter_node, no mutex.
858 // Post instead of dispatch so the coroutine yields to the
859 // scheduler, allowing other queued work to run.
860
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 7508 times.
7530x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
861 {
862
3/4
✗ Branch 0 not taken.
✓ Branch 1 taken 7508 times.
✓ Branch 2 taken 17 times.
✓ Branch 3 taken 7491 times.
7508x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
863 {
864
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 17 times.
17x if (ec)
865 17x *ec = {};
866 17x d.post(h);
867 17x return std::noop_coroutine();
868 }
869 7491x }
870
871 7513x auto* w = svc_->create_waiter();
872 7513x w->impl_ = this;
873 7513x w->svc_ = svc_;
874 7513x w->h_ = h;
875 7513x w->d_ = d;
876 7513x w->token_ = std::move(token);
877 7513x w->ec_out_ = ec;
878
879 7513x svc_->insert_waiter(*this, w);
880 7513x might_have_pending_waits_ = true;
881 7513x svc_->get_scheduler().work_started();
882
883
2/2
✓ Branch 0 taken 7453 times.
✓ Branch 1 taken 60 times.
7513x if (w->token_.stop_possible())
884 60x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
885
886 7513x return std::noop_coroutine();
887 7530x }
888
889 // Free functions
890
891 struct timer_service_access
892 {
893 7556x static native_scheduler& get_scheduler(io_context& ctx) noexcept
894 {
895 7556x return static_cast<native_scheduler&>(*ctx.sched_);
896 }
897 };
898
899 // Bypass find_service() mutex by reading the scheduler's cached pointer
900 inline io_object::io_service&
901 7556x timer_service_direct(capy::execution_context& ctx) noexcept
902 {
903 15112x return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
904 7556x .timer_svc_;
905 }
906
907 inline std::size_t
908 6x timer_service_update_expiry(timer::implementation& base)
909 {
910 6x auto& impl = static_cast<timer_service::implementation&>(base);
911 6x return impl.svc_->update_timer(impl, impl.expiry_);
912 }
913
914 inline std::size_t
915 700x timer_service_cancel(timer::implementation& base) noexcept
916 {
917 700x auto& impl = static_cast<timer_service::implementation&>(base);
918
1/2
✓ Branch 0 taken 700 times.
✗ Branch 1 not taken.
700x return impl.svc_->cancel_timer(impl);
919 }
920
921 inline std::size_t
922 2x timer_service_cancel_one(timer::implementation& base) noexcept
923 {
924 2x auto& impl = static_cast<timer_service::implementation&>(base);
925
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2x return impl.svc_->cancel_one_waiter(impl);
926 }
927
928 inline timer_service&
929 606x get_timer_service(capy::execution_context& ctx, scheduler& sched)
930 {
931 606x return ctx.make_service<timer_service>(sched);
932 }
933
934 } // namespace boost::corosio::detail
935
936 #endif
937