include/boost/corosio/native/detail/iocp/win_scheduler.hpp

85.4% Lines (246/288) 100.0% List of functions (33/33) 72.0% Branches (118/164)
win_scheduler.hpp
f(x) Functions (33)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :77 2973x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :89 7x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :99 5x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :195 3149x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :201 3149x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :209 594x 92.9% 72.7% 57.1% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :241 8144x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :247 8144x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :283 8144x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :303 3565x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :317 10013x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :326 440287x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :332 451980x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :339 429014x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :357 48x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :374 6255x 83.3% 66.7% 75.0% boost::corosio::detail::win_scheduler::stopped() const :392 2859x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :399 2789x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :406 3136x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :433 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :446 26x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :466 14x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :484 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :497 318x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :515 440792x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :639 1180x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :645 594x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :656 1498x 100.0% 50.0% 90.0% boost::corosio::detail::win_scheduler::shutdown() :678 594x 84.6% 70.0% 81.6% boost::corosio::detail::win_scheduler::~win_scheduler() :739 1188x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor() :750 14x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor()::{lambda()#1}::operator()() const :756 14x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::cancel_wait_if_constructed(boost::corosio::detail::overlapped_op*) :764 20617x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41 #include <mutex>
42
43 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
44
45 namespace boost::corosio::detail {
46
47 // Forward declarations
48 struct overlapped_op;
49 class win_timers;
50 class win_wait_reactor;
51
52 class BOOST_COROSIO_DECL win_scheduler final
53 : public scheduler
54 , public capy::execution_context::service
55 {
56 public:
57 using key_type = scheduler;
58
59 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
60 ~win_scheduler();
61 win_scheduler(win_scheduler const&) = delete;
62 win_scheduler& operator=(win_scheduler const&) = delete;
63
64 void shutdown() override;
65 void post(std::coroutine_handle<> h) const override;
66 void post(scheduler_op* h) const override;
67 bool running_in_this_thread() const noexcept override;
68 void stop() override;
69 bool stopped() const noexcept override;
70 void restart() override;
71 std::size_t run() override;
72 std::size_t run_one() override;
73 std::size_t wait_one(long usec) override;
74 std::size_t poll() override;
75 std::size_t poll_one() override;
76
77 2973x void* native_handle() const noexcept
78 {
79 2973x return iocp_;
80 }
81
82 void work_started() noexcept override;
83 void work_finished() noexcept override;
84
85 /** Apply runtime IOCP configuration.
86
87 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
88 */
89 7x void configure_iocp(unsigned gqcs_timeout_ms) noexcept
90 {
91 7x gqcs_timeout_ms_ = gqcs_timeout_ms;
92 7x }
93
94 /** Enable or disable single-threaded (lockless) mode.
95
96 When enabled, the dispatch mutex becomes a no-op.
97 Cross-thread post() is undefined behavior.
98 */
99 5x void configure_single_threaded(bool v) noexcept
100 {
101 5x single_threaded_ = v;
102 5x dispatch_mutex_.set_enabled(!v);
103 5x }
104
105 /** Signal that an overlapped I/O operation is now pending.
106 Coordinates with do_one() via the ready_ CAS protocol. */
107 void on_pending(overlapped_op* op) const;
108
109 /** Post an immediate completion with pre-stored results.
110 Used for sync errors and noop paths. */
111 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
112
113 // Timer service integration
114 void set_timer_service(timer_service* svc);
115 void update_timeout();
116
117 private:
118 static void on_timer_changed(void* ctx);
119 void post_deferred_completions(op_queue& ops);
120 std::size_t do_one(unsigned long timeout_ms);
121
122 timer_service* timer_svc_ = nullptr;
123 void* iocp_;
124 mutable long outstanding_work_;
125 mutable long stopped_;
126 long stop_event_posted_;
127 mutable long dispatch_required_;
128 unsigned long gqcs_timeout_ms_ = 500;
129 bool single_threaded_ = false;
130
131 mutable win_mutex dispatch_mutex_;
132 mutable op_queue completed_ops_;
133 std::unique_ptr<win_timers> timers_;
134 std::unique_ptr<win_wait_reactor> wait_reactor_;
135 std::once_flag wait_reactor_once_;
136 std::atomic<bool> wait_reactor_ready_{false};
137
138 public:
139 /** Auxiliary select-based reactor for IOCP wait operations.
140
141 Lazily created on first access; lives for the lifetime of the
142 scheduler and is stopped+joined in ~win_scheduler. Used by
143 socket and acceptor wait() implementations whose readiness
144 cannot be expressed natively in IOCP (datagram-read,
145 acceptor-read, error-wait).
146 */
147 win_wait_reactor& wait_reactor();
148
149 /** Cancel a parked wait op only if the reactor exists.
150
151 Safe to call from any thread. If no wait op has ever been
152 registered, the reactor was never constructed, so there is
153 nothing to cancel and we avoid spinning up a thread + wakeup
154 socketpair on the cancel path. Acquire/release pairs with the
155 store in wait_reactor() so reads see a fully-constructed
156 reactor when the flag is true.
157 */
158 void cancel_wait_if_constructed(overlapped_op* op) noexcept;
159 };
160
161 /*
162 ARCHITECTURE NOTE: Function Pointer Dispatch
163
164 All I/O handles are registered with the IOCP using key_io (0).
165 Dispatch happens via the function pointer stored in each scheduler_op.
166
167 When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
168 and call the function pointer directly - no virtual dispatch.
169
170 The completion_key enum values are used only for internal signals:
171 - key_io (0): Normal I/O completion, dispatch via func_
172 - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
173 - key_shutdown (2): Stop signal
174 - key_result_stored (3): Results pre-stored in OVERLAPPED
175 */
176
177 namespace iocp {
178
179 // Max timeout for GQCS to allow periodic re-checking of conditions.
180 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
181 inline constexpr unsigned long max_gqcs_timeout = 500;
182
183 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
184 {
185 win_scheduler const* key;
186 scheduler_context* next;
187 };
188
189 inline thread_local_ptr<scheduler_context> context_stack;
190
191 struct thread_context_guard
192 {
193 scheduler_context frame_;
194
195 3149x explicit thread_context_guard(win_scheduler const* ctx) noexcept
196 3149x : frame_{ctx, context_stack.get()}
197 {
198 3149x context_stack.set(&frame_);
199 3149x }
200
201 3149x ~thread_context_guard() noexcept
202 {
203 3149x context_stack.set(frame_.next);
204 3149x }
205 };
206
207 } // namespace iocp
208
209 594x inline win_scheduler::win_scheduler(
210 594x capy::execution_context& ctx, int concurrency_hint)
211 594x : iocp_(nullptr)
212 594x , outstanding_work_(0)
213 594x , stopped_(0)
214 594x , stop_event_posted_(0)
215
1/1
✓ Branch 4 → 5 taken 594 times.
594x , dispatch_required_(0)
216 {
217 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
218
2/3
✓ Branch 10 → 11 taken 594 times.
✗ Branch 10 → 12 not taken.
✓ Branch 13 → 14 taken 594 times.
594x iocp_ = ::CreateIoCompletionPort(
219 INVALID_HANDLE_VALUE, nullptr, 0,
220 static_cast<DWORD>(
221 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
222
223
1/2
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 18 taken 594 times.
594x if (iocp_ == nullptr)
224 detail::throw_system_error(make_err(::GetLastError()));
225
226 // Create timer wakeup mechanism (tries NT native, falls back to thread)
227
1/1
✓ Branch 18 → 19 taken 594 times.
594x timers_ = make_win_timers(iocp_, &dispatch_required_);
228
229 // Connect timer service to scheduler
230
2/2
✓ Branch 21 → 22 taken 594 times.
✓ Branch 22 → 23 taken 594 times.
594x set_timer_service(&get_timer_service(ctx, *this));
231
232 // Initialize resolver service
233
1/1
✓ Branch 23 → 24 taken 594 times.
594x ctx.make_service<win_resolver_service>(*this);
234 594x }
235
236 // ~win_scheduler() and shutdown() are defined at the bottom of this
237 // header so the unique_ptr<win_wait_reactor>'s deleter and
238 // wait_reactor_->stop() see the type complete.
239
240 inline void
241 8144x win_scheduler::post(std::coroutine_handle<> h) const
242 {
243 struct post_handler final : scheduler_op
244 {
245 std::coroutine_handle<> h_;
246
247 8144x static void do_complete(
248 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
249 {
250 8144x auto* self = static_cast<post_handler*>(base);
251
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 8138 times.
8144x if (!owner)
252 {
253 // Shutdown path: destroy the coroutine frame synchronously.
254 //
255 // Bounded destruction invariant: the chain triggered by
256 // coro.destroy() is at most two levels deep:
257 // 1. task frame destroyed → ~io_awaitable_promise_base()
258 // destroys stored continuation (if != noop_coroutine)
259 // 2. continuation (trampoline) destroyed → final_suspend
260 // returns suspend_never, no further continuation
261 //
262 // If a future refactor adds deeper continuation chains,
263 // this would reintroduce re-entrant stack overflow risk.
264 #ifndef NDEBUG
265 static thread_local int destroy_depth = 0;
266 6x ++destroy_depth;
267
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
268 #endif
269 6x auto coro = self->h_;
270
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
271
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
272 #ifndef NDEBUG
273 6x --destroy_depth;
274 #endif
275 6x return;
276 }
277 8138x auto coro = self->h_;
278
1/2
✓ Branch 10 → 11 taken 8138 times.
✗ Branch 10 → 12 not taken.
8138x delete self;
279 std::atomic_thread_fence(std::memory_order_acquire);
280
1/1
✓ Branch 13 → 14 taken 8138 times.
8138x coro.resume();
281 }
282
283 8144x explicit post_handler(std::coroutine_handle<> coro)
284 8144x : scheduler_op(&do_complete)
285 8144x , h_(coro)
286 {
287 8144x }
288 };
289
290 8144x auto* ph = new post_handler(h);
291 8144x ::InterlockedIncrement(&outstanding_work_);
292
293
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 8144 times.
8144x if (!::PostQueuedCompletionStatus(
294 8144x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
295 {
296 std::lock_guard<win_mutex> lock(dispatch_mutex_);
297 completed_ops_.push(ph);
298 ::InterlockedExchange(&dispatch_required_, 1);
299 }
300 8144x }
301
302 inline void
303 3565x win_scheduler::post(scheduler_op* h) const
304 {
305 3565x ::InterlockedIncrement(&outstanding_work_);
306
307
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 3565 times.
3565x if (!::PostQueuedCompletionStatus(
308 3565x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
309 {
310 std::lock_guard<win_mutex> lock(dispatch_mutex_);
311 completed_ops_.push(h);
312 ::InterlockedExchange(&dispatch_required_, 1);
313 }
314 3565x }
315
316 inline bool
317 10013x win_scheduler::running_in_this_thread() const noexcept
318 {
319
2/2
✓ Branch 6 → 3 taken 3309 times.
✓ Branch 6 → 7 taken 6704 times.
10013x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
320
1/2
✓ Branch 3 → 4 taken 3309 times.
✗ Branch 3 → 5 not taken.
3309x if (c->key == this)
321 3309x return true;
322 6704x return false;
323 }
324
325 inline void
326 440287x win_scheduler::work_started() noexcept
327 {
328 440287x ::InterlockedIncrement(&outstanding_work_);
329 440287x }
330
331 inline void
332 451980x win_scheduler::work_finished() noexcept
333 {
334
2/2
✓ Branch 4 → 5 taken 3118 times.
✓ Branch 4 → 6 taken 448862 times.
903960x if (::InterlockedDecrement(&outstanding_work_) == 0)
335 3118x stop();
336 451980x }
337
338 inline void
339 429014x win_scheduler::on_pending(overlapped_op* op) const
340 {
341 // CAS: try to set ready_ from 0 to 1.
342 // If the old value was 1, GQCS already grabbed this op and stored
343 // results — we need to re-post so do_one() can dispatch it.
344
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 429014 times.
858028x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
345 {
346 if (!::PostQueuedCompletionStatus(
347 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
348 {
349 std::lock_guard<win_mutex> lock(dispatch_mutex_);
350 completed_ops_.push(op);
351 ::InterlockedExchange(&dispatch_required_, 1);
352 }
353 }
354 429014x }
355
356 inline void
357 48x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
358 {
359 // Sync completion: pack results into op and post for dispatch.
360 48x op->ready_ = 1;
361 48x op->dwError = error;
362 48x op->bytes_transferred = bytes;
363
364
2/4
✓ Branch 2 → 3 taken 48 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 48 times.
96x if (!::PostQueuedCompletionStatus(
365 48x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
366 {
367 std::lock_guard<win_mutex> lock(dispatch_mutex_);
368 completed_ops_.push(op);
369 ::InterlockedExchange(&dispatch_required_, 1);
370 }
371 48x }
372
373 inline void
374 6255x win_scheduler::stop()
375 {
376
2/2
✓ Branch 4 → 5 taken 3130 times.
✓ Branch 4 → 13 taken 3125 times.
12510x if (::InterlockedExchange(&stopped_, 1) == 0)
377 {
378
1/2
✓ Branch 7 → 8 taken 3130 times.
✗ Branch 7 → 13 not taken.
6260x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
379 {
380
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 3130 times.
3130x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
381 {
382 // PQCS failure is non-fatal: stopped_ is already set.
383 // The run() loop will notice via the GQCS timeout
384 // (gqcs_timeout_ms_, default 500ms) and exit.
385 ::InterlockedExchange(&dispatch_required_, 1);
386 }
387 }
388 }
389 6255x }
390
391 inline bool
392 2859x win_scheduler::stopped() const noexcept
393 {
394 // equivalent to atomic read
395 5718x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
396 }
397
398 inline void
399 2789x win_scheduler::restart()
400 {
401 2789x ::InterlockedExchange(&stopped_, 0);
402 2789x ::InterlockedExchange(&stop_event_posted_, 0);
403 2789x }
404
405 inline std::size_t
406 3136x win_scheduler::run()
407 {
408
2/2
✓ Branch 4 → 5 taken 17 times.
✓ Branch 4 → 7 taken 3119 times.
6272x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
409 {
410
1/1
✓ Branch 5 → 6 taken 17 times.
17x stop();
411 17x return 0;
412 }
413
414 3119x iocp::thread_context_guard ctx(this);
415
416 3119x std::size_t n = 0;
417 for (;;)
418 {
419
3/3
✓ Branch 9 → 10 taken 440741 times.
✓ Branch 10 → 11 taken 19 times.
✓ Branch 10 → 12 taken 440722 times.
440741x if (!do_one(INFINITE))
420 19x break;
421
1/2
✓ Branch 13 → 14 taken 440722 times.
✗ Branch 13 → 15 not taken.
440722x if (n != (std::numeric_limits<std::size_t>::max)())
422 440722x ++n;
423
2/2
✓ Branch 17 → 18 taken 3100 times.
✓ Branch 17 → 20 taken 437622 times.
881444x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
424 {
425
1/1
✓ Branch 18 → 19 taken 3100 times.
3100x stop();
426 3100x break;
427 }
428 }
429 3119x return n;
430 3119x }
431
432 inline std::size_t
433 2x win_scheduler::run_one()
434 {
435
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
436 {
437 stop();
438 return 0;
439 }
440
441 2x iocp::thread_context_guard ctx(this);
442
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
443 2x }
444
445 inline std::size_t
446 26x win_scheduler::wait_one(long usec)
447 {
448
2/2
✓ Branch 4 → 5 taken 11 times.
✓ Branch 4 → 7 taken 15 times.
52x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
449 {
450
1/1
✓ Branch 5 → 6 taken 11 times.
11x stop();
451 11x return 0;
452 }
453
454 15x iocp::thread_context_guard ctx(this);
455 15x unsigned long timeout_ms = INFINITE;
456
1/2
✓ Branch 8 → 9 taken 15 times.
✗ Branch 8 → 13 not taken.
15x if (usec >= 0)
457 {
458 15x auto ms = (static_cast<long long>(usec) + 999) / 1000;
459
1/2
✓ Branch 9 → 10 taken 15 times.
✗ Branch 9 → 11 not taken.
15x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
460 : static_cast<unsigned long>(ms);
461 }
462
1/1
✓ Branch 13 → 14 taken 15 times.
15x return do_one(timeout_ms);
463 15x }
464
465 inline std::size_t
466 14x win_scheduler::poll()
467 {
468
2/2
✓ Branch 4 → 5 taken 3 times.
✓ Branch 4 → 7 taken 11 times.
28x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
469 {
470
1/1
✓ Branch 5 → 6 taken 3 times.
3x stop();
471 3x return 0;
472 }
473
474 11x iocp::thread_context_guard ctx(this);
475
476 11x std::size_t n = 0;
477
3/3
✓ Branch 12 → 13 taken 32 times.
✓ Branch 13 → 9 taken 21 times.
✓ Branch 13 → 14 taken 11 times.
32x while (do_one(0))
478
1/2
✓ Branch 10 → 11 taken 21 times.
✗ Branch 10 → 12 not taken.
21x if (n != (std::numeric_limits<std::size_t>::max)())
479 21x ++n;
480 11x return n;
481 11x }
482
483 inline std::size_t
484 4x win_scheduler::poll_one()
485 {
486
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
487 {
488
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
489 2x return 0;
490 }
491
492 2x iocp::thread_context_guard ctx(this);
493
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
494 2x }
495
496 inline void
497 318x win_scheduler::post_deferred_completions(op_queue& ops)
498 {
499
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 318 times.
318x while (auto h = ops.pop())
500 {
501 if (::PostQueuedCompletionStatus(
502 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
503 continue;
504
505 // Out of resources, put the failed op and remaining ops back
506 ops.push(h);
507 std::lock_guard<win_mutex> lock(dispatch_mutex_);
508 completed_ops_.splice(ops);
509 ::InterlockedExchange(&dispatch_required_, 1);
510 return;
511 }
512 }
513
514 inline std::size_t
515 443907x win_scheduler::do_one(unsigned long timeout_ms)
516 {
517 for (;;)
518 {
519 // Check if we need to process timers or deferred ops
520
2/2
✓ Branch 4 → 5 taken 318 times.
✓ Branch 4 → 13 taken 443589 times.
887814x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
521 {
522 318x op_queue local_ops;
523 {
524 318x std::lock_guard<win_mutex> lock(dispatch_mutex_);
525 318x local_ops.splice(completed_ops_);
526 318x }
527
1/1
✓ Branch 8 → 9 taken 318 times.
318x post_deferred_completions(local_ops);
528
529
1/2
✓ Branch 9 → 10 taken 318 times.
✗ Branch 9 → 11 not taken.
318x if (timer_svc_)
530
1/1
✓ Branch 10 → 11 taken 318 times.
318x timer_svc_->process_expired();
531
532
1/1
✓ Branch 11 → 12 taken 318 times.
318x update_timeout();
533 }
534
535 443907x DWORD bytes = 0;
536 443907x ULONG_PTR key = 0;
537 443907x LPOVERLAPPED overlapped = nullptr;
538
1/1
✓ Branch 13 → 14 taken 443907 times.
443907x ::SetLastError(0);
539
540
1/1
✓ Branch 17 → 18 taken 443907 times.
443907x BOOL result = ::GetQueuedCompletionStatus(
541 iocp_, &bytes, &key, &overlapped,
542
2/2
✓ Branch 14 → 15 taken 443861 times.
✓ Branch 14 → 16 taken 46 times.
443907x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
543 : gqcs_timeout_ms_);
544
1/1
✓ Branch 18 → 19 taken 443907 times.
443907x DWORD dwError = ::GetLastError();
545
546 // Handle based on completion key
547
2/2
✓ Branch 19 → 20 taken 440759 times.
✓ Branch 19 → 39 taken 3148 times.
443907x if (overlapped)
548 {
549
2/2
✓ Branch 20 → 21 taken 2016 times.
✓ Branch 20 → 22 taken 438743 times.
440759x DWORD err = result ? 0 : dwError;
550
551
2/3
✓ Branch 23 → 24 taken 429062 times.
✓ Branch 23 → 35 taken 11697 times.
✗ Branch 23 → 38 not taken.
440759x switch (key)
552 {
553 429062x case key_io:
554 case key_result_stored:
555 {
556 429062x auto* ov_op = overlapped_to_op(overlapped);
557
558 // If key_result_stored, results are pre-stored in op fields
559
2/2
✓ Branch 25 → 26 taken 48 times.
✓ Branch 25 → 27 taken 429014 times.
429062x if (key == key_result_stored)
560 {
561 48x bytes = ov_op->bytes_transferred;
562 48x err = ov_op->dwError;
563 }
564
565 // Store GQCS results so on_pending() re-post has valid data
566 429062x ov_op->store_result(bytes, err);
567
568 // CAS: try to set ready_ from 0 to 1.
569 // If old value was 1, the initiator already returned
570 // (on_pending/on_completion set it) — safe to dispatch.
571 // If old value was 0, the initiator hasn't returned yet —
572 // skip dispatch; on_pending() will re-post.
573
1/2
✓ Branch 30 → 31 taken 429062 times.
✗ Branch 30 → 34 not taken.
858124x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
574 {
575
1/1
✓ Branch 31 → 32 taken 429062 times.
429062x ov_op->complete(this, bytes, err);
576 429062x work_finished();
577 440792x return 1;
578 }
579 3077x continue;
580 }
581
582 11697x case key_posted:
583 {
584 // Posted scheduler_op*: overlapped is actually a scheduler_op*
585 11697x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
586
1/1
✓ Branch 35 → 36 taken 11697 times.
11697x op->complete(this, bytes, err);
587 11697x work_finished();
588 11697x return 1;
589 }
590
591 default:
592 continue;
593 }
594 }
595
596 // Signal completions (no OVERLAPPED)
597
2/2
✓ Branch 39 → 40 taken 3100 times.
✓ Branch 39 → 53 taken 48 times.
3148x if (result)
598 {
599
2/3
✓ Branch 40 → 41 taken 318 times.
✓ Branch 40 → 42 taken 2782 times.
✗ Branch 40 → 52 not taken.
3100x switch (key)
600 {
601 318x case key_wake_dispatch:
602 // Timer wakeup - loop to check dispatch_required_
603 318x continue;
604
605 2782x case key_shutdown:
606 2782x ::InterlockedExchange(&stop_event_posted_, 0);
607
2/2
✓ Branch 45 → 46 taken 23 times.
✓ Branch 45 → 51 taken 2759 times.
2782x if (stopped())
608 {
609 // Re-post for other waiting threads
610
1/2
✓ Branch 48 → 49 taken 23 times.
✗ Branch 48 → 50 not taken.
46x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
611 {
612
1/1
✓ Branch 49 → 50 taken 23 times.
23x ::PostQueuedCompletionStatus(
613 iocp_, 0, key_shutdown, nullptr);
614 }
615 23x return 0;
616 }
617 2759x continue;
618
619 default:
620 continue;
621 }
622 }
623
624 // Timeout or error
625
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 48 times.
48x if (dwError != WAIT_TIMEOUT)
626 detail::throw_system_error(make_err(dwError));
627
2/2
✓ Branch 56 → 57 taken 10 times.
✓ Branch 56 → 58 taken 38 times.
48x if (timeout_ms != INFINITE)
628 10x return 0;
629 // PQCS-failure fallback: stop() sets stopped_ and
630 // dispatch_required_ but if the key_shutdown post failed,
631 // no completion is ever dequeued. Catch it here on the
632 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
633
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 38 times.
38x if (stopped())
634 return 0;
635 3115x }
636 }
637
638 inline void
639 1180x win_scheduler::on_timer_changed(void* ctx)
640 {
641 1180x static_cast<win_scheduler*>(ctx)->update_timeout();
642 1180x }
643
644 inline void
645 594x win_scheduler::set_timer_service(timer_service* svc)
646 {
647 594x timer_svc_ = svc;
648 // Pass 'this' as context - callback routes to correct instance
649 594x svc->set_on_earliest_changed(
650 594x timer_service::callback{this, &on_timer_changed});
651
1/2
✓ Branch 5 → 6 taken 594 times.
✗ Branch 5 → 8 not taken.
594x if (timers_)
652 594x timers_->start();
653 594x }
654
655 inline void
656 1498x win_scheduler::update_timeout()
657 {
658
3/6
✓ Branch 2 → 3 taken 1498 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 1498 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 1498 times.
✗ Branch 7 → 11 not taken.
1498x if (timer_svc_ && timers_)
659 1498x timers_->update_timeout(timer_svc_->nearest_expiry());
660 1498x }
661
662 } // namespace boost::corosio::detail
663
664 // Defer including the auxiliary wait reactor until the scheduler is
665 // fully defined, since the reactor's inline methods call back into
666 // win_scheduler. This also gives the dtor and wait_reactor() below a
667 // complete win_wait_reactor type for unique_ptr destruction and
668 // lazy construction.
669 //
670 // The macro lets win_wait_reactor.hpp diagnose direct inclusion
671 // (which would land it here with win_scheduler still incomplete).
672 #define BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
673 #include <boost/corosio/native/detail/iocp/win_wait_reactor.hpp>
674
675 namespace boost::corosio::detail {
676
677 inline void
678 594x win_scheduler::shutdown()
679 {
680
1/2
✓ Branch 3 → 4 taken 594 times.
✗ Branch 3 → 6 not taken.
594x if (timers_)
681 594x timers_->stop();
682
683 // Drain timer heap before the work-counting loop. The timer_service
684 // was registered after this scheduler (nested make_service from our
685 // constructor), so execution_context::shutdown() calls us first.
686 // Asio avoids this by owning timer queues directly inside the
687 // scheduler; we bridge the gap by shutting down the timer service
688 // early. The subsequent call from execution_context is a no-op.
689
1/2
✓ Branch 6 → 7 taken 594 times.
✗ Branch 6 → 8 not taken.
594x if (timer_svc_)
690 594x timer_svc_->shutdown();
691
692 // Same problem for the auxiliary wait reactor: ops parked in it
693 // hold work_started credit. Stop the reactor early so its loop
694 // drains them as cancelled and the work counter can reach zero.
695
2/2
✓ Branch 9 → 10 taken 14 times.
✓ Branch 9 → 12 taken 580 times.
594x if (wait_reactor_ready_.load(std::memory_order_acquire))
696 14x wait_reactor_->stop();
697
698
2/2
✓ Branch 36 → 13 taken 17 times.
✓ Branch 36 → 37 taken 594 times.
1222x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
699 {
700 17x op_queue ops;
701 {
702 17x std::lock_guard<win_mutex> lock(dispatch_mutex_);
703 17x ops.splice(completed_ops_);
704 17x }
705
706
1/2
✗ Branch 17 → 18 not taken.
✓ Branch 17 → 24 taken 17 times.
17x if (!ops.empty())
707 {
708 while (auto* h = ops.pop())
709 {
710 ::InterlockedDecrement(&outstanding_work_);
711 h->destroy();
712 }
713 }
714 else
715 {
716 DWORD bytes;
717 ULONG_PTR key;
718 LPOVERLAPPED overlapped;
719
1/1
✓ Branch 24 → 25 taken 17 times.
17x ::GetQueuedCompletionStatus(
720 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
721
2/2
✓ Branch 25 → 26 taken 16 times.
✓ Branch 25 → 32 taken 1 time.
17x if (overlapped)
722 {
723 16x ::InterlockedDecrement(&outstanding_work_);
724
2/2
✓ Branch 28 → 29 taken 12 times.
✓ Branch 28 → 30 taken 4 times.
16x if (key == key_posted)
725 {
726 12x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
727
1/1
✓ Branch 29 → 32 taken 12 times.
12x op->destroy();
728 }
729 else
730 {
731 4x auto* op = overlapped_to_op(overlapped);
732
1/1
✓ Branch 31 → 32 taken 4 times.
4x op->destroy();
733 }
734 }
735 }
736 }
737 594x }
738
739 1188x inline win_scheduler::~win_scheduler()
740 {
741
2/2
✓ Branch 3 → 4 taken 14 times.
✓ Branch 3 → 6 taken 580 times.
594x if (wait_reactor_)
742 14x wait_reactor_->stop();
743 594x wait_reactor_.reset();
744
745
1/2
✓ Branch 7 → 8 taken 594 times.
✗ Branch 7 → 9 not taken.
594x if (iocp_ != nullptr)
746 594x ::CloseHandle(iocp_);
747 1188x }
748
749 inline win_wait_reactor&
750 14x win_scheduler::wait_reactor()
751 {
752 // Lazy thread-safe init: multiple IOCP workers may race the first
753 // wait() call. wait_reactor_ready_ is set with release ordering
754 // after construction so cancel_wait_if_constructed can safely
755 // observe the reactor without forcing construction itself.
756
1/1
✓ Branch 2 → 3 taken 14 times.
14x std::call_once(wait_reactor_once_, [this] {
757
1/1
✓ Branch 2 → 3 taken 14 times.
14x wait_reactor_ = std::make_unique<win_wait_reactor>(*this);
758 14x wait_reactor_ready_.store(true, std::memory_order_release);
759 14x });
760 14x return *wait_reactor_;
761 }
762
763 inline void
764 20617x win_scheduler::cancel_wait_if_constructed(overlapped_op* op) noexcept
765 {
766
2/2
✓ Branch 3 → 4 taken 79 times.
✓ Branch 3 → 6 taken 20538 times.
20617x if (wait_reactor_ready_.load(std::memory_order_acquire))
767 79x wait_reactor_->cancel_wait(op);
768 20617x }
769
770 } // namespace boost::corosio::detail
771
772 #endif // BOOST_COROSIO_HAS_IOCP
773
774 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
775