include/boost/corosio/native/detail/iocp/win_scheduler.hpp

84.2% Lines (224/266) 100.0% List of functions (28/28) 70.5% Branches (110/156)
f(x) Functions (28)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :75 741x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :145 2787x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :151 2787x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :159 369x 92.9% 72.7% 56.7% boost::corosio::detail::win_scheduler::~win_scheduler() :186 738x 100.0% 100.0% boost::corosio::detail::win_scheduler::shutdown() :193 369x 83.3% 66.7% 79.4% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :249 9450x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :255 9450x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :291 9450x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :311 1984x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :325 9164x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :334 556550x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :340 567970x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :347 545404x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :365 14x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :382 5523x 83.3% 66.7% 75.0% boost::corosio::detail::win_scheduler::stopped() const :400 2561x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :407 2535x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :414 2772x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :441 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :454 36x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :474 5x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :492 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :505 1161x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :523 556861x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :647 1927x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :653 369x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :664 3088x 100.0% 50.0% 90.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/native_scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41
42 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
43
44 namespace boost::corosio::detail {
45
46 // Forward declarations
47 struct overlapped_op;
48 class win_timers;
49
50 class BOOST_COROSIO_DECL win_scheduler final
51 : public native_scheduler
52 , public capy::execution_context::service
53 {
54 public:
55 using key_type = scheduler;
56
57 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
58 ~win_scheduler();
59 win_scheduler(win_scheduler const&) = delete;
60 win_scheduler& operator=(win_scheduler const&) = delete;
61
62 void shutdown() override;
63 void post(std::coroutine_handle<> h) const override;
64 void post(scheduler_op* h) const override;
65 bool running_in_this_thread() const noexcept override;
66 void stop() override;
67 bool stopped() const noexcept override;
68 void restart() override;
69 std::size_t run() override;
70 std::size_t run_one() override;
71 std::size_t wait_one(long usec) override;
72 std::size_t poll() override;
73 std::size_t poll_one() override;
74
75 741x void* native_handle() const noexcept
76 {
77 741x return iocp_;
78 }
79
80 void work_started() noexcept override;
81 void work_finished() noexcept override;
82
83 /** Signal that an overlapped I/O operation is now pending.
84 Coordinates with do_one() via the ready_ CAS protocol. */
85 void on_pending(overlapped_op* op) const;
86
87 /** Post an immediate completion with pre-stored results.
88 Used for sync errors and noop paths. */
89 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
90
91 // Timer service integration
92 void set_timer_service(timer_service* svc);
93 void update_timeout();
94
95 private:
96 static void on_timer_changed(void* ctx);
97 void post_deferred_completions(op_queue& ops);
98 std::size_t do_one(unsigned long timeout_ms);
99
100 void* iocp_;
101 mutable long outstanding_work_;
102 mutable long stopped_;
103 long stop_event_posted_;
104 mutable long dispatch_required_;
105
106 mutable win_mutex dispatch_mutex_;
107 mutable op_queue completed_ops_;
108 std::unique_ptr<win_timers> timers_;
109 };
110
111 /*
112 ARCHITECTURE NOTE: Function Pointer Dispatch
113
114 All I/O handles are registered with the IOCP using key_io (0).
115 Dispatch happens via the function pointer stored in each scheduler_op.
116
117 When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
118 and call the function pointer directly - no virtual dispatch.
119
120 The completion_key enum values are used only for internal signals:
121 - key_io (0): Normal I/O completion, dispatch via func_
122 - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
123 - key_shutdown (2): Stop signal
124 - key_result_stored (3): Results pre-stored in OVERLAPPED
125 */
126
127 namespace iocp {
128
129 // Max timeout for GQCS to allow periodic re-checking of conditions.
130 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
131 inline constexpr unsigned long max_gqcs_timeout = 500;
132
133 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
134 {
135 win_scheduler const* key;
136 scheduler_context* next;
137 };
138
139 inline thread_local_ptr<scheduler_context> context_stack;
140
141 struct thread_context_guard
142 {
143 scheduler_context frame_;
144
145 2787x explicit thread_context_guard(win_scheduler const* ctx) noexcept
146 2787x : frame_{ctx, context_stack.get()}
147 {
148 2787x context_stack.set(&frame_);
149 2787x }
150
151 2787x ~thread_context_guard() noexcept
152 {
153 2787x context_stack.set(frame_.next);
154 2787x }
155 };
156
157 } // namespace iocp
158
159 369x inline win_scheduler::win_scheduler(
160 369x capy::execution_context& ctx, int concurrency_hint)
161 369x : iocp_(nullptr)
162 369x , outstanding_work_(0)
163 369x , stopped_(0)
164 369x , stop_event_posted_(0)
165
1/1
✓ Branch 4 → 5 taken 369 times.
369x , dispatch_required_(0)
166 {
167 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
168
2/3
✓ Branch 7 → 8 taken 369 times.
✗ Branch 7 → 9 not taken.
✓ Branch 10 → 11 taken 369 times.
369x iocp_ = ::CreateIoCompletionPort(
169 INVALID_HANDLE_VALUE, nullptr, 0,
170 static_cast<DWORD>(
171 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
172
173
1/2
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 15 taken 369 times.
369x if (iocp_ == nullptr)
174 detail::throw_system_error(make_err(::GetLastError()));
175
176 // Create timer wakeup mechanism (tries NT native, falls back to thread)
177
1/1
✓ Branch 15 → 16 taken 369 times.
369x timers_ = make_win_timers(iocp_, &dispatch_required_);
178
179 // Connect timer service to scheduler
180
2/2
✓ Branch 18 → 19 taken 369 times.
✓ Branch 19 → 20 taken 369 times.
369x set_timer_service(&get_timer_service(ctx, *this));
181
182 // Initialize resolver service
183
1/1
✓ Branch 20 → 21 taken 369 times.
369x ctx.make_service<win_resolver_service>(*this);
184 369x }
185
186 738x inline win_scheduler::~win_scheduler()
187 {
188
1/2
✓ Branch 2 → 3 taken 369 times.
✗ Branch 2 → 4 not taken.
369x if (iocp_ != nullptr)
189 369x ::CloseHandle(iocp_);
190 738x }
191
192 inline void
193 369x win_scheduler::shutdown()
194 {
195
1/2
✓ Branch 3 → 4 taken 369 times.
✗ Branch 3 → 6 not taken.
369x if (timers_)
196 369x timers_->stop();
197
198 // Drain timer heap before the work-counting loop. The timer_service
199 // was registered after this scheduler (nested make_service from our
200 // constructor), so execution_context::shutdown() calls us first.
201 // Asio avoids this by owning timer queues directly inside the
202 // scheduler; we bridge the gap by shutting down the timer service
203 // early. The subsequent call from execution_context is a no-op.
204
1/2
✓ Branch 6 → 7 taken 369 times.
✗ Branch 6 → 8 not taken.
369x if (timer_svc_)
205 369x timer_svc_->shutdown();
206
207
2/2
✓ Branch 32 → 9 taken 15 times.
✓ Branch 32 → 33 taken 369 times.
768x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
208 {
209 15x op_queue ops;
210 {
211 15x std::lock_guard<win_mutex> lock(dispatch_mutex_);
212 15x ops.splice(completed_ops_);
213 15x }
214
215
1/2
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 15 times.
15x if (!ops.empty())
216 {
217 while (auto* h = ops.pop())
218 {
219 ::InterlockedDecrement(&outstanding_work_);
220 h->destroy();
221 }
222 }
223 else
224 {
225 DWORD bytes;
226 ULONG_PTR key;
227 LPOVERLAPPED overlapped;
228
1/1
✓ Branch 20 → 21 taken 15 times.
15x ::GetQueuedCompletionStatus(
229 iocp_, &bytes, &key, &overlapped, iocp::max_gqcs_timeout);
230
2/2
✓ Branch 21 → 22 taken 14 times.
✓ Branch 21 → 28 taken 1 time.
15x if (overlapped)
231 {
232 14x ::InterlockedDecrement(&outstanding_work_);
233
2/2
✓ Branch 24 → 25 taken 10 times.
✓ Branch 24 → 26 taken 4 times.
14x if (key == key_posted)
234 {
235 10x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
236
1/1
✓ Branch 25 → 28 taken 10 times.
10x op->destroy();
237 }
238 else
239 {
240 4x auto* op = overlapped_to_op(overlapped);
241
1/1
✓ Branch 27 → 28 taken 4 times.
4x op->destroy();
242 }
243 }
244 }
245 }
246 369x }
247
248 inline void
249 9450x win_scheduler::post(std::coroutine_handle<> h) const
250 {
251 struct post_handler final : scheduler_op
252 {
253 std::coroutine_handle<> h_;
254
255 9450x static void do_complete(
256 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
257 {
258 9450x auto* self = static_cast<post_handler*>(base);
259
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 9444 times.
9450x if (!owner)
260 {
261 // Shutdown path: destroy the coroutine frame synchronously.
262 //
263 // Bounded destruction invariant: the chain triggered by
264 // coro.destroy() is at most two levels deep:
265 // 1. task frame destroyed → ~io_awaitable_promise_base()
266 // destroys stored continuation (if != noop_coroutine)
267 // 2. continuation (trampoline) destroyed → final_suspend
268 // returns suspend_never, no further continuation
269 //
270 // If a future refactor adds deeper continuation chains,
271 // this would reintroduce re-entrant stack overflow risk.
272 #ifndef NDEBUG
273 static thread_local int destroy_depth = 0;
274 6x ++destroy_depth;
275
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
276 #endif
277 6x auto coro = self->h_;
278
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
279
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
280 #ifndef NDEBUG
281 6x --destroy_depth;
282 #endif
283 6x return;
284 }
285 9444x auto coro = self->h_;
286
1/2
✓ Branch 10 → 11 taken 9444 times.
✗ Branch 10 → 12 not taken.
9444x delete self;
287 std::atomic_thread_fence(std::memory_order_acquire);
288
1/1
✓ Branch 13 → 14 taken 9444 times.
9444x coro.resume();
289 }
290
291 9450x explicit post_handler(std::coroutine_handle<> coro)
292 9450x : scheduler_op(&do_complete)
293 9450x , h_(coro)
294 {
295 9450x }
296 };
297
298 9450x auto* ph = new post_handler(h);
299 9450x ::InterlockedIncrement(&outstanding_work_);
300
301
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 9450 times.
9450x if (!::PostQueuedCompletionStatus(
302 9450x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
303 {
304 std::lock_guard<win_mutex> lock(dispatch_mutex_);
305 completed_ops_.push(ph);
306 ::InterlockedExchange(&dispatch_required_, 1);
307 }
308 9450x }
309
310 inline void
311 1984x win_scheduler::post(scheduler_op* h) const
312 {
313 1984x ::InterlockedIncrement(&outstanding_work_);
314
315
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 1984 times.
1984x if (!::PostQueuedCompletionStatus(
316 1984x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
317 {
318 std::lock_guard<win_mutex> lock(dispatch_mutex_);
319 completed_ops_.push(h);
320 ::InterlockedExchange(&dispatch_required_, 1);
321 }
322 1984x }
323
324 inline bool
325 9164x win_scheduler::running_in_this_thread() const noexcept
326 {
327
2/2
✓ Branch 6 → 3 taken 3333 times.
✓ Branch 6 → 7 taken 5831 times.
9164x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
328
1/2
✓ Branch 3 → 4 taken 3333 times.
✗ Branch 3 → 5 not taken.
3333x if (c->key == this)
329 3333x return true;
330 5831x return false;
331 }
332
333 inline void
334 556550x win_scheduler::work_started() noexcept
335 {
336 556550x ::InterlockedIncrement(&outstanding_work_);
337 556550x }
338
339 inline void
340 567970x win_scheduler::work_finished() noexcept
341 {
342
2/2
✓ Branch 4 → 5 taken 2751 times.
✓ Branch 4 → 6 taken 565219 times.
1135940x if (::InterlockedDecrement(&outstanding_work_) == 0)
343 2751x stop();
344 567970x }
345
346 inline void
347 545404x win_scheduler::on_pending(overlapped_op* op) const
348 {
349 // CAS: try to set ready_ from 0 to 1.
350 // If the old value was 1, GQCS already grabbed this op and stored
351 // results — we need to re-post so do_one() can dispatch it.
352
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 545404 times.
1090808x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
353 {
354 if (!::PostQueuedCompletionStatus(
355 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
356 {
357 std::lock_guard<win_mutex> lock(dispatch_mutex_);
358 completed_ops_.push(op);
359 ::InterlockedExchange(&dispatch_required_, 1);
360 }
361 }
362 545404x }
363
364 inline void
365 14x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
366 {
367 // Sync completion: pack results into op and post for dispatch.
368 14x op->ready_ = 1;
369 14x op->dwError = error;
370 14x op->bytes_transferred = bytes;
371
372
2/4
✓ Branch 2 → 3 taken 14 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 14 times.
28x if (!::PostQueuedCompletionStatus(
373 14x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
374 {
375 std::lock_guard<win_mutex> lock(dispatch_mutex_);
376 completed_ops_.push(op);
377 ::InterlockedExchange(&dispatch_required_, 1);
378 }
379 14x }
380
381 inline void
382 5523x win_scheduler::stop()
383 {
384
2/2
✓ Branch 4 → 5 taken 2758 times.
✓ Branch 4 → 13 taken 2765 times.
11046x if (::InterlockedExchange(&stopped_, 1) == 0)
385 {
386
1/2
✓ Branch 7 → 8 taken 2758 times.
✗ Branch 7 → 13 not taken.
5516x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
387 {
388
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 2758 times.
2758x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
389 {
390 // PQCS failure is non-fatal: stopped_ is already set.
391 // The run() loop will notice via the GQCS timeout
392 // (max_gqcs_timeout = 500ms) and exit.
393 ::InterlockedExchange(&dispatch_required_, 1);
394 }
395 }
396 }
397 5523x }
398
399 inline bool
400 2561x win_scheduler::stopped() const noexcept
401 {
402 // equivalent to atomic read
403 5122x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
404 }
405
406 inline void
407 2535x win_scheduler::restart()
408 {
409 2535x ::InterlockedExchange(&stopped_, 0);
410 2535x ::InterlockedExchange(&stop_event_posted_, 0);
411 2535x }
412
413 inline std::size_t
414 2772x win_scheduler::run()
415 {
416
2/2
✓ Branch 4 → 5 taken 21 times.
✓ Branch 4 → 7 taken 2751 times.
5544x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
417 {
418
1/1
✓ Branch 5 → 6 taken 21 times.
21x stop();
419 21x return 0;
420 }
421
422 2751x iocp::thread_context_guard ctx(this);
423
424 2751x std::size_t n = 0;
425 for (;;)
426 {
427
3/3
✓ Branch 9 → 10 taken 556820 times.
✓ Branch 10 → 11 taken 15 times.
✓ Branch 10 → 12 taken 556805 times.
556820x if (!do_one(INFINITE))
428 15x break;
429
1/2
✓ Branch 13 → 14 taken 556805 times.
✗ Branch 13 → 15 not taken.
556805x if (n != (std::numeric_limits<std::size_t>::max)())
430 556805x ++n;
431
2/2
✓ Branch 17 → 18 taken 2736 times.
✓ Branch 17 → 20 taken 554069 times.
1113610x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
432 {
433
1/1
✓ Branch 18 → 19 taken 2736 times.
2736x stop();
434 2736x break;
435 }
436 }
437 2751x return n;
438 2751x }
439
440 inline std::size_t
441 2x win_scheduler::run_one()
442 {
443
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
444 {
445 stop();
446 return 0;
447 }
448
449 2x iocp::thread_context_guard ctx(this);
450
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
451 2x }
452
453 inline std::size_t
454 36x win_scheduler::wait_one(long usec)
455 {
456
2/2
✓ Branch 4 → 5 taken 8 times.
✓ Branch 4 → 7 taken 28 times.
72x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
457 {
458
1/1
✓ Branch 5 → 6 taken 8 times.
8x stop();
459 8x return 0;
460 }
461
462 28x iocp::thread_context_guard ctx(this);
463 28x unsigned long timeout_ms = INFINITE;
464
1/2
✓ Branch 8 → 9 taken 28 times.
✗ Branch 8 → 13 not taken.
28x if (usec >= 0)
465 {
466 28x auto ms = (static_cast<long long>(usec) + 999) / 1000;
467
1/2
✓ Branch 9 → 10 taken 28 times.
✗ Branch 9 → 11 not taken.
28x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
468 : static_cast<unsigned long>(ms);
469 }
470
1/1
✓ Branch 13 → 14 taken 28 times.
28x return do_one(timeout_ms);
471 28x }
472
473 inline std::size_t
474 5x win_scheduler::poll()
475 {
476
2/2
✓ Branch 4 → 5 taken 1 time.
✓ Branch 4 → 7 taken 4 times.
10x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
477 {
478
1/1
✓ Branch 5 → 6 taken 1 time.
1x stop();
479 1x return 0;
480 }
481
482 4x iocp::thread_context_guard ctx(this);
483
484 4x std::size_t n = 0;
485
3/3
✓ Branch 12 → 13 taken 9 times.
✓ Branch 13 → 9 taken 5 times.
✓ Branch 13 → 14 taken 4 times.
9x while (do_one(0))
486
1/2
✓ Branch 10 → 11 taken 5 times.
✗ Branch 10 → 12 not taken.
5x if (n != (std::numeric_limits<std::size_t>::max)())
487 5x ++n;
488 4x return n;
489 4x }
490
491 inline std::size_t
492 4x win_scheduler::poll_one()
493 {
494
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
495 {
496
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
497 2x return 0;
498 }
499
500 2x iocp::thread_context_guard ctx(this);
501
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
502 2x }
503
504 inline void
505 1161x win_scheduler::post_deferred_completions(op_queue& ops)
506 {
507
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 1161 times.
1161x while (auto h = ops.pop())
508 {
509 if (::PostQueuedCompletionStatus(
510 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
511 continue;
512
513 // Out of resources, put the failed op and remaining ops back
514 ops.push(h);
515 std::lock_guard<win_mutex> lock(dispatch_mutex_);
516 completed_ops_.splice(ops);
517 ::InterlockedExchange(&dispatch_required_, 1);
518 return;
519 }
520 }
521
522 inline std::size_t
523 560544x win_scheduler::do_one(unsigned long timeout_ms)
524 {
525 for (;;)
526 {
527 // Check if we need to process timers or deferred ops
528
2/2
✓ Branch 4 → 5 taken 1161 times.
✓ Branch 4 → 13 taken 559383 times.
1121088x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
529 {
530 1161x op_queue local_ops;
531 {
532 1161x std::lock_guard<win_mutex> lock(dispatch_mutex_);
533 1161x local_ops.splice(completed_ops_);
534 1161x }
535
1/1
✓ Branch 8 → 9 taken 1161 times.
1161x post_deferred_completions(local_ops);
536
537
1/2
✓ Branch 9 → 10 taken 1161 times.
✗ Branch 9 → 11 not taken.
1161x if (timer_svc_)
538
1/1
✓ Branch 10 → 11 taken 1161 times.
1161x timer_svc_->process_expired();
539
540
1/1
✓ Branch 11 → 12 taken 1161 times.
1161x update_timeout();
541 }
542
543 560544x DWORD bytes = 0;
544 560544x ULONG_PTR key = 0;
545 560544x LPOVERLAPPED overlapped = nullptr;
546
1/1
✓ Branch 13 → 14 taken 560544 times.
560544x ::SetLastError(0);
547
548
1/1
✓ Branch 17 → 18 taken 560544 times.
560544x BOOL result = ::GetQueuedCompletionStatus(
549 iocp_, &bytes, &key, &overlapped,
550
2/2
✓ Branch 14 → 15 taken 43 times.
✓ Branch 14 → 16 taken 560501 times.
560544x timeout_ms < iocp::max_gqcs_timeout ? timeout_ms
551 : iocp::max_gqcs_timeout);
552
1/1
✓ Branch 18 → 19 taken 560544 times.
560544x DWORD dwError = ::GetLastError();
553
554 // Handle based on completion key
555
2/2
✓ Branch 19 → 20 taken 556842 times.
✓ Branch 19 → 39 taken 3702 times.
560544x if (overlapped)
556 {
557
2/2
✓ Branch 20 → 21 taken 1911 times.
✓ Branch 20 → 22 taken 554931 times.
556842x DWORD err = result ? 0 : dwError;
558
559
2/3
✓ Branch 23 → 24 taken 545418 times.
✓ Branch 23 → 35 taken 11424 times.
✗ Branch 23 → 38 not taken.
556842x switch (key)
560 {
561 545418x case key_io:
562 case key_result_stored:
563 {
564 545418x auto* ov_op = overlapped_to_op(overlapped);
565
566 // If key_result_stored, results are pre-stored in op fields
567
2/2
✓ Branch 25 → 26 taken 14 times.
✓ Branch 25 → 27 taken 545404 times.
545418x if (key == key_result_stored)
568 {
569 14x bytes = ov_op->bytes_transferred;
570 14x err = ov_op->dwError;
571 }
572
573 // Store GQCS results so on_pending() re-post has valid data
574 545418x ov_op->store_result(bytes, err);
575
576 // CAS: try to set ready_ from 0 to 1.
577 // If old value was 1, the initiator already returned
578 // (on_pending/on_completion set it) — safe to dispatch.
579 // If old value was 0, the initiator hasn't returned yet —
580 // skip dispatch; on_pending() will re-post.
581
1/2
✓ Branch 30 → 31 taken 545418 times.
✗ Branch 30 → 34 not taken.
1090836x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
582 {
583
1/1
✓ Branch 31 → 32 taken 545418 times.
545418x ov_op->complete(this, bytes, err);
584 545418x work_finished();
585 556861x return 1;
586 }
587 3670x continue;
588 }
589
590 11424x case key_posted:
591 {
592 // Posted scheduler_op*: overlapped is actually a scheduler_op*
593 11424x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
594
1/1
✓ Branch 35 → 36 taken 11424 times.
11424x op->complete(this, bytes, err);
595 11424x work_finished();
596 11424x return 1;
597 }
598
599 default:
600 continue;
601 }
602 }
603
604 // Signal completions (no OVERLAPPED)
605
2/2
✓ Branch 39 → 40 taken 3687 times.
✓ Branch 39 → 53 taken 15 times.
3702x if (result)
606 {
607
2/3
✓ Branch 40 → 41 taken 1161 times.
✓ Branch 40 → 42 taken 2526 times.
✗ Branch 40 → 52 not taken.
3687x switch (key)
608 {
609 1161x case key_wake_dispatch:
610 // Timer wakeup - loop to check dispatch_required_
611 1161x continue;
612
613 2526x case key_shutdown:
614 2526x ::InterlockedExchange(&stop_event_posted_, 0);
615
2/2
✓ Branch 45 → 46 taken 17 times.
✓ Branch 45 → 51 taken 2509 times.
2526x if (stopped())
616 {
617 // Re-post for other waiting threads
618
1/2
✓ Branch 48 → 49 taken 17 times.
✗ Branch 48 → 50 not taken.
34x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
619 {
620
1/1
✓ Branch 49 → 50 taken 17 times.
17x ::PostQueuedCompletionStatus(
621 iocp_, 0, key_shutdown, nullptr);
622 }
623 17x return 0;
624 }
625 2509x continue;
626
627 default:
628 continue;
629 }
630 }
631
632 // Timeout or error
633
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 15 times.
15x if (dwError != WAIT_TIMEOUT)
634 detail::throw_system_error(make_err(dwError));
635
2/2
✓ Branch 56 → 57 taken 2 times.
✓ Branch 56 → 58 taken 13 times.
15x if (timeout_ms != INFINITE)
636 2x return 0;
637 // PQCS-failure fallback: stop() sets stopped_ and
638 // dispatch_required_ but if the key_shutdown post failed,
639 // no completion is ever dequeued. Catch it here on the
640 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
641
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 13 times.
13x if (stopped())
642 return 0;
643 3683x }
644 }
645
646 inline void
647 1927x win_scheduler::on_timer_changed(void* ctx)
648 {
649 1927x static_cast<win_scheduler*>(ctx)->update_timeout();
650 1927x }
651
652 inline void
653 369x win_scheduler::set_timer_service(timer_service* svc)
654 {
655 369x timer_svc_ = svc;
656 // Pass 'this' as context - callback routes to correct instance
657 369x svc->set_on_earliest_changed(
658 369x timer_service::callback{this, &on_timer_changed});
659
1/2
✓ Branch 5 → 6 taken 369 times.
✗ Branch 5 → 8 not taken.
369x if (timers_)
660 369x timers_->start();
661 369x }
662
663 inline void
664 3088x win_scheduler::update_timeout()
665 {
666
3/6
✓ Branch 2 → 3 taken 3088 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 3088 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 3088 times.
✗ Branch 7 → 11 not taken.
3088x if (timer_svc_ && timers_)
667 3088x timers_->update_timeout(timer_svc_->nearest_expiry());
668 3088x }
669
670 } // namespace boost::corosio::detail
671
672 #endif // BOOST_COROSIO_HAS_IOCP
673
674 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
675