include/boost/corosio/native/detail/iocp/win_scheduler.hpp

84.8% Lines (246/290) 97.1% List of functions (33/34) 72.0% Branches (118/164)
win_scheduler.hpp
f(x) Functions (34)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :77 2993x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :89 6x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :99 5x 100.0% 100.0% boost::corosio::detail::win_scheduler::is_single_threaded() const :106 0 0.0% 0.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :201 3171x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :207 3171x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :215 598x 92.9% 72.7% 57.1% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :247 8203x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :253 8203x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :289 8203x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :309 4418x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :323 10121x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :332 603172x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :338 615777x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :345 590958x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :363 45x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :380 6310x 83.3% 66.7% 75.0% 
boost::corosio::detail::win_scheduler::stopped() const :398 2887x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :405 2823x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :412 3161x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :439 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :452 28x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :472 14x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :490 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :503 1138x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :521 603642x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :645 2015x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :651 598x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :662 3153x 100.0% 50.0% 90.0% boost::corosio::detail::win_scheduler::shutdown() :684 598x 84.6% 70.0% 81.6% boost::corosio::detail::win_scheduler::~win_scheduler() :745 1196x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor() :756 9x 100.0% 100.0% boost::corosio::detail::win_scheduler::wait_reactor()::{lambda()#1}::operator()() const :762 9x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::cancel_wait_if_constructed(boost::corosio::detail::overlapped_op*) :770 20787x 100.0% 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41 #include <mutex>
42
43 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
44
45 namespace boost::corosio::detail {
46
47 // Forward declarations
48 struct overlapped_op;
49 class win_timers;
50 class win_wait_reactor;
51
52 class BOOST_COROSIO_DECL win_scheduler final
53 : public scheduler
54 , public capy::execution_context::service
55 {
56 public:
57 using key_type = scheduler;
58
59 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
60 ~win_scheduler();
61 win_scheduler(win_scheduler const&) = delete;
62 win_scheduler& operator=(win_scheduler const&) = delete;
63
64 void shutdown() override;
65 void post(std::coroutine_handle<> h) const override;
66 void post(scheduler_op* h) const override;
67 bool running_in_this_thread() const noexcept override;
68 void stop() override;
69 bool stopped() const noexcept override;
70 void restart() override;
71 std::size_t run() override;
72 std::size_t run_one() override;
73 std::size_t wait_one(long usec) override;
74 std::size_t poll() override;
75 std::size_t poll_one() override;
76
77 2993x void* native_handle() const noexcept
78 {
79 2993x return iocp_;
80 }
81
82 void work_started() noexcept override;
83 void work_finished() noexcept override;
84
85 /** Apply runtime IOCP configuration.
86
87 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
88 */
89 6x void configure_iocp(unsigned gqcs_timeout_ms) noexcept
90 {
91 6x gqcs_timeout_ms_ = gqcs_timeout_ms;
92 6x }
93
94 /** Enable or disable single-threaded (lockless) mode.
95
96 When enabled, the dispatch mutex becomes a no-op.
97 Cross-thread post() is undefined behavior.
98 */
99 5x void configure_single_threaded(bool v) noexcept override
100 {
101 5x single_threaded_ = v;
102 5x dispatch_mutex_.set_enabled(!v);
103 5x }
104
105 /// Return true if single-threaded (lockless) mode is active.
106 bool is_single_threaded() const noexcept override
107 {
108 return single_threaded_;
109 }
110
111 /** Signal that an overlapped I/O operation is now pending.
112 Coordinates with do_one() via the ready_ CAS protocol. */
113 void on_pending(overlapped_op* op) const;
114
115 /** Post an immediate completion with pre-stored results.
116 Used for sync errors and noop paths. */
117 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
118
119 // Timer service integration
120 void set_timer_service(timer_service* svc);
121 void update_timeout();
122
123 private:
124 static void on_timer_changed(void* ctx);
125 void post_deferred_completions(op_queue& ops);
126 std::size_t do_one(unsigned long timeout_ms);
127
128 timer_service* timer_svc_ = nullptr;
129 void* iocp_;
130 mutable long outstanding_work_;
131 mutable long stopped_;
132 long stop_event_posted_;
133 mutable long dispatch_required_;
134 unsigned long gqcs_timeout_ms_ = 500;
135 bool single_threaded_ = false;
136
137 mutable win_mutex dispatch_mutex_;
138 mutable op_queue completed_ops_;
139 std::unique_ptr<win_timers> timers_;
140 std::unique_ptr<win_wait_reactor> wait_reactor_;
141 std::once_flag wait_reactor_once_;
142 std::atomic<bool> wait_reactor_ready_{false};
143
144 public:
145 /** Auxiliary select-based reactor for IOCP wait operations.
146
147 Lazily created on first access; lives for the lifetime of the
148 scheduler and is stopped+joined in ~win_scheduler. Used by
149 socket and acceptor wait() implementations whose readiness
150 cannot be expressed natively in IOCP (datagram-read,
151 acceptor-read, error-wait).
152 */
153 win_wait_reactor& wait_reactor();
154
155 /** Cancel a parked wait op only if the reactor exists.
156
157 Safe to call from any thread. If no wait op has ever been
158 registered, the reactor was never constructed, so there is
159 nothing to cancel and we avoid spinning up a thread + wakeup
160 socketpair on the cancel path. Acquire/release pairs with the
161 store in wait_reactor() so reads see a fully-constructed
162 reactor when the flag is true.
163 */
164 void cancel_wait_if_constructed(overlapped_op* op) noexcept;
165 };
166
/*
    ARCHITECTURE NOTE: Function Pointer Dispatch

    All I/O handles are registered with the IOCP using key_io (0).
    Dispatch happens via the function pointer stored in each scheduler_op.

    When GQCS returns with an OVERLAPPED*, we map it back to the owning
    operation and call the function pointer directly - no virtual dispatch.

    The completion_key enum values (see win_completion_key.hpp) are
    handled by do_one() as follows:
    - key_io (0): Normal I/O completion, dispatch via func_
    - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
    - key_shutdown (2): Stop signal
    - key_result_stored (3): Results pre-stored in the operation
    - key_posted: Operation queued by post(); the OVERLAPPED* slot
      carries the scheduler_op* itself
*/
182
183 namespace iocp {
184
185 // Max timeout for GQCS to allow periodic re-checking of conditions.
186 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
187 inline constexpr unsigned long max_gqcs_timeout = 500;
188
189 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
190 {
191 win_scheduler const* key;
192 scheduler_context* next;
193 };
194
195 inline thread_local_ptr<scheduler_context> context_stack;
196
197 struct thread_context_guard
198 {
199 scheduler_context frame_;
200
201 3171x explicit thread_context_guard(win_scheduler const* ctx) noexcept
202 3171x : frame_{ctx, context_stack.get()}
203 {
204 3171x context_stack.set(&frame_);
205 3171x }
206
207 3171x ~thread_context_guard() noexcept
208 {
209 3171x context_stack.set(frame_.next);
210 3171x }
211 };
212
213 } // namespace iocp
214
215 598x inline win_scheduler::win_scheduler(
216 598x capy::execution_context& ctx, int concurrency_hint)
217 598x : iocp_(nullptr)
218 598x , outstanding_work_(0)
219 598x , stopped_(0)
220 598x , stop_event_posted_(0)
221
1/1
✓ Branch 4 → 5 taken 598 times.
598x , dispatch_required_(0)
222 {
223 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
224
2/3
✓ Branch 10 → 11 taken 598 times.
✗ Branch 10 → 12 not taken.
✓ Branch 13 → 14 taken 598 times.
598x iocp_ = ::CreateIoCompletionPort(
225 INVALID_HANDLE_VALUE, nullptr, 0,
226 static_cast<DWORD>(
227 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
228
229
1/2
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 18 taken 598 times.
598x if (iocp_ == nullptr)
230 detail::throw_system_error(make_err(::GetLastError()));
231
232 // Create timer wakeup mechanism (tries NT native, falls back to thread)
233
1/1
✓ Branch 18 → 19 taken 598 times.
598x timers_ = make_win_timers(iocp_, &dispatch_required_);
234
235 // Connect timer service to scheduler
236
2/2
✓ Branch 21 → 22 taken 598 times.
✓ Branch 22 → 23 taken 598 times.
598x set_timer_service(&get_timer_service(ctx, *this));
237
238 // Initialize resolver service
239
1/1
✓ Branch 23 → 24 taken 598 times.
598x ctx.make_service<win_resolver_service>(*this);
240 598x }
241
242 // ~win_scheduler() and shutdown() are defined at the bottom of this
243 // header so the unique_ptr<win_wait_reactor>'s deleter and
244 // wait_reactor_->stop() see the type complete.
245
246 inline void
247 8203x win_scheduler::post(std::coroutine_handle<> h) const
248 {
249 struct post_handler final : scheduler_op
250 {
251 std::coroutine_handle<> h_;
252
253 8203x static void do_complete(
254 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
255 {
256 8203x auto* self = static_cast<post_handler*>(base);
257
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 8197 times.
8203x if (!owner)
258 {
259 // Shutdown path: destroy the coroutine frame synchronously.
260 //
261 // Bounded destruction invariant: the chain triggered by
262 // coro.destroy() is at most two levels deep:
263 // 1. task frame destroyed → ~io_awaitable_promise_base()
264 // destroys stored continuation (if != noop_coroutine)
265 // 2. continuation (trampoline) destroyed → final_suspend
266 // returns suspend_never, no further continuation
267 //
268 // If a future refactor adds deeper continuation chains,
269 // this would reintroduce re-entrant stack overflow risk.
270 #ifndef NDEBUG
271 static thread_local int destroy_depth = 0;
272 6x ++destroy_depth;
273
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
274 #endif
275 6x auto coro = self->h_;
276
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
277
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
278 #ifndef NDEBUG
279 6x --destroy_depth;
280 #endif
281 6x return;
282 }
283 8197x auto coro = self->h_;
284
1/2
✓ Branch 10 → 11 taken 8197 times.
✗ Branch 10 → 12 not taken.
8197x delete self;
285 std::atomic_thread_fence(std::memory_order_acquire);
286
1/1
✓ Branch 13 → 14 taken 8197 times.
8197x coro.resume();
287 }
288
289 8203x explicit post_handler(std::coroutine_handle<> coro)
290 8203x : scheduler_op(&do_complete)
291 8203x , h_(coro)
292 {
293 8203x }
294 };
295
296 8203x auto* ph = new post_handler(h);
297 8203x ::InterlockedIncrement(&outstanding_work_);
298
299
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 8203 times.
8203x if (!::PostQueuedCompletionStatus(
300 8203x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
301 {
302 std::lock_guard<win_mutex> lock(dispatch_mutex_);
303 completed_ops_.push(ph);
304 ::InterlockedExchange(&dispatch_required_, 1);
305 }
306 8203x }
307
308 inline void
309 4418x win_scheduler::post(scheduler_op* h) const
310 {
311 4418x ::InterlockedIncrement(&outstanding_work_);
312
313
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 4418 times.
4418x if (!::PostQueuedCompletionStatus(
314 4418x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
315 {
316 std::lock_guard<win_mutex> lock(dispatch_mutex_);
317 completed_ops_.push(h);
318 ::InterlockedExchange(&dispatch_required_, 1);
319 }
320 4418x }
321
322 inline bool
323 10121x win_scheduler::running_in_this_thread() const noexcept
324 {
325
2/2
✓ Branch 6 → 3 taken 3358 times.
✓ Branch 6 → 7 taken 6763 times.
10121x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
326
1/2
✓ Branch 3 → 4 taken 3358 times.
✗ Branch 3 → 5 not taken.
3358x if (c->key == this)
327 3358x return true;
328 6763x return false;
329 }
330
331 inline void
332 603172x win_scheduler::work_started() noexcept
333 {
334 603172x ::InterlockedIncrement(&outstanding_work_);
335 603172x }
336
337 inline void
338 615777x win_scheduler::work_finished() noexcept
339 {
340
2/2
✓ Branch 4 → 5 taken 3143 times.
✓ Branch 4 → 6 taken 612634 times.
1231554x if (::InterlockedDecrement(&outstanding_work_) == 0)
341 3143x stop();
342 615777x }
343
344 inline void
345 590958x win_scheduler::on_pending(overlapped_op* op) const
346 {
347 // CAS: try to set ready_ from 0 to 1.
348 // If the old value was 1, GQCS already grabbed this op and stored
349 // results — we need to re-post so do_one() can dispatch it.
350
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 590958 times.
1181916x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
351 {
352 if (!::PostQueuedCompletionStatus(
353 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
354 {
355 std::lock_guard<win_mutex> lock(dispatch_mutex_);
356 completed_ops_.push(op);
357 ::InterlockedExchange(&dispatch_required_, 1);
358 }
359 }
360 590958x }
361
362 inline void
363 45x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
364 {
365 // Sync completion: pack results into op and post for dispatch.
366 45x op->ready_ = 1;
367 45x op->dwError = error;
368 45x op->bytes_transferred = bytes;
369
370
2/4
✓ Branch 2 → 3 taken 45 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 45 times.
90x if (!::PostQueuedCompletionStatus(
371 45x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
372 {
373 std::lock_guard<win_mutex> lock(dispatch_mutex_);
374 completed_ops_.push(op);
375 ::InterlockedExchange(&dispatch_required_, 1);
376 }
377 45x }
378
379 inline void
380 6310x win_scheduler::stop()
381 {
382
2/2
✓ Branch 4 → 5 taken 3155 times.
✓ Branch 4 → 13 taken 3155 times.
12620x if (::InterlockedExchange(&stopped_, 1) == 0)
383 {
384
1/2
✓ Branch 7 → 8 taken 3155 times.
✗ Branch 7 → 13 not taken.
6310x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
385 {
386
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 3155 times.
3155x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
387 {
388 // PQCS failure is non-fatal: stopped_ is already set.
389 // The run() loop will notice via the GQCS timeout
390 // (gqcs_timeout_ms_, default 500ms) and exit.
391 ::InterlockedExchange(&dispatch_required_, 1);
392 }
393 }
394 }
395 6310x }
396
397 inline bool
398 2887x win_scheduler::stopped() const noexcept
399 {
400 // equivalent to atomic read
401 5774x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
402 }
403
404 inline void
405 2823x win_scheduler::restart()
406 {
407 2823x ::InterlockedExchange(&stopped_, 0);
408 2823x ::InterlockedExchange(&stop_event_posted_, 0);
409 2823x }
410
411 inline std::size_t
412 3161x win_scheduler::run()
413 {
414
2/2
✓ Branch 4 → 5 taken 22 times.
✓ Branch 4 → 7 taken 3139 times.
6322x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
415 {
416
1/1
✓ Branch 5 → 6 taken 22 times.
22x stop();
417 22x return 0;
418 }
419
420 3139x iocp::thread_context_guard ctx(this);
421
422 3139x std::size_t n = 0;
423 for (;;)
424 {
425
3/3
✓ Branch 9 → 10 taken 603589 times.
✓ Branch 10 → 11 taken 14 times.
✓ Branch 10 → 12 taken 603575 times.
603589x if (!do_one(INFINITE))
426 14x break;
427
1/2
✓ Branch 13 → 14 taken 603575 times.
✗ Branch 13 → 15 not taken.
603575x if (n != (std::numeric_limits<std::size_t>::max)())
428 603575x ++n;
429
2/2
✓ Branch 17 → 18 taken 3125 times.
✓ Branch 17 → 20 taken 600450 times.
1207150x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
430 {
431
1/1
✓ Branch 18 → 19 taken 3125 times.
3125x stop();
432 3125x break;
433 }
434 }
435 3139x return n;
436 3139x }
437
438 inline std::size_t
439 2x win_scheduler::run_one()
440 {
441
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
442 {
443 stop();
444 return 0;
445 }
446
447 2x iocp::thread_context_guard ctx(this);
448
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
449 2x }
450
451 inline std::size_t
452 28x win_scheduler::wait_one(long usec)
453 {
454
2/2
✓ Branch 4 → 5 taken 11 times.
✓ Branch 4 → 7 taken 17 times.
56x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
455 {
456
1/1
✓ Branch 5 → 6 taken 11 times.
11x stop();
457 11x return 0;
458 }
459
460 17x iocp::thread_context_guard ctx(this);
461 17x unsigned long timeout_ms = INFINITE;
462
1/2
✓ Branch 8 → 9 taken 17 times.
✗ Branch 8 → 13 not taken.
17x if (usec >= 0)
463 {
464 17x auto ms = (static_cast<long long>(usec) + 999) / 1000;
465
1/2
✓ Branch 9 → 10 taken 17 times.
✗ Branch 9 → 11 not taken.
17x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
466 : static_cast<unsigned long>(ms);
467 }
468
1/1
✓ Branch 13 → 14 taken 17 times.
17x return do_one(timeout_ms);
469 17x }
470
471 inline std::size_t
472 14x win_scheduler::poll()
473 {
474
2/2
✓ Branch 4 → 5 taken 3 times.
✓ Branch 4 → 7 taken 11 times.
28x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
475 {
476
1/1
✓ Branch 5 → 6 taken 3 times.
3x stop();
477 3x return 0;
478 }
479
480 11x iocp::thread_context_guard ctx(this);
481
482 11x std::size_t n = 0;
483
3/3
✓ Branch 12 → 13 taken 32 times.
✓ Branch 13 → 9 taken 21 times.
✓ Branch 13 → 14 taken 11 times.
32x while (do_one(0))
484
1/2
✓ Branch 10 → 11 taken 21 times.
✗ Branch 10 → 12 not taken.
21x if (n != (std::numeric_limits<std::size_t>::max)())
485 21x ++n;
486 11x return n;
487 11x }
488
489 inline std::size_t
490 4x win_scheduler::poll_one()
491 {
492
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
493 {
494
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
495 2x return 0;
496 }
497
498 2x iocp::thread_context_guard ctx(this);
499
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
500 2x }
501
502 inline void
503 1138x win_scheduler::post_deferred_completions(op_queue& ops)
504 {
505
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 1138 times.
1138x while (auto h = ops.pop())
506 {
507 if (::PostQueuedCompletionStatus(
508 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
509 continue;
510
511 // Out of resources, put the failed op and remaining ops back
512 ops.push(h);
513 std::lock_guard<win_mutex> lock(dispatch_mutex_);
514 completed_ops_.splice(ops);
515 ::InterlockedExchange(&dispatch_required_, 1);
516 return;
517 }
518 }
519
520 inline std::size_t
521 607608x win_scheduler::do_one(unsigned long timeout_ms)
522 {
523 for (;;)
524 {
525 // Check if we need to process timers or deferred ops
526
2/2
✓ Branch 4 → 5 taken 1138 times.
✓ Branch 4 → 13 taken 606470 times.
1215216x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
527 {
528 1138x op_queue local_ops;
529 {
530 1138x std::lock_guard<win_mutex> lock(dispatch_mutex_);
531 1138x local_ops.splice(completed_ops_);
532 1138x }
533
1/1
✓ Branch 8 → 9 taken 1138 times.
1138x post_deferred_completions(local_ops);
534
535
1/2
✓ Branch 9 → 10 taken 1138 times.
✗ Branch 9 → 11 not taken.
1138x if (timer_svc_)
536
1/1
✓ Branch 10 → 11 taken 1138 times.
1138x timer_svc_->process_expired();
537
538
1/1
✓ Branch 11 → 12 taken 1138 times.
1138x update_timeout();
539 }
540
541 607608x DWORD bytes = 0;
542 607608x ULONG_PTR key = 0;
543 607608x LPOVERLAPPED overlapped = nullptr;
544
1/1
✓ Branch 13 → 14 taken 607608 times.
607608x ::SetLastError(0);
545
546
1/1
✓ Branch 17 → 18 taken 607608 times.
607608x BOOL result = ::GetQueuedCompletionStatus(
547 iocp_, &bytes, &key, &overlapped,
548
2/2
✓ Branch 14 → 15 taken 607560 times.
✓ Branch 14 → 16 taken 48 times.
607608x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
549 : gqcs_timeout_ms_);
550
1/1
✓ Branch 18 → 19 taken 607608 times.
607608x DWORD dwError = ::GetLastError();
551
552 // Handle based on completion key
553
2/2
✓ Branch 19 → 20 taken 603612 times.
✓ Branch 19 → 39 taken 3996 times.
607608x if (overlapped)
554 {
555
2/2
✓ Branch 20 → 21 taken 2043 times.
✓ Branch 20 → 22 taken 601569 times.
603612x DWORD err = result ? 0 : dwError;
556
557
2/3
✓ Branch 23 → 24 taken 591003 times.
✓ Branch 23 → 35 taken 12609 times.
✗ Branch 23 → 38 not taken.
603612x switch (key)
558 {
559 591003x case key_io:
560 case key_result_stored:
561 {
562 591003x auto* ov_op = overlapped_to_op(overlapped);
563
564 // If key_result_stored, results are pre-stored in op fields
565
2/2
✓ Branch 25 → 26 taken 45 times.
✓ Branch 25 → 27 taken 590958 times.
591003x if (key == key_result_stored)
566 {
567 45x bytes = ov_op->bytes_transferred;
568 45x err = ov_op->dwError;
569 }
570
571 // Store GQCS results so on_pending() re-post has valid data
572 591003x ov_op->store_result(bytes, err);
573
574 // CAS: try to set ready_ from 0 to 1.
575 // If old value was 1, the initiator already returned
576 // (on_pending/on_completion set it) — safe to dispatch.
577 // If old value was 0, the initiator hasn't returned yet —
578 // skip dispatch; on_pending() will re-post.
579
1/2
✓ Branch 30 → 31 taken 591003 times.
✗ Branch 30 → 34 not taken.
1182006x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
580 {
581
1/1
✓ Branch 31 → 32 taken 591003 times.
591003x ov_op->complete(this, bytes, err);
582 591003x work_finished();
583 603642x return 1;
584 }
585 3931x continue;
586 }
587
588 12609x case key_posted:
589 {
590 // Posted scheduler_op*: overlapped is actually a scheduler_op*
591 12609x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
592
1/1
✓ Branch 35 → 36 taken 12609 times.
12609x op->complete(this, bytes, err);
593 12609x work_finished();
594 12609x return 1;
595 }
596
597 default:
598 continue;
599 }
600 }
601
602 // Signal completions (no OVERLAPPED)
603
2/2
✓ Branch 39 → 40 taken 3949 times.
✓ Branch 39 → 53 taken 47 times.
3996x if (result)
604 {
605
2/3
✓ Branch 40 → 41 taken 1138 times.
✓ Branch 40 → 42 taken 2811 times.
✗ Branch 40 → 52 not taken.
3949x switch (key)
606 {
607 1138x case key_wake_dispatch:
608 // Timer wakeup - loop to check dispatch_required_
609 1138x continue;
610
611 2811x case key_shutdown:
612 2811x ::InterlockedExchange(&stop_event_posted_, 0);
613
2/2
✓ Branch 45 → 46 taken 18 times.
✓ Branch 45 → 51 taken 2793 times.
2811x if (stopped())
614 {
615 // Re-post for other waiting threads
616
1/2
✓ Branch 48 → 49 taken 18 times.
✗ Branch 48 → 50 not taken.
36x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
617 {
618
1/1
✓ Branch 49 → 50 taken 18 times.
18x ::PostQueuedCompletionStatus(
619 iocp_, 0, key_shutdown, nullptr);
620 }
621 18x return 0;
622 }
623 2793x continue;
624
625 default:
626 continue;
627 }
628 }
629
630 // Timeout or error
631
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 47 times.
47x if (dwError != WAIT_TIMEOUT)
632 detail::throw_system_error(make_err(dwError));
633
2/2
✓ Branch 56 → 57 taken 12 times.
✓ Branch 56 → 58 taken 35 times.
47x if (timeout_ms != INFINITE)
634 12x return 0;
635 // PQCS-failure fallback: stop() sets stopped_ and
636 // dispatch_required_ but if the key_shutdown post failed,
637 // no completion is ever dequeued. Catch it here on the
638 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
639
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 35 times.
35x if (stopped())
640 return 0;
641 3966x }
642 }
643
644 inline void
645 2015x win_scheduler::on_timer_changed(void* ctx)
646 {
647 2015x static_cast<win_scheduler*>(ctx)->update_timeout();
648 2015x }
649
650 inline void
651 598x win_scheduler::set_timer_service(timer_service* svc)
652 {
653 598x timer_svc_ = svc;
654 // Pass 'this' as context - callback routes to correct instance
655 598x svc->set_on_earliest_changed(
656 598x timer_service::callback{this, &on_timer_changed});
657
1/2
✓ Branch 5 → 6 taken 598 times.
✗ Branch 5 → 8 not taken.
598x if (timers_)
658 598x timers_->start();
659 598x }
660
661 inline void
662 3153x win_scheduler::update_timeout()
663 {
664
3/6
✓ Branch 2 → 3 taken 3153 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 3153 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 3153 times.
✗ Branch 7 → 11 not taken.
3153x if (timer_svc_ && timers_)
665 3153x timers_->update_timeout(timer_svc_->nearest_expiry());
666 3153x }
667
668 } // namespace boost::corosio::detail
669
670 // Defer including the auxiliary wait reactor until the scheduler is
671 // fully defined, since the reactor's inline methods call back into
672 // win_scheduler. This also gives the dtor and wait_reactor() below a
673 // complete win_wait_reactor type for unique_ptr destruction and
674 // lazy construction.
675 //
676 // The macro lets win_wait_reactor.hpp diagnose direct inclusion
677 // (which would land it here with win_scheduler still incomplete).
678 #define BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
679 #include <boost/corosio/native/detail/iocp/win_wait_reactor.hpp>
680
681 namespace boost::corosio::detail {
682
683 inline void
684 598x win_scheduler::shutdown()
685 {
686
1/2
✓ Branch 3 → 4 taken 598 times.
✗ Branch 3 → 6 not taken.
598x if (timers_)
687 598x timers_->stop();
688
689 // Drain timer heap before the work-counting loop. The timer_service
690 // was registered after this scheduler (nested make_service from our
691 // constructor), so execution_context::shutdown() calls us first.
692 // Asio avoids this by owning timer queues directly inside the
693 // scheduler; we bridge the gap by shutting down the timer service
694 // early. The subsequent call from execution_context is a no-op.
695
1/2
✓ Branch 6 → 7 taken 598 times.
✗ Branch 6 → 8 not taken.
598x if (timer_svc_)
696 598x timer_svc_->shutdown();
697
698 // Same problem for the auxiliary wait reactor: ops parked in it
699 // hold work_started credit. Stop the reactor early so its loop
700 // drains them as cancelled and the work counter can reach zero.
701
2/2
✓ Branch 9 → 10 taken 9 times.
✓ Branch 9 → 12 taken 589 times.
598x if (wait_reactor_ready_.load(std::memory_order_acquire))
702 9x wait_reactor_->stop();
703
704
2/2
✓ Branch 36 → 13 taken 17 times.
✓ Branch 36 → 37 taken 598 times.
1230x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
705 {
706 17x op_queue ops;
707 {
708 17x std::lock_guard<win_mutex> lock(dispatch_mutex_);
709 17x ops.splice(completed_ops_);
710 17x }
711
712
1/2
✗ Branch 17 → 18 not taken.
✓ Branch 17 → 24 taken 17 times.
17x if (!ops.empty())
713 {
714 while (auto* h = ops.pop())
715 {
716 ::InterlockedDecrement(&outstanding_work_);
717 h->destroy();
718 }
719 }
720 else
721 {
722 DWORD bytes;
723 ULONG_PTR key;
724 LPOVERLAPPED overlapped;
725
1/1
✓ Branch 24 → 25 taken 17 times.
17x ::GetQueuedCompletionStatus(
726 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
727
2/2
✓ Branch 25 → 26 taken 16 times.
✓ Branch 25 → 32 taken 1 time.
17x if (overlapped)
728 {
729 16x ::InterlockedDecrement(&outstanding_work_);
730
2/2
✓ Branch 28 → 29 taken 12 times.
✓ Branch 28 → 30 taken 4 times.
16x if (key == key_posted)
731 {
732 12x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
733
1/1
✓ Branch 29 → 32 taken 12 times.
12x op->destroy();
734 }
735 else
736 {
737 4x auto* op = overlapped_to_op(overlapped);
738
1/1
✓ Branch 31 → 32 taken 4 times.
4x op->destroy();
739 }
740 }
741 }
742 }
743 598x }
744
745 1196x inline win_scheduler::~win_scheduler()
746 {
747
2/2
✓ Branch 3 → 4 taken 9 times.
✓ Branch 3 → 6 taken 589 times.
598x if (wait_reactor_)
748 9x wait_reactor_->stop();
749 598x wait_reactor_.reset();
750
751
1/2
✓ Branch 7 → 8 taken 598 times.
✗ Branch 7 → 9 not taken.
598x if (iocp_ != nullptr)
752 598x ::CloseHandle(iocp_);
753 1196x }
754
755 inline win_wait_reactor&
756 9x win_scheduler::wait_reactor()
757 {
758 // Lazy thread-safe init: multiple IOCP workers may race the first
759 // wait() call. wait_reactor_ready_ is set with release ordering
760 // after construction so cancel_wait_if_constructed can safely
761 // observe the reactor without forcing construction itself.
762
1/1
✓ Branch 2 → 3 taken 9 times.
9x std::call_once(wait_reactor_once_, [this] {
763
1/1
✓ Branch 2 → 3 taken 9 times.
9x wait_reactor_ = std::make_unique<win_wait_reactor>(*this);
764 9x wait_reactor_ready_.store(true, std::memory_order_release);
765 9x });
766 9x return *wait_reactor_;
767 }
768
769 inline void
770 20787x win_scheduler::cancel_wait_if_constructed(overlapped_op* op) noexcept
771 {
772
2/2
✓ Branch 3 → 4 taken 54 times.
✓ Branch 3 → 6 taken 20733 times.
20787x if (wait_reactor_ready_.load(std::memory_order_acquire))
773 54x wait_reactor_->cancel_wait(op);
774 20787x }
775
776 } // namespace boost::corosio::detail
777
778 #endif // BOOST_COROSIO_HAS_IOCP
779
780 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
781