include/boost/corosio/detail/timer_service.hpp

91.6% Lines (340/371) 97.7% List of functions (43/44)
f(x) Functions (44)
Function Calls Lines Blocks
boost::corosio::detail::timer_service::callback::callback() :97 534x 100.0% 100.0% boost::corosio::detail::timer_service::callback::callback(void*, void (*)(void*)) :100 534x 100.0% 100.0% boost::corosio::detail::timer_service::callback::operator()() const :109 10176x 100.0% 100.0% boost::corosio::detail::timer_service::timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :138 534x 100.0% 100.0% boost::corosio::detail::timer_service::get_scheduler() :144 20438x 100.0% 100.0% boost::corosio::detail::timer_service::~timer_service() :150 1068x 100.0% 100.0% boost::corosio::detail::timer_service::set_on_earliest_changed(boost::corosio::detail::timer_service::callback) :156 534x 100.0% 100.0% boost::corosio::detail::timer_service::nearest_expiry() const :169 220877x 100.0% 73.0% boost::corosio::detail::timer_service::refresh_cached_nearest() :212 246277x 100.0% 70.0% boost::corosio::detail::waiter_node::completion_op::completion_op() :236 229x 100.0% 100.0% boost::corosio::detail::waiter_node::waiter_node() :261 229x 100.0% 100.0% boost::corosio::detail::try_pop_tl_cache(boost::corosio::detail::timer_service*) :297 10428x 87.5% 78.0% boost::corosio::detail::try_push_tl_cache(boost::corosio::detail::timer_service::implementation*) :312 10420x 100.0% 100.0% boost::corosio::detail::try_pop_waiter_tl_cache() :323 10223x 100.0% 100.0% boost::corosio::detail::try_push_waiter_tl_cache(boost::corosio::detail::waiter_node*) :335 10207x 100.0% 100.0% boost::corosio::detail::timer_service_invalidate_cache() :346 534x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::implementation(boost::corosio::detail::timer_service&) :357 268x 100.0% 100.0% boost::corosio::detail::timer_service::shutdown() :364 534x 100.0% 82.0% boost::corosio::detail::timer_service::construct() :421 10428x 66.7% 76.0% boost::corosio::detail::timer_service::destroy(boost::corosio::io_object::implementation*) :450 10426x 100.0% 100.0% boost::corosio::detail::timer_service::destroy_impl(boost::corosio::detail::timer_service::implementation&) :456 10426x 73.3% 64.0% boost::corosio::detail::timer_service::create_waiter() :483 10223x 100.0% 87.0% boost::corosio::detail::timer_service::destroy_waiter(boost::corosio::detail::waiter_node*) :501 10207x 100.0% 100.0% boost::corosio::detail::timer_service::update_timer(boost::corosio::detail::timer_service::implementation&, std::chrono::time_point<std::chrono::_V2::steady_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >) :512 6x 93.1% 79.0% boost::corosio::detail::timer_service::insert_waiter(boost::corosio::detail::timer_service::implementation&, boost::corosio::detail::waiter_node*) :562 10223x 100.0% 82.0% boost::corosio::detail::timer_service::cancel_timer(boost::corosio::detail::timer_service::implementation&) :582 11144x 87.5% 78.0% boost::corosio::detail::timer_service::cancel_waiter(boost::corosio::detail::waiter_node*) :622 30x 92.9% 80.0% boost::corosio::detail::timer_service::cancel_one_waiter(boost::corosio::detail::timer_service::implementation&) :645 2x 76.5% 70.0% boost::corosio::detail::timer_service::process_expired() :672 235298x 100.0% 91.0% boost::corosio::detail::timer_service::remove_timer_impl(boost::corosio::detail::timer_service::implementation&) :707 10193x 84.6% 65.0% boost::corosio::detail::timer_service::up_heap(unsigned long) :734 10207x 100.0% 100.0% boost::corosio::detail::timer_service::down_heap(unsigned long) :747 9324x 69.2% 62.0% boost::corosio::detail::timer_service::swap_heap(unsigned long, unsigned long) :767 18627x 100.0% 100.0% boost::corosio::detail::waiter_node::canceller::operator()() const :779 30x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :785 0 0.0% 0.0% boost::corosio::detail::waiter_node::completion_op::operator()() :799 10207x 100.0% 100.0% boost::corosio::detail::waiter_node::completion_op::destroy() :824 8x 100.0% 100.0% boost::corosio::detail::timer_service::implementation::wait(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*) :851 10257x 100.0% 97.0% boost::corosio::detail::timer_service_access::get_scheduler(boost::corosio::io_context&) :893 10428x 100.0% 100.0% boost::corosio::detail::timer_service_direct(boost::capy::execution_context&) :901 10428x 100.0% 100.0% boost::corosio::detail::timer_service_update_expiry(boost::corosio::io_timer::implementation&) :908 6x 100.0% 100.0% boost::corosio::detail::timer_service_cancel(boost::corosio::io_timer::implementation&) :915 724x 100.0% 100.0% boost::corosio::detail::timer_service_cancel_one(boost::corosio::io_timer::implementation&) :922 2x 100.0% 100.0% boost::corosio::detail::get_timer_service(boost::capy::execution_context&, boost::corosio::detail::scheduler&) :929 534x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13
14 #include <boost/corosio/timer.hpp>
15 #include <boost/corosio/io_context.hpp>
16 #include <boost/corosio/detail/scheduler_op.hpp>
17 #include <boost/corosio/native/native_scheduler.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/detail/thread_local_ptr.hpp>
20 #include <boost/capy/error.hpp>
21 #include <boost/capy/ex/execution_context.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <system_error>
24
25 #include <atomic>
26 #include <chrono>
27 #include <coroutine>
28 #include <cstddef>
29 #include <limits>
30 #include <mutex>
31 #include <optional>
32 #include <stop_token>
33 #include <utility>
34 #include <vector>
35
36 namespace boost::corosio::detail {
37
38 struct scheduler;
39
40 /*
41 Timer Service
42 =============
43
44 Data Structures
45 ---------------
46 waiter_node holds per-waiter state: coroutine handle, executor,
47 error output, stop_token, embedded completion_op. Each concurrent
48 co_await t.wait() allocates one waiter_node.
49
50 timer_service::implementation holds per-timer state: expiry,
51 heap index, and an intrusive_list of waiter_nodes. Multiple
52 coroutines can wait on the same timer simultaneously.
53
54 timer_service owns a min-heap of active timers, a free list
55 of recycled impls, and a free list of recycled waiter_nodes. The
56 heap is ordered by expiry time; the scheduler queries
57 nearest_expiry() to set the epoll/timerfd timeout.
58
59 Optimization Strategy
60 ---------------------
61 1. Deferred heap insertion — expires_after() stores the expiry
62 but does not insert into the heap. Insertion happens in wait().
63 2. Thread-local impl cache — single-slot per-thread cache.
64 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 6. Thread-local waiter cache — single-slot per-thread cache.
68
69 Concurrency
70 -----------
71 stop_token callbacks can fire from any thread. The impl_
72 pointer on waiter_node is used as a "still in list" marker.
73 */
74
75 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76
77 inline void timer_service_invalidate_cache() noexcept;
78
79 // timer_service class body — member function definitions are
80 // out-of-class (after implementation and waiter_node are complete)
81 class BOOST_COROSIO_DECL timer_service final
82 : public capy::execution_context::service
83 , public io_object::io_service
84 {
85 public:
86 using clock_type = std::chrono::steady_clock;
87 using time_point = clock_type::time_point;
88
89 /// Type-erased callback for earliest-expiry-changed notifications.
90 class callback
91 {
92 void* ctx_ = nullptr;
93 void (*fn_)(void*) = nullptr;
94
95 public:
96 /// Construct an empty callback.
97 534x callback() = default;
98
99 /// Construct a callback with the given context and function.
100 534x callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101
102 /// Return true if the callback is non-empty.
103 explicit operator bool() const noexcept
104 {
105 return fn_ != nullptr;
106 }
107
108 /// Invoke the callback.
109 10176x void operator()() const
110 {
111 10176x if (fn_)
112 10176x fn_(ctx_);
113 10176x }
114 };
115
116 struct implementation;
117
118 private:
119 struct heap_entry
120 {
121 time_point time_;
122 implementation* timer_;
123 };
124
125 scheduler* sched_ = nullptr;
126 mutable std::mutex mutex_;
127 std::vector<heap_entry> heap_;
128 implementation* free_list_ = nullptr;
129 waiter_node* waiter_free_list_ = nullptr;
130 callback on_earliest_changed_;
131 bool shutting_down_ = false;
132 // Avoids mutex in nearest_expiry() and empty()
133 mutable std::atomic<std::int64_t> cached_nearest_ns_{
134 (std::numeric_limits<std::int64_t>::max)()};
135
136 public:
137 /// Construct the timer service bound to a scheduler.
138 534x inline timer_service(capy::execution_context&, scheduler& sched)
139 534x : sched_(&sched)
140 {
141 534x }
142
143 /// Return the associated scheduler.
144 20438x inline scheduler& get_scheduler() noexcept
145 {
146 20438x return *sched_;
147 }
148
149 /// Destroy the timer service.
150 1068x ~timer_service() override = default;
151
152 timer_service(timer_service const&) = delete;
153 timer_service& operator=(timer_service const&) = delete;
154
155 /// Register a callback invoked when the earliest expiry changes.
156 534x inline void set_on_earliest_changed(callback cb)
157 {
158 534x on_earliest_changed_ = cb;
159 534x }
160
161 /// Return true if no timers are in the heap.
162 inline bool empty() const noexcept
163 {
164 return cached_nearest_ns_.load(std::memory_order_acquire) ==
165 (std::numeric_limits<std::int64_t>::max)();
166 }
167
168 /// Return the nearest timer expiry without acquiring the mutex.
169 220877x inline time_point nearest_expiry() const noexcept
170 {
171 220877x auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
172 220877x return time_point(time_point::duration(ns));
173 }
174
175 /// Cancel all pending timers and free cached resources.
176 inline void shutdown() override;
177
178 /// Construct a new timer implementation.
179 inline io_object::implementation* construct() override;
180
181 /// Destroy a timer implementation, cancelling pending waiters.
182 inline void destroy(io_object::implementation* p) override;
183
184 /// Cancel and recycle a timer implementation.
185 inline void destroy_impl(implementation& impl);
186
187 /// Create or recycle a waiter node.
188 inline waiter_node* create_waiter();
189
190 /// Return a waiter node to the cache or free list.
191 inline void destroy_waiter(waiter_node* w);
192
193 /// Update the timer expiry, cancelling existing waiters.
194 inline std::size_t update_timer(implementation& impl, time_point new_time);
195
196 /// Insert a waiter into the timer's waiter list and the heap.
197 inline void insert_waiter(implementation& impl, waiter_node* w);
198
199 /// Cancel all waiters on a timer.
200 inline std::size_t cancel_timer(implementation& impl);
201
202 /// Cancel a single waiter ( stop_token callback path ).
203 inline void cancel_waiter(waiter_node* w);
204
205 /// Cancel one waiter on a timer.
206 inline std::size_t cancel_one_waiter(implementation& impl);
207
208 /// Complete all waiters whose timers have expired.
209 inline std::size_t process_expired();
210
211 private:
212 246277x inline void refresh_cached_nearest() noexcept
213 {
214 246277x auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
215 245074x : heap_[0].time_.time_since_epoch().count();
216 246277x cached_nearest_ns_.store(ns, std::memory_order_release);
217 246277x }
218
219 inline void remove_timer_impl(implementation& impl);
220 inline void up_heap(std::size_t index);
221 inline void down_heap(std::size_t index);
222 inline void swap_heap(std::size_t i1, std::size_t i2);
223 };
224
225 struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
226 : intrusive_list<waiter_node>::node
227 {
228 // Embedded completion op — avoids heap allocation per fire/cancel
229 struct completion_op final : scheduler_op
230 {
231 waiter_node* waiter_ = nullptr;
232
233 static void do_complete(
234 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
235
236 229x completion_op() noexcept : scheduler_op(&do_complete) {}
237
238 void operator()() override;
239 void destroy() override;
240 };
241
242 // Per-waiter stop_token cancellation
243 struct canceller
244 {
245 waiter_node* waiter_;
246 void operator()() const;
247 };
248
249 // nullptr once removed from timer's waiter list (concurrency marker)
250 timer_service::implementation* impl_ = nullptr;
251 timer_service* svc_ = nullptr;
252 std::coroutine_handle<> h_;
253 capy::executor_ref d_;
254 std::error_code* ec_out_ = nullptr;
255 std::stop_token token_;
256 std::optional<std::stop_callback<canceller>> stop_cb_;
257 completion_op op_;
258 std::error_code ec_value_;
259 waiter_node* next_free_ = nullptr;
260
261 229x waiter_node() noexcept
262 229x {
263 229x op_.waiter_ = this;
264 229x }
265 };
266
267 struct timer_service::implementation final : timer::implementation
268 {
269 using clock_type = std::chrono::steady_clock;
270 using time_point = clock_type::time_point;
271 using duration = clock_type::duration;
272
273 timer_service* svc_ = nullptr;
274 intrusive_list<waiter_node> waiters_;
275
276 // Free list linkage (reused when impl is on free_list)
277 implementation* next_free_ = nullptr;
278
279 inline explicit implementation(timer_service& svc) noexcept;
280
281 inline std::coroutine_handle<> wait(
282 std::coroutine_handle<>,
283 capy::executor_ref,
284 std::stop_token,
285 std::error_code*) override;
286 };
287
288 // Thread-local caches avoid hot-path mutex acquisitions:
289 // 1. Impl cache — single-slot, validated by comparing svc_
290 // 2. Waiter cache — single-slot, no service affinity
291 // All caches are cleared by timer_service_invalidate_cache() during shutdown.
292
293 inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
294 inline thread_local_ptr<waiter_node> tl_cached_waiter;
295
296 inline timer_service::implementation*
297 10428x try_pop_tl_cache(timer_service* svc) noexcept
298 {
299 10428x auto* impl = tl_cached_impl.get();
300 10428x if (impl)
301 {
302 10160x tl_cached_impl.set(nullptr);
303 10160x if (impl->svc_ == svc)
304 10160x return impl;
305 // Stale impl from a destroyed service
306 delete impl;
307 }
308 268x return nullptr;
309 }
310
311 inline bool
312 10420x try_push_tl_cache(timer_service::implementation* impl) noexcept
313 {
314 10420x if (!tl_cached_impl.get())
315 {
316 10340x tl_cached_impl.set(impl);
317 10340x return true;
318 }
319 80x return false;
320 }
321
322 inline waiter_node*
323 10223x try_pop_waiter_tl_cache() noexcept
324 {
325 10223x auto* w = tl_cached_waiter.get();
326 10223x if (w)
327 {
328 9992x tl_cached_waiter.set(nullptr);
329 9992x return w;
330 }
331 231x return nullptr;
332 }
333
334 inline bool
335 10207x try_push_waiter_tl_cache(waiter_node* w) noexcept
336 {
337 10207x if (!tl_cached_waiter.get())
338 {
339 10127x tl_cached_waiter.set(w);
340 10127x return true;
341 }
342 80x return false;
343 }
344
345 inline void
346 534x timer_service_invalidate_cache() noexcept
347 {
348 534x delete tl_cached_impl.get();
349 534x tl_cached_impl.set(nullptr);
350
351 534x delete tl_cached_waiter.get();
352 534x tl_cached_waiter.set(nullptr);
353 534x }
354
355 // timer_service out-of-class member function definitions
356
357 268x inline timer_service::implementation::implementation(
358 268x timer_service& svc) noexcept
359 268x : svc_(&svc)
360 {
361 268x }
362
363 inline void
364 534x timer_service::shutdown()
365 {
366 534x timer_service_invalidate_cache();
367 534x shutting_down_ = true;
368
369 // Snapshot impls and detach them from the heap so that
370 // coroutine-owned timer destructors (triggered by h.destroy()
371 // below) cannot re-enter remove_timer_impl() and mutate the
372 // vector during iteration.
373 534x std::vector<implementation*> impls;
374 534x impls.reserve(heap_.size());
375 542x for (auto& entry : heap_)
376 {
377 8x entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
378 8x impls.push_back(entry.timer_);
379 }
380 534x heap_.clear();
381 534x cached_nearest_ns_.store(
382 (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
383
384 // Cancel waiting timers. Each waiter called work_started()
385 // in implementation::wait(). On IOCP the scheduler shutdown
386 // loop exits when outstanding_work_ reaches zero, so we must
387 // call work_finished() here to balance it. On other backends
388 // this is harmless.
389 542x for (auto* impl : impls)
390 {
391 16x while (auto* w = impl->waiters_.pop_front())
392 {
393 8x w->stop_cb_.reset();
394 8x auto h = std::exchange(w->h_, {});
395 8x sched_->work_finished();
396 8x if (h)
397 8x h.destroy();
398 8x delete w;
399 8x }
400 8x delete impl;
401 }
402
403 // Delete free-listed impls
404 614x while (free_list_)
405 {
406 80x auto* next = free_list_->next_free_;
407 80x delete free_list_;
408 80x free_list_ = next;
409 }
410
411 // Delete free-listed waiters
412 612x while (waiter_free_list_)
413 {
414 78x auto* next = waiter_free_list_->next_free_;
415 78x delete waiter_free_list_;
416 78x waiter_free_list_ = next;
417 }
418 534x }
419
420 inline io_object::implementation*
421 10428x timer_service::construct()
422 {
423 10428x implementation* impl = try_pop_tl_cache(this);
424 10428x if (impl)
425 {
426 10160x impl->svc_ = this;
427 10160x impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
428 10160x impl->might_have_pending_waits_ = false;
429 10160x return impl;
430 }
431
432 268x std::lock_guard lock(mutex_);
433 268x if (free_list_)
434 {
435 impl = free_list_;
436 free_list_ = impl->next_free_;
437 impl->next_free_ = nullptr;
438 impl->svc_ = this;
439 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
440 impl->might_have_pending_waits_ = false;
441 }
442 else
443 {
444 268x impl = new implementation(*this);
445 }
446 268x return impl;
447 268x }
448
449 inline void
450 10426x timer_service::destroy(io_object::implementation* p)
451 {
452 10426x destroy_impl(static_cast<implementation&>(*p));
453 10426x }
454
455 inline void
456 10426x timer_service::destroy_impl(implementation& impl)
457 {
458 // During shutdown the impl is owned by the shutdown loop.
459 // Re-entering here (from a coroutine-owned timer destructor
460 // triggered by h.destroy()) must not modify the heap or
461 // recycle the impl — shutdown deletes it directly.
462 10426x if (shutting_down_)
463 10346x return;
464
465 10420x cancel_timer(impl);
466
467 10420x if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
468 {
469 std::lock_guard lock(mutex_);
470 remove_timer_impl(impl);
471 refresh_cached_nearest();
472 }
473
474 10420x if (try_push_tl_cache(&impl))
475 10340x return;
476
477 80x std::lock_guard lock(mutex_);
478 80x impl.next_free_ = free_list_;
479 80x free_list_ = &impl;
480 80x }
481
482 inline waiter_node*
483 10223x timer_service::create_waiter()
484 {
485 10223x if (auto* w = try_pop_waiter_tl_cache())
486 9992x return w;
487
488 231x std::lock_guard lock(mutex_);
489 231x if (waiter_free_list_)
490 {
491 2x auto* w = waiter_free_list_;
492 2x waiter_free_list_ = w->next_free_;
493 2x w->next_free_ = nullptr;
494 2x return w;
495 }
496
497 229x return new waiter_node();
498 231x }
499
500 inline void
501 10207x timer_service::destroy_waiter(waiter_node* w)
502 {
503 10207x if (try_push_waiter_tl_cache(w))
504 10127x return;
505
506 80x std::lock_guard lock(mutex_);
507 80x w->next_free_ = waiter_free_list_;
508 80x waiter_free_list_ = w;
509 80x }
510
511 inline std::size_t
512 6x timer_service::update_timer(implementation& impl, time_point new_time)
513 {
514 bool in_heap =
515 6x (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
516 6x if (!in_heap && impl.waiters_.empty())
517 return 0;
518
519 6x bool notify = false;
520 6x intrusive_list<waiter_node> canceled;
521
522 {
523 6x std::lock_guard lock(mutex_);
524
525 16x while (auto* w = impl.waiters_.pop_front())
526 {
527 10x w->impl_ = nullptr;
528 10x canceled.push_back(w);
529 10x }
530
531 6x if (impl.heap_index_ < heap_.size())
532 {
533 6x time_point old_time = heap_[impl.heap_index_].time_;
534 6x heap_[impl.heap_index_].time_ = new_time;
535
536 6x if (new_time < old_time)
537 6x up_heap(impl.heap_index_);
538 else
539 down_heap(impl.heap_index_);
540
541 6x notify = (impl.heap_index_ == 0);
542 }
543
544 6x refresh_cached_nearest();
545 6x }
546
547 6x std::size_t count = 0;
548 16x while (auto* w = canceled.pop_front())
549 {
550 10x w->ec_value_ = make_error_code(capy::error::canceled);
551 10x sched_->post(&w->op_);
552 10x ++count;
553 10x }
554
555 6x if (notify)
556 6x on_earliest_changed_();
557
558 6x return count;
559 }
560
561 inline void
562 10223x timer_service::insert_waiter(implementation& impl, waiter_node* w)
563 {
564 10223x bool notify = false;
565 {
566 10223x std::lock_guard lock(mutex_);
567 10223x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
568 {
569 10201x impl.heap_index_ = heap_.size();
570 10201x heap_.push_back({impl.expiry_, &impl});
571 10201x up_heap(heap_.size() - 1);
572 10201x notify = (impl.heap_index_ == 0);
573 10201x refresh_cached_nearest();
574 }
575 10223x impl.waiters_.push_back(w);
576 10223x }
577 10223x if (notify)
578 10170x on_earliest_changed_();
579 10223x }
580
581 inline std::size_t
582 11144x timer_service::cancel_timer(implementation& impl)
583 {
584 11144x if (!impl.might_have_pending_waits_)
585 10404x return 0;
586
587 // Not in heap and no waiters — just clear the flag
588 740x if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
589 impl.waiters_.empty())
590 {
591 impl.might_have_pending_waits_ = false;
592 return 0;
593 }
594
595 740x intrusive_list<waiter_node> canceled;
596
597 {
598 740x std::lock_guard lock(mutex_);
599 740x remove_timer_impl(impl);
600 1484x while (auto* w = impl.waiters_.pop_front())
601 {
602 744x w->impl_ = nullptr;
603 744x canceled.push_back(w);
604 744x }
605 740x refresh_cached_nearest();
606 740x }
607
608 740x impl.might_have_pending_waits_ = false;
609
610 740x std::size_t count = 0;
611 1484x while (auto* w = canceled.pop_front())
612 {
613 744x w->ec_value_ = make_error_code(capy::error::canceled);
614 744x sched_->post(&w->op_);
615 744x ++count;
616 744x }
617
618 740x return count;
619 }
620
621 inline void
622 30x timer_service::cancel_waiter(waiter_node* w)
623 {
624 {
625 30x std::lock_guard lock(mutex_);
626 // Already removed by cancel_timer or process_expired
627 30x if (!w->impl_)
628 return;
629 30x auto* impl = w->impl_;
630 30x w->impl_ = nullptr;
631 30x impl->waiters_.remove(w);
632 30x if (impl->waiters_.empty())
633 {
634 28x remove_timer_impl(*impl);
635 28x impl->might_have_pending_waits_ = false;
636 }
637 30x refresh_cached_nearest();
638 30x }
639
640 30x w->ec_value_ = make_error_code(capy::error::canceled);
641 30x sched_->post(&w->op_);
642 }
643
644 inline std::size_t
645 2x timer_service::cancel_one_waiter(implementation& impl)
646 {
647 2x if (!impl.might_have_pending_waits_)
648 return 0;
649
650 2x waiter_node* w = nullptr;
651
652 {
653 2x std::lock_guard lock(mutex_);
654 2x w = impl.waiters_.pop_front();
655 2x if (!w)
656 return 0;
657 2x w->impl_ = nullptr;
658 2x if (impl.waiters_.empty())
659 {
660 remove_timer_impl(impl);
661 impl.might_have_pending_waits_ = false;
662 }
663 2x refresh_cached_nearest();
664 2x }
665
666 2x w->ec_value_ = make_error_code(capy::error::canceled);
667 2x sched_->post(&w->op_);
668 2x return 1;
669 }
670
671 inline std::size_t
672 235298x timer_service::process_expired()
673 {
674 235298x intrusive_list<waiter_node> expired;
675
676 {
677 235298x std::lock_guard lock(mutex_);
678 235298x auto now = clock_type::now();
679
680 244723x while (!heap_.empty() && heap_[0].time_ <= now)
681 {
682 9425x implementation* t = heap_[0].timer_;
683 9425x remove_timer_impl(*t);
684 18854x while (auto* w = t->waiters_.pop_front())
685 {
686 9429x w->impl_ = nullptr;
687 9429x w->ec_value_ = {};
688 9429x expired.push_back(w);
689 9429x }
690 9425x t->might_have_pending_waits_ = false;
691 }
692
693 235298x refresh_cached_nearest();
694 235298x }
695
696 235298x std::size_t count = 0;
697 244727x while (auto* w = expired.pop_front())
698 {
699 9429x sched_->post(&w->op_);
700 9429x ++count;
701 9429x }
702
703 235298x return count;
704 }
705
706 inline void
707 10193x timer_service::remove_timer_impl(implementation& impl)
708 {
709 10193x std::size_t index = impl.heap_index_;
710 10193x if (index >= heap_.size())
711 return; // Not in heap
712
713 10193x if (index == heap_.size() - 1)
714 {
715 // Last element, just pop
716 869x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
717 869x heap_.pop_back();
718 }
719 else
720 {
721 // Swap with last and reheapify
722 9324x swap_heap(index, heap_.size() - 1);
723 9324x impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
724 9324x heap_.pop_back();
725
726 9324x if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
727 up_heap(index);
728 else
729 9324x down_heap(index);
730 }
731 }
732
733 inline void
734 10207x timer_service::up_heap(std::size_t index)
735 {
736 19510x while (index > 0)
737 {
738 9334x std::size_t parent = (index - 1) / 2;
739 9334x if (!(heap_[index].time_ < heap_[parent].time_))
740 31x break;
741 9303x swap_heap(index, parent);
742 9303x index = parent;
743 }
744 10207x }
745
746 inline void
747 9324x timer_service::down_heap(std::size_t index)
748 {
749 9324x std::size_t child = index * 2 + 1;
750 9324x while (child < heap_.size())
751 {
752 6x std::size_t min_child = (child + 1 == heap_.size() ||
753 heap_[child].time_ < heap_[child + 1].time_)
754 6x ? child
755 6x : child + 1;
756
757 6x if (heap_[index].time_ < heap_[min_child].time_)
758 6x break;
759
760 swap_heap(index, min_child);
761 index = min_child;
762 child = index * 2 + 1;
763 }
764 9324x }
765
766 inline void
767 18627x timer_service::swap_heap(std::size_t i1, std::size_t i2)
768 {
769 18627x heap_entry tmp = heap_[i1];
770 18627x heap_[i1] = heap_[i2];
771 18627x heap_[i2] = tmp;
772 18627x heap_[i1].timer_->heap_index_ = i1;
773 18627x heap_[i2].timer_->heap_index_ = i2;
774 18627x }
775
776 // waiter_node out-of-class member function definitions
777
778 inline void
779 30x waiter_node::canceller::operator()() const
780 {
781 30x waiter_->svc_->cancel_waiter(waiter_);
782 30x }
783
784 inline void
785 waiter_node::completion_op::do_complete(
786 [[maybe_unused]] void* owner,
787 scheduler_op* base,
788 std::uint32_t,
789 std::uint32_t)
790 {
791 // owner is always non-null here. The destroy path (owner == nullptr)
792 // is unreachable because completion_op overrides destroy() directly,
793 // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
794 BOOST_COROSIO_ASSERT(owner);
795 static_cast<completion_op*>(base)->operator()();
796 }
797
798 inline void
799 10207x waiter_node::completion_op::operator()()
800 {
801 10207x auto* w = waiter_;
802 10207x w->stop_cb_.reset();
803 10207x if (w->ec_out_)
804 10207x *w->ec_out_ = w->ec_value_;
805
806 10207x auto h = w->h_;
807 10207x auto d = w->d_;
808 10207x auto* svc = w->svc_;
809 10207x auto& sched = svc->get_scheduler();
810
811 10207x svc->destroy_waiter(w);
812
813 10207x d.post(h);
814 10207x sched.work_finished();
815 10207x }
816
817 // GCC 14 false-positive: inlining ~optional<stop_callback> through
818 // delete loses track that stop_cb_ was already .reset() above.
819 #if defined(__GNUC__) && !defined(__clang__)
820 #pragma GCC diagnostic push
821 #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
822 #endif
823 inline void
824 8x waiter_node::completion_op::destroy()
825 {
826 // Called during scheduler shutdown drain when this completion_op is
827 // in the scheduler's ready queue (posted by cancel_timer() or
828 // process_expired()). Balances the work_started() from
829 // implementation::wait(). The scheduler drain loop separately
830 // balances the work_started() from post(). On IOCP both decrements
831 // are required for outstanding_work_ to reach zero; on other
832 // backends this is harmless.
833 //
834 // This override also prevents scheduler_op::destroy() from calling
835 // do_complete(nullptr, ...). See also: timer_service::shutdown()
836 // which drains waiters still in the timer heap (the other path).
837 8x auto* w = waiter_;
838 8x w->stop_cb_.reset();
839 8x auto h = std::exchange(w->h_, {});
840 8x auto& sched = w->svc_->get_scheduler();
841 8x delete w;
842 8x sched.work_finished();
843 8x if (h)
844 8x h.destroy();
845 8x }
846 #if defined(__GNUC__) && !defined(__clang__)
847 #pragma GCC diagnostic pop
848 #endif
849
850 inline std::coroutine_handle<>
851 10257x timer_service::implementation::wait(
852 std::coroutine_handle<> h,
853 capy::executor_ref d,
854 std::stop_token token,
855 std::error_code* ec)
856 {
857 // Already-expired fast path — no waiter_node, no mutex.
858 // Post instead of dispatch so the coroutine yields to the
859 // scheduler, allowing other queued work to run.
860 10257x if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
861 {
862 10235x if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
863 {
864 34x if (ec)
865 34x *ec = {};
866 34x d.post(h);
867 34x return std::noop_coroutine();
868 }
869 }
870
871 10223x auto* w = svc_->create_waiter();
872 10223x w->impl_ = this;
873 10223x w->svc_ = svc_;
874 10223x w->h_ = h;
875 10223x w->d_ = d;
876 10223x w->token_ = std::move(token);
877 10223x w->ec_out_ = ec;
878
879 10223x svc_->insert_waiter(*this, w);
880 10223x might_have_pending_waits_ = true;
881 10223x svc_->get_scheduler().work_started();
882
883 10223x if (w->token_.stop_possible())
884 48x w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
885
886 10223x return std::noop_coroutine();
887 }
888
889 // Free functions
890
891 struct timer_service_access
892 {
893 10428x static native_scheduler& get_scheduler(io_context& ctx) noexcept
894 {
895 10428x return static_cast<native_scheduler&>(*ctx.sched_);
896 }
897 };
898
899 // Bypass find_service() mutex by reading the scheduler's cached pointer
900 inline io_object::io_service&
901 10428x timer_service_direct(capy::execution_context& ctx) noexcept
902 {
903 10428x return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
904 10428x .timer_svc_;
905 }
906
907 inline std::size_t
908 6x timer_service_update_expiry(timer::implementation& base)
909 {
910 6x auto& impl = static_cast<timer_service::implementation&>(base);
911 6x return impl.svc_->update_timer(impl, impl.expiry_);
912 }
913
914 inline std::size_t
915 724x timer_service_cancel(timer::implementation& base) noexcept
916 {
917 724x auto& impl = static_cast<timer_service::implementation&>(base);
918 724x return impl.svc_->cancel_timer(impl);
919 }
920
921 inline std::size_t
922 2x timer_service_cancel_one(timer::implementation& base) noexcept
923 {
924 2x auto& impl = static_cast<timer_service::implementation&>(base);
925 2x return impl.svc_->cancel_one_waiter(impl);
926 }
927
928 inline timer_service&
929 534x get_timer_service(capy::execution_context& ctx, scheduler& sched)
930 {
931 534x return ctx.make_service<timer_service>(sched);
932 }
933
934 } // namespace boost::corosio::detail
935
936 #endif
937