include/boost/corosio/native/detail/iocp/win_scheduler.hpp

84.8% Lines (246/290) 97.1% List of functions (33/34) 72.0% Branches (118/164)
win_scheduler.hpp
f(x) Functions (34)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :77 2993x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :89 6x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :99 5x 100.0% 100.0% boost::corosio::detail::win_scheduler::is_single_threaded() const :106 0 0.0% 0.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :201 3111x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :207 3111x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :215 598x 92.9% 72.7% 57.1% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :247 8083x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :253 8083x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :289 8083x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :309 4408x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :323 9952x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :332 556643x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :338 569118x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :345 544574x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :363 45x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :380 6200x 83.3% 66.7% 75.0% 
boost::corosio::detail::win_scheduler::stopped() const :398 2829x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :405 2767x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :412 3105x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :439 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :452 26x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :472 14x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :490 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :503 1183x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :521 557124x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :645 2040x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :651 598x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :662 3223x 100.0% 50.0% 90.0% boost::corosio::detail::win_scheduler::shutdown() :684 598x 84.6% 70.0% 81.6% boost::corosio::detail::win_scheduler::~win_scheduler() :745 1196x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor() :756 9x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor()::{lambda()#1}::operator()() const :762 9x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::cancel_wait_if_constructed(boost::corosio::detail::overlapped_op*) :770 20442x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41 #include <mutex>
42
43 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
44
45 namespace boost::corosio::detail {
46
47 // Forward declarations
48 struct overlapped_op;
49 class win_timers;
50 class win_wait_reactor;
51
52 class BOOST_COROSIO_DECL win_scheduler final
53 : public scheduler
54 , public capy::execution_context::service
55 {
56 public:
57 using key_type = scheduler;
58
59 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
60 ~win_scheduler();
61 win_scheduler(win_scheduler const&) = delete;
62 win_scheduler& operator=(win_scheduler const&) = delete;
63
64 void shutdown() override;
65 void post(std::coroutine_handle<> h) const override;
66 void post(scheduler_op* h) const override;
67 bool running_in_this_thread() const noexcept override;
68 void stop() override;
69 bool stopped() const noexcept override;
70 void restart() override;
71 std::size_t run() override;
72 std::size_t run_one() override;
73 std::size_t wait_one(long usec) override;
74 std::size_t poll() override;
75 std::size_t poll_one() override;
76
77 2993x void* native_handle() const noexcept
78 {
79 2993x return iocp_;
80 }
81
82 void work_started() noexcept override;
83 void work_finished() noexcept override;
84
85 /** Apply runtime IOCP configuration.
86
87 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
88 */
89 6x void configure_iocp(unsigned gqcs_timeout_ms) noexcept
90 {
91 6x gqcs_timeout_ms_ = gqcs_timeout_ms;
92 6x }
93
94 /** Enable or disable single-threaded (lockless) mode.
95
96 When enabled, the dispatch mutex becomes a no-op.
97 Cross-thread post() is undefined behavior.
98 */
99 5x void configure_single_threaded(bool v) noexcept override
100 {
101 5x single_threaded_ = v;
102 5x dispatch_mutex_.set_enabled(!v);
103 5x }
104
105 /// Return true if single-threaded (lockless) mode is active.
106 bool is_single_threaded() const noexcept override
107 {
108 return single_threaded_;
109 }
110
111 /** Signal that an overlapped I/O operation is now pending.
112 Coordinates with do_one() via the ready_ CAS protocol. */
113 void on_pending(overlapped_op* op) const;
114
115 /** Post an immediate completion with pre-stored results.
116 Used for sync errors and noop paths. */
117 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
118
119 // Timer service integration
120 void set_timer_service(timer_service* svc);
121 void update_timeout();
122
123 private:
124 static void on_timer_changed(void* ctx);
125 void post_deferred_completions(op_queue& ops);
126 std::size_t do_one(unsigned long timeout_ms);
127
128 timer_service* timer_svc_ = nullptr;
129 void* iocp_;
130 mutable long outstanding_work_;
131 mutable long stopped_;
132 long stop_event_posted_;
133 mutable long dispatch_required_;
134 unsigned long gqcs_timeout_ms_ = 500;
135 bool single_threaded_ = false;
136
137 mutable win_mutex dispatch_mutex_;
138 mutable op_queue completed_ops_;
139 std::unique_ptr<win_timers> timers_;
140 std::unique_ptr<win_wait_reactor> wait_reactor_;
141 std::once_flag wait_reactor_once_;
142 std::atomic<bool> wait_reactor_ready_{false};
143
144 public:
145 /** Auxiliary select-based reactor for IOCP wait operations.
146
147 Lazily created on first access; lives for the lifetime of the
148 scheduler and is stopped+joined in ~win_scheduler. Used by
149 socket and acceptor wait() implementations whose readiness
150 cannot be expressed natively in IOCP (datagram-read,
151 acceptor-read, error-wait).
152 */
153 win_wait_reactor& wait_reactor();
154
155 /** Cancel a parked wait op only if the reactor exists.
156
157 Safe to call from any thread. If no wait op has ever been
158 registered, the reactor was never constructed, so there is
159 nothing to cancel and we avoid spinning up a thread + wakeup
160 socketpair on the cancel path. Acquire/release pairs with the
161 store in wait_reactor() so reads see a fully-constructed
162 reactor when the flag is true.
163 */
164 void cancel_wait_if_constructed(overlapped_op* op) noexcept;
165 };
166
/*
    ARCHITECTURE NOTE: Function Pointer Dispatch

    All I/O handles are registered with the IOCP using key_io (0).
    Dispatch happens via the function pointer stored in each scheduler_op.

    When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
    and call the function pointer directly - no virtual dispatch.

    Apart from key_io, the completion_key enum values are used only for
    internal signals:
    - key_io (0): Normal I/O completion, dispatch via func_
    - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
    - key_shutdown (2): Stop signal
    - key_result_stored (3): Results pre-stored in OVERLAPPED
    - key_posted: Operation queued by post(); the OVERLAPPED* is really
      a scheduler_op* and is completed directly
*/
182
183 namespace iocp {
184
185 // Max timeout for GQCS to allow periodic re-checking of conditions.
186 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
187 inline constexpr unsigned long max_gqcs_timeout = 500;
188
189 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
190 {
191 win_scheduler const* key;
192 scheduler_context* next;
193 };
194
195 inline thread_local_ptr<scheduler_context> context_stack;
196
197 struct thread_context_guard
198 {
199 scheduler_context frame_;
200
201 3111x explicit thread_context_guard(win_scheduler const* ctx) noexcept
202 3111x : frame_{ctx, context_stack.get()}
203 {
204 3111x context_stack.set(&frame_);
205 3111x }
206
207 3111x ~thread_context_guard() noexcept
208 {
209 3111x context_stack.set(frame_.next);
210 3111x }
211 };
212
213 } // namespace iocp
214
215 598x inline win_scheduler::win_scheduler(
216 598x capy::execution_context& ctx, int concurrency_hint)
217 598x : iocp_(nullptr)
218 598x , outstanding_work_(0)
219 598x , stopped_(0)
220 598x , stop_event_posted_(0)
221
1/1
✓ Branch 4 → 5 taken 598 times.
598x , dispatch_required_(0)
222 {
223 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
224
2/3
✓ Branch 10 → 11 taken 598 times.
✗ Branch 10 → 12 not taken.
✓ Branch 13 → 14 taken 598 times.
598x iocp_ = ::CreateIoCompletionPort(
225 INVALID_HANDLE_VALUE, nullptr, 0,
226 static_cast<DWORD>(
227 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
228
229
1/2
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 18 taken 598 times.
598x if (iocp_ == nullptr)
230 detail::throw_system_error(make_err(::GetLastError()));
231
232 // Create timer wakeup mechanism (tries NT native, falls back to thread)
233
1/1
✓ Branch 18 → 19 taken 598 times.
598x timers_ = make_win_timers(iocp_, &dispatch_required_);
234
235 // Connect timer service to scheduler
236
2/2
✓ Branch 21 → 22 taken 598 times.
✓ Branch 22 → 23 taken 598 times.
598x set_timer_service(&get_timer_service(ctx, *this));
237
238 // Initialize resolver service
239
1/1
✓ Branch 23 → 24 taken 598 times.
598x ctx.make_service<win_resolver_service>(*this);
240 598x }
241
242 // ~win_scheduler() and shutdown() are defined at the bottom of this
243 // header so the unique_ptr<win_wait_reactor>'s deleter and
244 // wait_reactor_->stop() see the type complete.
245
246 inline void
247 8083x win_scheduler::post(std::coroutine_handle<> h) const
248 {
249 struct post_handler final : scheduler_op
250 {
251 std::coroutine_handle<> h_;
252
253 8083x static void do_complete(
254 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
255 {
256 8083x auto* self = static_cast<post_handler*>(base);
257
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 8077 times.
8083x if (!owner)
258 {
259 // Shutdown path: destroy the coroutine frame synchronously.
260 //
261 // Bounded destruction invariant: the chain triggered by
262 // coro.destroy() is at most two levels deep:
263 // 1. task frame destroyed → ~io_awaitable_promise_base()
264 // destroys stored continuation (if != noop_coroutine)
265 // 2. continuation (trampoline) destroyed → final_suspend
266 // returns suspend_never, no further continuation
267 //
268 // If a future refactor adds deeper continuation chains,
269 // this would reintroduce re-entrant stack overflow risk.
270 #ifndef NDEBUG
271 static thread_local int destroy_depth = 0;
272 6x ++destroy_depth;
273
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
274 #endif
275 6x auto coro = self->h_;
276
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
277
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
278 #ifndef NDEBUG
279 6x --destroy_depth;
280 #endif
281 6x return;
282 }
283 8077x auto coro = self->h_;
284
1/2
✓ Branch 10 → 11 taken 8077 times.
✗ Branch 10 → 12 not taken.
8077x delete self;
285 std::atomic_thread_fence(std::memory_order_acquire);
286
1/1
✓ Branch 13 → 14 taken 8077 times.
8077x coro.resume();
287 }
288
289 8083x explicit post_handler(std::coroutine_handle<> coro)
290 8083x : scheduler_op(&do_complete)
291 8083x , h_(coro)
292 {
293 8083x }
294 };
295
296 8083x auto* ph = new post_handler(h);
297 8083x ::InterlockedIncrement(&outstanding_work_);
298
299
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 8083 times.
8083x if (!::PostQueuedCompletionStatus(
300 8083x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
301 {
302 std::lock_guard<win_mutex> lock(dispatch_mutex_);
303 completed_ops_.push(ph);
304 ::InterlockedExchange(&dispatch_required_, 1);
305 }
306 8083x }
307
308 inline void
309 4408x win_scheduler::post(scheduler_op* h) const
310 {
311 4408x ::InterlockedIncrement(&outstanding_work_);
312
313
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 4408 times.
4408x if (!::PostQueuedCompletionStatus(
314 4408x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
315 {
316 std::lock_guard<win_mutex> lock(dispatch_mutex_);
317 completed_ops_.push(h);
318 ::InterlockedExchange(&dispatch_required_, 1);
319 }
320 4408x }
321
322 inline bool
323 9952x win_scheduler::running_in_this_thread() const noexcept
324 {
325
2/2
✓ Branch 6 → 3 taken 3309 times.
✓ Branch 6 → 7 taken 6643 times.
9952x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
326
1/2
✓ Branch 3 → 4 taken 3309 times.
✗ Branch 3 → 5 not taken.
3309x if (c->key == this)
327 3309x return true;
328 6643x return false;
329 }
330
331 inline void
332 556643x win_scheduler::work_started() noexcept
333 {
334 556643x ::InterlockedIncrement(&outstanding_work_);
335 556643x }
336
337 inline void
338 569118x win_scheduler::work_finished() noexcept
339 {
340
2/2
✓ Branch 4 → 5 taken 3087 times.
✓ Branch 4 → 6 taken 566031 times.
1138236x if (::InterlockedDecrement(&outstanding_work_) == 0)
341 3087x stop();
342 569118x }
343
344 inline void
345 544574x win_scheduler::on_pending(overlapped_op* op) const
346 {
347 // CAS: try to set ready_ from 0 to 1.
348 // If the old value was 1, GQCS already grabbed this op and stored
349 // results — we need to re-post so do_one() can dispatch it.
350
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 544574 times.
1089148x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
351 {
352 if (!::PostQueuedCompletionStatus(
353 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
354 {
355 std::lock_guard<win_mutex> lock(dispatch_mutex_);
356 completed_ops_.push(op);
357 ::InterlockedExchange(&dispatch_required_, 1);
358 }
359 }
360 544574x }
361
362 inline void
363 45x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
364 {
365 // Sync completion: pack results into op and post for dispatch.
366 45x op->ready_ = 1;
367 45x op->dwError = error;
368 45x op->bytes_transferred = bytes;
369
370
2/4
✓ Branch 2 → 3 taken 45 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 45 times.
90x if (!::PostQueuedCompletionStatus(
371 45x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
372 {
373 std::lock_guard<win_mutex> lock(dispatch_mutex_);
374 completed_ops_.push(op);
375 ::InterlockedExchange(&dispatch_required_, 1);
376 }
377 45x }
378
379 inline void
380 6200x win_scheduler::stop()
381 {
382
2/2
✓ Branch 4 → 5 taken 3099 times.
✓ Branch 4 → 13 taken 3101 times.
12400x if (::InterlockedExchange(&stopped_, 1) == 0)
383 {
384
1/2
✓ Branch 7 → 8 taken 3099 times.
✗ Branch 7 → 13 not taken.
6198x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
385 {
386
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 3099 times.
3099x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
387 {
388 // PQCS failure is non-fatal: stopped_ is already set.
389 // The run() loop will notice via the GQCS timeout
390 // (gqcs_timeout_ms_, default 500ms) and exit.
391 ::InterlockedExchange(&dispatch_required_, 1);
392 }
393 }
394 }
395 6200x }
396
397 inline bool
398 2829x win_scheduler::stopped() const noexcept
399 {
400 // equivalent to atomic read
401 5658x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
402 }
403
404 inline void
405 2767x win_scheduler::restart()
406 {
407 2767x ::InterlockedExchange(&stopped_, 0);
408 2767x ::InterlockedExchange(&stop_event_posted_, 0);
409 2767x }
410
411 inline std::size_t
412 3105x win_scheduler::run()
413 {
414
2/2
✓ Branch 4 → 5 taken 24 times.
✓ Branch 4 → 7 taken 3081 times.
6210x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
415 {
416
1/1
✓ Branch 5 → 6 taken 24 times.
24x stop();
417 24x return 0;
418 }
419
420 3081x iocp::thread_context_guard ctx(this);
421
422 3081x std::size_t n = 0;
423 for (;;)
424 {
425
3/3
✓ Branch 9 → 10 taken 557073 times.
✓ Branch 10 → 11 taken 12 times.
✓ Branch 10 → 12 taken 557061 times.
557073x if (!do_one(INFINITE))
426 12x break;
427
1/2
✓ Branch 13 → 14 taken 557061 times.
✗ Branch 13 → 15 not taken.
557061x if (n != (std::numeric_limits<std::size_t>::max)())
428 557061x ++n;
429
2/2
✓ Branch 17 → 18 taken 3069 times.
✓ Branch 17 → 20 taken 553992 times.
1114122x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
430 {
431
1/1
✓ Branch 18 → 19 taken 3069 times.
3069x stop();
432 3069x break;
433 }
434 }
435 3081x return n;
436 3081x }
437
438 inline std::size_t
439 2x win_scheduler::run_one()
440 {
441
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
442 {
443 stop();
444 return 0;
445 }
446
447 2x iocp::thread_context_guard ctx(this);
448
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
449 2x }
450
451 inline std::size_t
452 26x win_scheduler::wait_one(long usec)
453 {
454
2/2
✓ Branch 4 → 5 taken 11 times.
✓ Branch 4 → 7 taken 15 times.
52x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
455 {
456
1/1
✓ Branch 5 → 6 taken 11 times.
11x stop();
457 11x return 0;
458 }
459
460 15x iocp::thread_context_guard ctx(this);
461 15x unsigned long timeout_ms = INFINITE;
462
1/2
✓ Branch 8 → 9 taken 15 times.
✗ Branch 8 → 13 not taken.
15x if (usec >= 0)
463 {
464 15x auto ms = (static_cast<long long>(usec) + 999) / 1000;
465
1/2
✓ Branch 9 → 10 taken 15 times.
✗ Branch 9 → 11 not taken.
15x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
466 : static_cast<unsigned long>(ms);
467 }
468
1/1
✓ Branch 13 → 14 taken 15 times.
15x return do_one(timeout_ms);
469 15x }
470
471 inline std::size_t
472 14x win_scheduler::poll()
473 {
474
2/2
✓ Branch 4 → 5 taken 3 times.
✓ Branch 4 → 7 taken 11 times.
28x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
475 {
476
1/1
✓ Branch 5 → 6 taken 3 times.
3x stop();
477 3x return 0;
478 }
479
480 11x iocp::thread_context_guard ctx(this);
481
482 11x std::size_t n = 0;
483
3/3
✓ Branch 12 → 13 taken 32 times.
✓ Branch 13 → 9 taken 21 times.
✓ Branch 13 → 14 taken 11 times.
32x while (do_one(0))
484
1/2
✓ Branch 10 → 11 taken 21 times.
✗ Branch 10 → 12 not taken.
21x if (n != (std::numeric_limits<std::size_t>::max)())
485 21x ++n;
486 11x return n;
487 11x }
488
489 inline std::size_t
490 4x win_scheduler::poll_one()
491 {
492
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
493 {
494
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
495 2x return 0;
496 }
497
498 2x iocp::thread_context_guard ctx(this);
499
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
500 2x }
501
502 inline void
503 1183x win_scheduler::post_deferred_completions(op_queue& ops)
504 {
505
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 1183 times.
1183x while (auto h = ops.pop())
506 {
507 if (::PostQueuedCompletionStatus(
508 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
509 continue;
510
511 // Out of resources, put the failed op and remaining ops back
512 ops.push(h);
513 std::lock_guard<win_mutex> lock(dispatch_mutex_);
514 completed_ops_.splice(ops);
515 ::InterlockedExchange(&dispatch_required_, 1);
516 return;
517 }
518 }
519
520 inline std::size_t
521 561081x win_scheduler::do_one(unsigned long timeout_ms)
522 {
523 for (;;)
524 {
525 // Check if we need to process timers or deferred ops
526
2/2
✓ Branch 4 → 5 taken 1183 times.
✓ Branch 4 → 13 taken 559898 times.
1122162x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
527 {
528 1183x op_queue local_ops;
529 {
530 1183x std::lock_guard<win_mutex> lock(dispatch_mutex_);
531 1183x local_ops.splice(completed_ops_);
532 1183x }
533
1/1
✓ Branch 8 → 9 taken 1183 times.
1183x post_deferred_completions(local_ops);
534
535
1/2
✓ Branch 9 → 10 taken 1183 times.
✗ Branch 9 → 11 not taken.
1183x if (timer_svc_)
536
1/1
✓ Branch 10 → 11 taken 1183 times.
1183x timer_svc_->process_expired();
537
538
1/1
✓ Branch 11 → 12 taken 1183 times.
1183x update_timeout();
539 }
540
541 561081x DWORD bytes = 0;
542 561081x ULONG_PTR key = 0;
543 561081x LPOVERLAPPED overlapped = nullptr;
544
1/1
✓ Branch 13 → 14 taken 561081 times.
561081x ::SetLastError(0);
545
546
1/1
✓ Branch 17 → 18 taken 561081 times.
561081x BOOL result = ::GetQueuedCompletionStatus(
547 iocp_, &bytes, &key, &overlapped,
548
2/2
✓ Branch 14 → 15 taken 561035 times.
✓ Branch 14 → 16 taken 46 times.
561081x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
549 : gqcs_timeout_ms_);
550
1/1
✓ Branch 18 → 19 taken 561081 times.
561081x DWORD dwError = ::GetLastError();
551
552 // Handle based on completion key
553
2/2
✓ Branch 19 → 20 taken 557098 times.
✓ Branch 19 → 39 taken 3983 times.
561081x if (overlapped)
554 {
555
2/2
✓ Branch 20 → 21 taken 2010 times.
✓ Branch 20 → 22 taken 555088 times.
557098x DWORD err = result ? 0 : dwError;
556
557
2/3
✓ Branch 23 → 24 taken 544619 times.
✓ Branch 23 → 35 taken 12479 times.
✗ Branch 23 → 38 not taken.
557098x switch (key)
558 {
559 544619x case key_io:
560 case key_result_stored:
561 {
562 544619x auto* ov_op = overlapped_to_op(overlapped);
563
564 // If key_result_stored, results are pre-stored in op fields
565
2/2
✓ Branch 25 → 26 taken 45 times.
✓ Branch 25 → 27 taken 544574 times.
544619x if (key == key_result_stored)
566 {
567 45x bytes = ov_op->bytes_transferred;
568 45x err = ov_op->dwError;
569 }
570
571 // Store GQCS results so on_pending() re-post has valid data
572 544619x ov_op->store_result(bytes, err);
573
574 // CAS: try to set ready_ from 0 to 1.
575 // If old value was 1, the initiator already returned
576 // (on_pending/on_completion set it) — safe to dispatch.
577 // If old value was 0, the initiator hasn't returned yet —
578 // skip dispatch; on_pending() will re-post.
579
1/2
✓ Branch 30 → 31 taken 544619 times.
✗ Branch 30 → 34 not taken.
1089238x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
580 {
581
1/1
✓ Branch 31 → 32 taken 544619 times.
544619x ov_op->complete(this, bytes, err);
582 544619x work_finished();
583 557124x return 1;
584 }
585 3920x continue;
586 }
587
588 12479x case key_posted:
589 {
590 // Posted scheduler_op*: overlapped is actually a scheduler_op*
591 12479x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
592
1/1
✓ Branch 35 → 36 taken 12479 times.
12479x op->complete(this, bytes, err);
593 12479x work_finished();
594 12479x return 1;
595 }
596
597 default:
598 continue;
599 }
600 }
601
602 // Signal completions (no OVERLAPPED)
603
2/2
✓ Branch 39 → 40 taken 3936 times.
✓ Branch 39 → 53 taken 47 times.
3983x if (result)
604 {
605
2/3
✓ Branch 40 → 41 taken 1183 times.
✓ Branch 40 → 42 taken 2753 times.
✗ Branch 40 → 52 not taken.
3936x switch (key)
606 {
607 1183x case key_wake_dispatch:
608 // Timer wakeup - loop to check dispatch_required_
609 1183x continue;
610
611 2753x case key_shutdown:
612 2753x ::InterlockedExchange(&stop_event_posted_, 0);
613
2/2
✓ Branch 45 → 46 taken 16 times.
✓ Branch 45 → 51 taken 2737 times.
2753x if (stopped())
614 {
615 // Re-post for other waiting threads
616
1/2
✓ Branch 48 → 49 taken 16 times.
✗ Branch 48 → 50 not taken.
32x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
617 {
618
1/1
✓ Branch 49 → 50 taken 16 times.
16x ::PostQueuedCompletionStatus(
619 iocp_, 0, key_shutdown, nullptr);
620 }
621 16x return 0;
622 }
623 2737x continue;
624
625 default:
626 continue;
627 }
628 }
629
630 // Timeout or error
631
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 47 times.
47x if (dwError != WAIT_TIMEOUT)
632 detail::throw_system_error(make_err(dwError));
633
2/2
✓ Branch 56 → 57 taken 10 times.
✓ Branch 56 → 58 taken 37 times.
47x if (timeout_ms != INFINITE)
634 10x return 0;
635 // PQCS-failure fallback: stop() sets stopped_ and
636 // dispatch_required_ but if the key_shutdown post failed,
637 // no completion is ever dequeued. Catch it here on the
638 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
639
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 37 times.
37x if (stopped())
640 return 0;
641 3957x }
642 }
643
644 inline void
645 2040x win_scheduler::on_timer_changed(void* ctx)
646 {
647 2040x static_cast<win_scheduler*>(ctx)->update_timeout();
648 2040x }
649
650 inline void
651 598x win_scheduler::set_timer_service(timer_service* svc)
652 {
653 598x timer_svc_ = svc;
654 // Pass 'this' as context - callback routes to correct instance
655 598x svc->set_on_earliest_changed(
656 598x timer_service::callback{this, &on_timer_changed});
657
1/2
✓ Branch 5 → 6 taken 598 times.
✗ Branch 5 → 8 not taken.
598x if (timers_)
658 598x timers_->start();
659 598x }
660
661 inline void
662 3223x win_scheduler::update_timeout()
663 {
664
3/6
✓ Branch 2 → 3 taken 3223 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 3223 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 3223 times.
✗ Branch 7 → 11 not taken.
3223x if (timer_svc_ && timers_)
665 3223x timers_->update_timeout(timer_svc_->nearest_expiry());
666 3223x }
667
668 } // namespace boost::corosio::detail
669
670 // Defer including the auxiliary wait reactor until the scheduler is
671 // fully defined, since the reactor's inline methods call back into
672 // win_scheduler. This also gives the dtor and wait_reactor() below a
673 // complete win_wait_reactor type for unique_ptr destruction and
674 // lazy construction.
675 //
676 // The macro lets win_wait_reactor.hpp diagnose direct inclusion
677 // (which would land it here with win_scheduler still incomplete).
678 #define BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
679 #include <boost/corosio/native/detail/iocp/win_wait_reactor.hpp>
680
681 namespace boost::corosio::detail {
682
683 inline void
684 598x win_scheduler::shutdown()
685 {
686
1/2
✓ Branch 3 → 4 taken 598 times.
✗ Branch 3 → 6 not taken.
598x if (timers_)
687 598x timers_->stop();
688
689 // Drain timer heap before the work-counting loop. The timer_service
690 // was registered after this scheduler (nested make_service from our
691 // constructor), so execution_context::shutdown() calls us first.
692 // Asio avoids this by owning timer queues directly inside the
693 // scheduler; we bridge the gap by shutting down the timer service
694 // early. The subsequent call from execution_context is a no-op.
695
1/2
✓ Branch 6 → 7 taken 598 times.
✗ Branch 6 → 8 not taken.
598x if (timer_svc_)
696 598x timer_svc_->shutdown();
697
698 // Same problem for the auxiliary wait reactor: ops parked in it
699 // hold work_started credit. Stop the reactor early so its loop
700 // drains them as cancelled and the work counter can reach zero.
701
2/2
✓ Branch 9 → 10 taken 9 times.
✓ Branch 9 → 12 taken 589 times.
598x if (wait_reactor_ready_.load(std::memory_order_acquire))
702 9x wait_reactor_->stop();
703
704
2/2
✓ Branch 36 → 13 taken 17 times.
✓ Branch 36 → 37 taken 598 times.
1230x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
705 {
706 17x op_queue ops;
707 {
708 17x std::lock_guard<win_mutex> lock(dispatch_mutex_);
709 17x ops.splice(completed_ops_);
710 17x }
711
712
1/2
✗ Branch 17 → 18 not taken.
✓ Branch 17 → 24 taken 17 times.
17x if (!ops.empty())
713 {
714 while (auto* h = ops.pop())
715 {
716 ::InterlockedDecrement(&outstanding_work_);
717 h->destroy();
718 }
719 }
720 else
721 {
722 DWORD bytes;
723 ULONG_PTR key;
724 LPOVERLAPPED overlapped;
725
1/1
✓ Branch 24 → 25 taken 17 times.
17x ::GetQueuedCompletionStatus(
726 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
727
2/2
✓ Branch 25 → 26 taken 16 times.
✓ Branch 25 → 32 taken 1 time.
17x if (overlapped)
728 {
729 16x ::InterlockedDecrement(&outstanding_work_);
730
2/2
✓ Branch 28 → 29 taken 12 times.
✓ Branch 28 → 30 taken 4 times.
16x if (key == key_posted)
731 {
732 12x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
733
1/1
✓ Branch 29 → 32 taken 12 times.
12x op->destroy();
734 }
735 else
736 {
737 4x auto* op = overlapped_to_op(overlapped);
738
1/1
✓ Branch 31 → 32 taken 4 times.
4x op->destroy();
739 }
740 }
741 }
742 }
743 598x }
744
745 1196x inline win_scheduler::~win_scheduler()
746 {
747
2/2
✓ Branch 3 → 4 taken 9 times.
✓ Branch 3 → 6 taken 589 times.
598x if (wait_reactor_)
748 9x wait_reactor_->stop();
749 598x wait_reactor_.reset();
750
751
1/2
✓ Branch 7 → 8 taken 598 times.
✗ Branch 7 → 9 not taken.
598x if (iocp_ != nullptr)
752 598x ::CloseHandle(iocp_);
753 1196x }
754
755 inline win_wait_reactor&
756 9x win_scheduler::wait_reactor()
757 {
758 // Lazy thread-safe init: multiple IOCP workers may race the first
759 // wait() call. wait_reactor_ready_ is set with release ordering
760 // after construction so cancel_wait_if_constructed can safely
761 // observe the reactor without forcing construction itself.
762
1/1
✓ Branch 2 → 3 taken 9 times.
9x std::call_once(wait_reactor_once_, [this] {
763
1/1
✓ Branch 2 → 3 taken 9 times.
9x wait_reactor_ = std::make_unique<win_wait_reactor>(*this);
764 9x wait_reactor_ready_.store(true, std::memory_order_release);
765 9x });
766 9x return *wait_reactor_;
767 }
768
769 inline void
770 20442x win_scheduler::cancel_wait_if_constructed(overlapped_op* op) noexcept
771 {
772
2/2
✓ Branch 3 → 4 taken 54 times.
✓ Branch 3 → 6 taken 20388 times.
20442x if (wait_reactor_ready_.load(std::memory_order_acquire))
773 54x wait_reactor_->cancel_wait(op);
774 20442x }
775
776 } // namespace boost::corosio::detail
777
778 #endif // BOOST_COROSIO_HAS_IOCP
779
780 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
781