include/boost/corosio/native/detail/iocp/win_scheduler.hpp

84.8% Lines (246/290) 97.1% List of functions (33/34) 72.0% Branches (118/164)
win_scheduler.hpp
f(x) Functions (34)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :77 2838x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :89 6x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :99 5x 100.0% 100.0% boost::corosio::detail::win_scheduler::is_single_threaded() const :106 0 0.0% 0.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :201 3159x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :207 3159x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :215 567x 92.9% 72.7% 57.1% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :247 8163x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :253 8163x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :289 8163x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :309 3560x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :323 10082x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :332 437727x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :338 449434x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :345 426413x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :363 31x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :380 6286x 83.3% 66.7% 75.0% boost::corosio::detail::win_scheduler::stopped() const :398 2888x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :405 2828x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :412 3149x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :439 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :452 27x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :472 14x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :490 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :503 300x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :521 438184x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :645 1169x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :651 567x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :662 1469x 100.0% 50.0% 90.0% boost::corosio::detail::win_scheduler::shutdown() :684 567x 84.6% 70.0% 81.6% boost::corosio::detail::win_scheduler::~win_scheduler() :745 1134x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor() :756 6x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor()::{lambda()#1}::operator()() const :762 6x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::cancel_wait_if_constructed(boost::corosio::detail::overlapped_op*) :770 20629x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41 #include <mutex>
42
43 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
44
45 namespace boost::corosio::detail {
46
47 // Forward declarations
48 struct overlapped_op;
49 class win_timers;
50 class win_wait_reactor;
51
52 class BOOST_COROSIO_DECL win_scheduler final
53 : public scheduler
54 , public capy::execution_context::service
55 {
56 public:
57 using key_type = scheduler;
58
59 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
60 ~win_scheduler();
61 win_scheduler(win_scheduler const&) = delete;
62 win_scheduler& operator=(win_scheduler const&) = delete;
63
64 void shutdown() override;
65 void post(std::coroutine_handle<> h) const override;
66 void post(scheduler_op* h) const override;
67 bool running_in_this_thread() const noexcept override;
68 void stop() override;
69 bool stopped() const noexcept override;
70 void restart() override;
71 std::size_t run() override;
72 std::size_t run_one() override;
73 std::size_t wait_one(long usec) override;
74 std::size_t poll() override;
75 std::size_t poll_one() override;
76
77 2838x void* native_handle() const noexcept
78 {
79 2838x return iocp_;
80 }
81
82 void work_started() noexcept override;
83 void work_finished() noexcept override;
84
85 /** Apply runtime IOCP configuration.
86
87 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
88 */
89 6x void configure_iocp(unsigned gqcs_timeout_ms) noexcept
90 {
91 6x gqcs_timeout_ms_ = gqcs_timeout_ms;
92 6x }
93
94 /** Enable or disable single-threaded (lockless) mode.
95
96 When enabled, the dispatch mutex becomes a no-op.
97 Cross-thread post() is undefined behavior.
98 */
99 5x void configure_single_threaded(bool v) noexcept override
100 {
101 5x single_threaded_ = v;
102 5x dispatch_mutex_.set_enabled(!v);
103 5x }
104
105 /// Return true if single-threaded (lockless) mode is active.
106 bool is_single_threaded() const noexcept override
107 {
108 return single_threaded_;
109 }
110
111 /** Signal that an overlapped I/O operation is now pending.
112 Coordinates with do_one() via the ready_ CAS protocol. */
113 void on_pending(overlapped_op* op) const;
114
115 /** Post an immediate completion with pre-stored results.
116 Used for sync errors and noop paths. */
117 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
118
119 // Timer service integration
120 void set_timer_service(timer_service* svc);
121 void update_timeout();
122
123 private:
124 static void on_timer_changed(void* ctx);
125 void post_deferred_completions(op_queue& ops);
126 std::size_t do_one(unsigned long timeout_ms);
127
128 timer_service* timer_svc_ = nullptr;
129 void* iocp_;
130 mutable long outstanding_work_;
131 mutable long stopped_;
132 long stop_event_posted_;
133 mutable long dispatch_required_;
134 unsigned long gqcs_timeout_ms_ = 500;
135 bool single_threaded_ = false;
136
137 mutable win_mutex dispatch_mutex_;
138 mutable op_queue completed_ops_;
139 std::unique_ptr<win_timers> timers_;
140 std::unique_ptr<win_wait_reactor> wait_reactor_;
141 std::once_flag wait_reactor_once_;
142 std::atomic<bool> wait_reactor_ready_{false};
143
144 public:
145 /** Auxiliary select-based reactor for IOCP wait operations.
146
147 Lazily created on first access; lives for the lifetime of the
148 scheduler and is stopped+joined in ~win_scheduler. Used by
149 socket and acceptor wait() implementations whose readiness
150 cannot be expressed natively in IOCP (datagram-read,
151 acceptor-read, error-wait).
152 */
153 win_wait_reactor& wait_reactor();
154
155 /** Cancel a parked wait op only if the reactor exists.
156
157 Safe to call from any thread. If no wait op has ever been
158 registered, the reactor was never constructed, so there is
159 nothing to cancel and we avoid spinning up a thread + wakeup
160 socketpair on the cancel path. Acquire/release pairs with the
161 store in wait_reactor() so reads see a fully-constructed
162 reactor when the flag is true.
163 */
164 void cancel_wait_if_constructed(overlapped_op* op) noexcept;
165 };
166
167 /*
168 ARCHITECTURE NOTE: Function Pointer Dispatch
169
170 All I/O handles are registered with the IOCP using key_io (0).
171 Dispatch happens via the function pointer stored in each scheduler_op.
172
173 When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
174 and call the function pointer directly - no virtual dispatch.
175
176 The completion_key enum values are used only for internal signals:
177 - key_io (0): Normal I/O completion, dispatch via func_
178 - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
179 - key_shutdown (2): Stop signal
180 - key_result_stored (3): Results pre-stored in OVERLAPPED
181 */
182
183 namespace iocp {
184
185 // Max timeout for GQCS to allow periodic re-checking of conditions.
186 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
187 inline constexpr unsigned long max_gqcs_timeout = 500;
188
189 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
190 {
191 win_scheduler const* key;
192 scheduler_context* next;
193 };
194
195 inline thread_local_ptr<scheduler_context> context_stack;
196
197 struct thread_context_guard
198 {
199 scheduler_context frame_;
200
201 3159x explicit thread_context_guard(win_scheduler const* ctx) noexcept
202 3159x : frame_{ctx, context_stack.get()}
203 {
204 3159x context_stack.set(&frame_);
205 3159x }
206
207 3159x ~thread_context_guard() noexcept
208 {
209 3159x context_stack.set(frame_.next);
210 3159x }
211 };
212
213 } // namespace iocp
214
215 567x inline win_scheduler::win_scheduler(
216 567x capy::execution_context& ctx, int concurrency_hint)
217 567x : iocp_(nullptr)
218 567x , outstanding_work_(0)
219 567x , stopped_(0)
220 567x , stop_event_posted_(0)
221
1/1
✓ Branch 4 → 5 taken 567 times.
567x , dispatch_required_(0)
222 {
223 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
224
2/3
✓ Branch 10 → 11 taken 567 times.
✗ Branch 10 → 12 not taken.
✓ Branch 13 → 14 taken 567 times.
567x iocp_ = ::CreateIoCompletionPort(
225 INVALID_HANDLE_VALUE, nullptr, 0,
226 static_cast<DWORD>(
227 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
228
229
1/2
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 18 taken 567 times.
567x if (iocp_ == nullptr)
230 detail::throw_system_error(make_err(::GetLastError()));
231
232 // Create timer wakeup mechanism (tries NT native, falls back to thread)
233
1/1
✓ Branch 18 → 19 taken 567 times.
567x timers_ = make_win_timers(iocp_, &dispatch_required_);
234
235 // Connect timer service to scheduler
236
2/2
✓ Branch 21 → 22 taken 567 times.
✓ Branch 22 → 23 taken 567 times.
567x set_timer_service(&get_timer_service(ctx, *this));
237
238 // Initialize resolver service
239
1/1
✓ Branch 23 → 24 taken 567 times.
567x ctx.make_service<win_resolver_service>(*this);
240 567x }
241
242 // ~win_scheduler() and shutdown() are defined at the bottom of this
243 // header so the unique_ptr<win_wait_reactor>'s deleter and
244 // wait_reactor_->stop() see the type complete.
245
246 inline void
247 8163x win_scheduler::post(std::coroutine_handle<> h) const
248 {
249 struct post_handler final : scheduler_op
250 {
251 std::coroutine_handle<> h_;
252
253 8163x static void do_complete(
254 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
255 {
256 8163x auto* self = static_cast<post_handler*>(base);
257
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 8157 times.
8163x if (!owner)
258 {
259 // Shutdown path: destroy the coroutine frame synchronously.
260 //
261 // Bounded destruction invariant: the chain triggered by
262 // coro.destroy() is at most two levels deep:
263 // 1. task frame destroyed → ~io_awaitable_promise_base()
264 // destroys stored continuation (if != noop_coroutine)
265 // 2. continuation (trampoline) destroyed → final_suspend
266 // returns suspend_never, no further continuation
267 //
268 // If a future refactor adds deeper continuation chains,
269 // this would reintroduce re-entrant stack overflow risk.
270 #ifndef NDEBUG
271 static thread_local int destroy_depth = 0;
272 6x ++destroy_depth;
273
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
274 #endif
275 6x auto coro = self->h_;
276
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
277
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
278 #ifndef NDEBUG
279 6x --destroy_depth;
280 #endif
281 6x return;
282 }
283 8157x auto coro = self->h_;
284
1/2
✓ Branch 10 → 11 taken 8157 times.
✗ Branch 10 → 12 not taken.
8157x delete self;
285 std::atomic_thread_fence(std::memory_order_acquire);
286
1/1
✓ Branch 13 → 14 taken 8157 times.
8157x coro.resume();
287 }
288
289 8163x explicit post_handler(std::coroutine_handle<> coro)
290 8163x : scheduler_op(&do_complete)
291 8163x , h_(coro)
292 {
293 8163x }
294 };
295
296 8163x auto* ph = new post_handler(h);
297 8163x ::InterlockedIncrement(&outstanding_work_);
298
299
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 8163 times.
8163x if (!::PostQueuedCompletionStatus(
300 8163x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
301 {
302 std::lock_guard<win_mutex> lock(dispatch_mutex_);
303 completed_ops_.push(ph);
304 ::InterlockedExchange(&dispatch_required_, 1);
305 }
306 8163x }
307
308 inline void
309 3560x win_scheduler::post(scheduler_op* h) const
310 {
311 3560x ::InterlockedIncrement(&outstanding_work_);
312
313
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 3560 times.
3560x if (!::PostQueuedCompletionStatus(
314 3560x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
315 {
316 std::lock_guard<win_mutex> lock(dispatch_mutex_);
317 completed_ops_.push(h);
318 ::InterlockedExchange(&dispatch_required_, 1);
319 }
320 3560x }
321
322 inline bool
323 10082x win_scheduler::running_in_this_thread() const noexcept
324 {
325
2/2
✓ Branch 6 → 3 taken 3359 times.
✓ Branch 6 → 7 taken 6723 times.
10082x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
326
1/2
✓ Branch 3 → 4 taken 3359 times.
✗ Branch 3 → 5 not taken.
3359x if (c->key == this)
327 3359x return true;
328 6723x return false;
329 }
330
331 inline void
332 437727x win_scheduler::work_started() noexcept
333 {
334 437727x ::InterlockedIncrement(&outstanding_work_);
335 437727x }
336
337 inline void
338 449434x win_scheduler::work_finished() noexcept
339 {
340
2/2
✓ Branch 4 → 5 taken 3131 times.
✓ Branch 4 → 6 taken 446303 times.
898868x if (::InterlockedDecrement(&outstanding_work_) == 0)
341 3131x stop();
342 449434x }
343
344 inline void
345 426413x win_scheduler::on_pending(overlapped_op* op) const
346 {
347 // CAS: try to set ready_ from 0 to 1.
348 // If the old value was 1, GQCS already grabbed this op and stored
349 // results — we need to re-post so do_one() can dispatch it.
350
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 426413 times.
852826x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
351 {
352 if (!::PostQueuedCompletionStatus(
353 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
354 {
355 std::lock_guard<win_mutex> lock(dispatch_mutex_);
356 completed_ops_.push(op);
357 ::InterlockedExchange(&dispatch_required_, 1);
358 }
359 }
360 426413x }
361
362 inline void
363 31x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
364 {
365 // Sync completion: pack results into op and post for dispatch.
366 31x op->ready_ = 1;
367 31x op->dwError = error;
368 31x op->bytes_transferred = bytes;
369
370
2/4
✓ Branch 2 → 3 taken 31 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 31 times.
62x if (!::PostQueuedCompletionStatus(
371 31x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
372 {
373 std::lock_guard<win_mutex> lock(dispatch_mutex_);
374 completed_ops_.push(op);
375 ::InterlockedExchange(&dispatch_required_, 1);
376 }
377 31x }
378
379 inline void
380 6286x win_scheduler::stop()
381 {
382
2/2
✓ Branch 4 → 5 taken 3143 times.
✓ Branch 4 → 13 taken 3143 times.
12572x if (::InterlockedExchange(&stopped_, 1) == 0)
383 {
384
1/2
✓ Branch 7 → 8 taken 3143 times.
✗ Branch 7 → 13 not taken.
6286x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
385 {
386
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 3143 times.
3143x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
387 {
388 // PQCS failure is non-fatal: stopped_ is already set.
389 // The run() loop will notice via the GQCS timeout
390 // (gqcs_timeout_ms_, default 500ms) and exit.
391 ::InterlockedExchange(&dispatch_required_, 1);
392 }
393 }
394 }
395 6286x }
396
397 inline bool
398 2888x win_scheduler::stopped() const noexcept
399 {
400 // equivalent to atomic read
401 5776x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
402 }
403
404 inline void
405 2828x win_scheduler::restart()
406 {
407 2828x ::InterlockedExchange(&stopped_, 0);
408 2828x ::InterlockedExchange(&stop_event_posted_, 0);
409 2828x }
410
411 inline std::size_t
412 3149x win_scheduler::run()
413 {
414
2/2
✓ Branch 4 → 5 taken 21 times.
✓ Branch 4 → 7 taken 3128 times.
6298x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
415 {
416
1/1
✓ Branch 5 → 6 taken 21 times.
21x stop();
417 21x return 0;
418 }
419
420 3128x iocp::thread_context_guard ctx(this);
421
422 3128x std::size_t n = 0;
423 for (;;)
424 {
425
3/3
✓ Branch 9 → 10 taken 438132 times.
✓ Branch 10 → 11 taken 14 times.
✓ Branch 10 → 12 taken 438118 times.
438132x if (!do_one(INFINITE))
426 14x break;
427
1/2
✓ Branch 13 → 14 taken 438118 times.
✗ Branch 13 → 15 not taken.
438118x if (n != (std::numeric_limits<std::size_t>::max)())
428 438118x ++n;
429
2/2
✓ Branch 17 → 18 taken 3114 times.
✓ Branch 17 → 20 taken 435004 times.
876236x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
430 {
431
1/1
✓ Branch 18 → 19 taken 3114 times.
3114x stop();
432 3114x break;
433 }
434 }
435 3128x return n;
436 3128x }
437
438 inline std::size_t
439 2x win_scheduler::run_one()
440 {
441
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
442 {
443 stop();
444 return 0;
445 }
446
447 2x iocp::thread_context_guard ctx(this);
448
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
449 2x }
450
451 inline std::size_t
452 27x win_scheduler::wait_one(long usec)
453 {
454
2/2
✓ Branch 4 → 5 taken 11 times.
✓ Branch 4 → 7 taken 16 times.
54x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
455 {
456
1/1
✓ Branch 5 → 6 taken 11 times.
11x stop();
457 11x return 0;
458 }
459
460 16x iocp::thread_context_guard ctx(this);
461 16x unsigned long timeout_ms = INFINITE;
462
1/2
✓ Branch 8 → 9 taken 16 times.
✗ Branch 8 → 13 not taken.
16x if (usec >= 0)
463 {
464 16x auto ms = (static_cast<long long>(usec) + 999) / 1000;
465
1/2
✓ Branch 9 → 10 taken 16 times.
✗ Branch 9 → 11 not taken.
16x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
466 : static_cast<unsigned long>(ms);
467 }
468
1/1
✓ Branch 13 → 14 taken 16 times.
16x return do_one(timeout_ms);
469 16x }
470
471 inline std::size_t
472 14x win_scheduler::poll()
473 {
474
2/2
✓ Branch 4 → 5 taken 3 times.
✓ Branch 4 → 7 taken 11 times.
28x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
475 {
476
1/1
✓ Branch 5 → 6 taken 3 times.
3x stop();
477 3x return 0;
478 }
479
480 11x iocp::thread_context_guard ctx(this);
481
482 11x std::size_t n = 0;
483
3/3
✓ Branch 12 → 13 taken 32 times.
✓ Branch 13 → 9 taken 21 times.
✓ Branch 13 → 14 taken 11 times.
32x while (do_one(0))
484
1/2
✓ Branch 10 → 11 taken 21 times.
✗ Branch 10 → 12 not taken.
21x if (n != (std::numeric_limits<std::size_t>::max)())
485 21x ++n;
486 11x return n;
487 11x }
488
489 inline std::size_t
490 4x win_scheduler::poll_one()
491 {
492
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
493 {
494
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
495 2x return 0;
496 }
497
498 2x iocp::thread_context_guard ctx(this);
499
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
500 2x }
501
502 inline void
503 300x win_scheduler::post_deferred_completions(op_queue& ops)
504 {
505
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 300 times.
300x while (auto h = ops.pop())
506 {
507 if (::PostQueuedCompletionStatus(
508 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
509 continue;
510
511 // Out of resources, put the failed op and remaining ops back
512 ops.push(h);
513 std::lock_guard<win_mutex> lock(dispatch_mutex_);
514 completed_ops_.splice(ops);
515 ::InterlockedExchange(&dispatch_required_, 1);
516 return;
517 }
518 }
519
520 inline std::size_t
521 441314x win_scheduler::do_one(unsigned long timeout_ms)
522 {
523 for (;;)
524 {
525 // Check if we need to process timers or deferred ops
526
2/2
✓ Branch 4 → 5 taken 300 times.
✓ Branch 4 → 13 taken 441014 times.
882628x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
527 {
528 300x op_queue local_ops;
529 {
530 300x std::lock_guard<win_mutex> lock(dispatch_mutex_);
531 300x local_ops.splice(completed_ops_);
532 300x }
533
1/1
✓ Branch 8 → 9 taken 300 times.
300x post_deferred_completions(local_ops);
534
535
1/2
✓ Branch 9 → 10 taken 300 times.
✗ Branch 9 → 11 not taken.
300x if (timer_svc_)
536
1/1
✓ Branch 10 → 11 taken 300 times.
300x timer_svc_->process_expired();
537
538
1/1
✓ Branch 11 → 12 taken 300 times.
300x update_timeout();
539 }
540
541 441314x DWORD bytes = 0;
542 441314x ULONG_PTR key = 0;
543 441314x LPOVERLAPPED overlapped = nullptr;
544
1/1
✓ Branch 13 → 14 taken 441314 times.
441314x ::SetLastError(0);
545
546
1/1
✓ Branch 17 → 18 taken 441314 times.
441314x BOOL result = ::GetQueuedCompletionStatus(
547 iocp_, &bytes, &key, &overlapped,
548
2/2
✓ Branch 14 → 15 taken 441267 times.
✓ Branch 14 → 16 taken 47 times.
441314x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
549 : gqcs_timeout_ms_);
550
1/1
✓ Branch 18 → 19 taken 441314 times.
441314x DWORD dwError = ::GetLastError();
551
552 // Handle based on completion key
553
2/2
✓ Branch 19 → 20 taken 438155 times.
✓ Branch 19 → 39 taken 3159 times.
441314x if (overlapped)
554 {
555
2/2
✓ Branch 20 → 21 taken 2029 times.
✓ Branch 20 → 22 taken 436126 times.
438155x DWORD err = result ? 0 : dwError;
556
557
2/3
✓ Branch 23 → 24 taken 426444 times.
✓ Branch 23 → 35 taken 11711 times.
✗ Branch 23 → 38 not taken.
438155x switch (key)
558 {
559 426444x case key_io:
560 case key_result_stored:
561 {
562 426444x auto* ov_op = overlapped_to_op(overlapped);
563
564 // If key_result_stored, results are pre-stored in op fields
565
2/2
✓ Branch 25 → 26 taken 31 times.
✓ Branch 25 → 27 taken 426413 times.
426444x if (key == key_result_stored)
566 {
567 31x bytes = ov_op->bytes_transferred;
568 31x err = ov_op->dwError;
569 }
570
571 // Store GQCS results so on_pending() re-post has valid data
572 426444x ov_op->store_result(bytes, err);
573
574 // CAS: try to set ready_ from 0 to 1.
575 // If old value was 1, the initiator already returned
576 // (on_pending/on_completion set it) — safe to dispatch.
577 // If old value was 0, the initiator hasn't returned yet —
578 // skip dispatch; on_pending() will re-post.
579
1/2
✓ Branch 30 → 31 taken 426444 times.
✗ Branch 30 → 34 not taken.
852888x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
580 {
581
1/1
✓ Branch 31 → 32 taken 426444 times.
426444x ov_op->complete(this, bytes, err);
582 426444x work_finished();
583 438184x return 1;
584 }
585 3098x continue;
586 }
587
588 11711x case key_posted:
589 {
590 // Posted scheduler_op*: overlapped is actually a scheduler_op*
591 11711x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
592
1/1
✓ Branch 35 → 36 taken 11711 times.
11711x op->complete(this, bytes, err);
593 11711x work_finished();
594 11711x return 1;
595 }
596
597 default:
598 continue;
599 }
600 }
601
602 // Signal completions (no OVERLAPPED)
603
2/2
✓ Branch 39 → 40 taken 3116 times.
✓ Branch 39 → 53 taken 43 times.
3159x if (result)
604 {
605
2/3
✓ Branch 40 → 41 taken 300 times.
✓ Branch 40 → 42 taken 2816 times.
✗ Branch 40 → 52 not taken.
3116x switch (key)
606 {
607 300x case key_wake_dispatch:
608 // Timer wakeup - loop to check dispatch_required_
609 300x continue;
610
611 2816x case key_shutdown:
612 2816x ::InterlockedExchange(&stop_event_posted_, 0);
613
2/2
✓ Branch 45 → 46 taken 18 times.
✓ Branch 45 → 51 taken 2798 times.
2816x if (stopped())
614 {
615 // Re-post for other waiting threads
616
1/2
✓ Branch 48 → 49 taken 18 times.
✗ Branch 48 → 50 not taken.
36x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
617 {
618
1/1
✓ Branch 49 → 50 taken 18 times.
18x ::PostQueuedCompletionStatus(
619 iocp_, 0, key_shutdown, nullptr);
620 }
621 18x return 0;
622 }
623 2798x continue;
624
625 default:
626 continue;
627 }
628 }
629
630 // Timeout or error
631
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 43 times.
43x if (dwError != WAIT_TIMEOUT)
632 detail::throw_system_error(make_err(dwError));
633
2/2
✓ Branch 56 → 57 taken 11 times.
✓ Branch 56 → 58 taken 32 times.
43x if (timeout_ms != INFINITE)
634 11x return 0;
635 // PQCS-failure fallback: stop() sets stopped_ and
636 // dispatch_required_ but if the key_shutdown post failed,
637 // no completion is ever dequeued. Catch it here on the
638 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
639
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 32 times.
32x if (stopped())
640 return 0;
641 3130x }
642 }
643
644 inline void
645 1169x win_scheduler::on_timer_changed(void* ctx)
646 {
647 1169x static_cast<win_scheduler*>(ctx)->update_timeout();
648 1169x }
649
650 inline void
651 567x win_scheduler::set_timer_service(timer_service* svc)
652 {
653 567x timer_svc_ = svc;
654 // Pass 'this' as context - callback routes to correct instance
655 567x svc->set_on_earliest_changed(
656 567x timer_service::callback{this, &on_timer_changed});
657
1/2
✓ Branch 5 → 6 taken 567 times.
✗ Branch 5 → 8 not taken.
567x if (timers_)
658 567x timers_->start();
659 567x }
660
661 inline void
662 1469x win_scheduler::update_timeout()
663 {
664
3/6
✓ Branch 2 → 3 taken 1469 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 1469 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 1469 times.
✗ Branch 7 → 11 not taken.
1469x if (timer_svc_ && timers_)
665 1469x timers_->update_timeout(timer_svc_->nearest_expiry());
666 1469x }
667
668 } // namespace boost::corosio::detail
669
670 // Defer including the auxiliary wait reactor until the scheduler is
671 // fully defined, since the reactor's inline methods call back into
672 // win_scheduler. This also gives the dtor and wait_reactor() below a
673 // complete win_wait_reactor type for unique_ptr destruction and
674 // lazy construction.
675 //
676 // The macro lets win_wait_reactor.hpp diagnose direct inclusion
677 // (which would land it here with win_scheduler still incomplete).
678 #define BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
679 #include <boost/corosio/native/detail/iocp/win_wait_reactor.hpp>
680
681 namespace boost::corosio::detail {
682
683 inline void
684 567x win_scheduler::shutdown()
685 {
686
1/2
✓ Branch 3 → 4 taken 567 times.
✗ Branch 3 → 6 not taken.
567x if (timers_)
687 567x timers_->stop();
688
689 // Drain timer heap before the work-counting loop. The timer_service
690 // was registered after this scheduler (nested make_service from our
691 // constructor), so execution_context::shutdown() calls us first.
692 // Asio avoids this by owning timer queues directly inside the
693 // scheduler; we bridge the gap by shutting down the timer service
694 // early. The subsequent call from execution_context is a no-op.
695
1/2
✓ Branch 6 → 7 taken 567 times.
✗ Branch 6 → 8 not taken.
567x if (timer_svc_)
696 567x timer_svc_->shutdown();
697
698 // Same problem for the auxiliary wait reactor: ops parked in it
699 // hold work_started credit. Stop the reactor early so its loop
700 // drains them as cancelled and the work counter can reach zero.
701
2/2
✓ Branch 9 → 10 taken 6 times.
✓ Branch 9 → 12 taken 561 times.
567x if (wait_reactor_ready_.load(std::memory_order_acquire))
702 6x wait_reactor_->stop();
703
704
2/2
✓ Branch 36 → 13 taken 17 times.
✓ Branch 36 → 37 taken 567 times.
1168x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
705 {
706 17x op_queue ops;
707 {
708 17x std::lock_guard<win_mutex> lock(dispatch_mutex_);
709 17x ops.splice(completed_ops_);
710 17x }
711
712
1/2
✗ Branch 17 → 18 not taken.
✓ Branch 17 → 24 taken 17 times.
17x if (!ops.empty())
713 {
714 while (auto* h = ops.pop())
715 {
716 ::InterlockedDecrement(&outstanding_work_);
717 h->destroy();
718 }
719 }
720 else
721 {
722 DWORD bytes;
723 ULONG_PTR key;
724 LPOVERLAPPED overlapped;
725
1/1
✓ Branch 24 → 25 taken 17 times.
17x ::GetQueuedCompletionStatus(
726 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
727
2/2
✓ Branch 25 → 26 taken 16 times.
✓ Branch 25 → 32 taken 1 time.
17x if (overlapped)
728 {
729 16x ::InterlockedDecrement(&outstanding_work_);
730
2/2
✓ Branch 28 → 29 taken 12 times.
✓ Branch 28 → 30 taken 4 times.
16x if (key == key_posted)
731 {
732 12x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
733
1/1
✓ Branch 29 → 32 taken 12 times.
12x op->destroy();
734 }
735 else
736 {
737 4x auto* op = overlapped_to_op(overlapped);
738
1/1
✓ Branch 31 → 32 taken 4 times.
4x op->destroy();
739 }
740 }
741 }
742 }
743 567x }
744
745 1134x inline win_scheduler::~win_scheduler()
746 {
747
2/2
✓ Branch 3 → 4 taken 6 times.
✓ Branch 3 → 6 taken 561 times.
567x if (wait_reactor_)
748 6x wait_reactor_->stop();
749 567x wait_reactor_.reset();
750
751
1/2
✓ Branch 7 → 8 taken 567 times.
✗ Branch 7 → 9 not taken.
567x if (iocp_ != nullptr)
752 567x ::CloseHandle(iocp_);
753 1134x }
754
755 inline win_wait_reactor&
756 6x win_scheduler::wait_reactor()
757 {
758 // Lazy thread-safe init: multiple IOCP workers may race the first
759 // wait() call. wait_reactor_ready_ is set with release ordering
760 // after construction so cancel_wait_if_constructed can safely
761 // observe the reactor without forcing construction itself.
762
1/1
✓ Branch 2 → 3 taken 6 times.
6x std::call_once(wait_reactor_once_, [this] {
763
1/1
✓ Branch 2 → 3 taken 6 times.
6x wait_reactor_ = std::make_unique<win_wait_reactor>(*this);
764 6x wait_reactor_ready_.store(true, std::memory_order_release);
765 6x });
766 6x return *wait_reactor_;
767 }
768
769 inline void
770 20629x win_scheduler::cancel_wait_if_constructed(overlapped_op* op) noexcept
771 {
772
2/2
✓ Branch 3 → 4 taken 41 times.
✓ Branch 3 → 6 taken 20588 times.
20629x if (wait_reactor_ready_.load(std::memory_order_acquire))
773 41x wait_reactor_->cancel_wait(op);
774 20629x }
775
776 } // namespace boost::corosio::detail
777
778 #endif // BOOST_COROSIO_HAS_IOCP
779
780 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
781