include/boost/corosio/native/detail/iocp/win_scheduler.hpp

82.1% Lines (224/273) 93.3% List of functions (28/30) 70.5% Branches (110/156)
win_scheduler.hpp
f(x) Functions (30)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :75 2655x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :87 0 0.0% 0.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :97 0 0.0% 0.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :168 3097x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :174 3097x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :182 442x 92.9% 72.7% 56.7% boost::corosio::detail::win_scheduler::~win_scheduler() :209 884x 100.0% 100.0% boost::corosio::detail::win_scheduler::shutdown() :216 442x 83.3% 66.7% 79.4% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :272 8075x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :278 8075x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :314 8075x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :334 3555x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :348 9934x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :357 409579x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :363 421195x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :370 398402x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :388 14x 63.6% 50.0% 41.7% boost::corosio::detail::win_scheduler::stop() :405 6130x 83.3% 66.7% 75.0% boost::corosio::detail::win_scheduler::stopped() const :423 2855x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :430 2804x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :437 3076x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :464 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :477 39x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :497 5x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :515 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :528 291x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :546 410061x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :670 1185x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :676 442x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :687 1476x 100.0% 50.0% 90.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41
42 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
43
44 namespace boost::corosio::detail {
45
46 // Forward declarations
47 struct overlapped_op;
48 class win_timers;
49
50 class BOOST_COROSIO_DECL win_scheduler final
51 : public scheduler
52 , public capy::execution_context::service
53 {
54 public:
55 using key_type = scheduler;
56
57 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
58 ~win_scheduler();
59 win_scheduler(win_scheduler const&) = delete;
60 win_scheduler& operator=(win_scheduler const&) = delete;
61
62 void shutdown() override;
63 void post(std::coroutine_handle<> h) const override;
64 void post(scheduler_op* h) const override;
65 bool running_in_this_thread() const noexcept override;
66 void stop() override;
67 bool stopped() const noexcept override;
68 void restart() override;
69 std::size_t run() override;
70 std::size_t run_one() override;
71 std::size_t wait_one(long usec) override;
72 std::size_t poll() override;
73 std::size_t poll_one() override;
74
75 2655x void* native_handle() const noexcept
76 {
77 2655x return iocp_;
78 }
79
80 void work_started() noexcept override;
81 void work_finished() noexcept override;
82
83 /** Apply runtime IOCP configuration.
84
85 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
86 */
87 void configure_iocp(unsigned gqcs_timeout_ms) noexcept
88 {
89 gqcs_timeout_ms_ = gqcs_timeout_ms;
90 }
91
92 /** Enable or disable single-threaded (lockless) mode.
93
94 When enabled, the dispatch mutex becomes a no-op.
95 Cross-thread post() is undefined behavior.
96 */
97 void configure_single_threaded(bool v) noexcept
98 {
99 single_threaded_ = v;
100 dispatch_mutex_.set_enabled(!v);
101 }
102
103 /** Signal that an overlapped I/O operation is now pending.
104 Coordinates with do_one() via the ready_ CAS protocol. */
105 void on_pending(overlapped_op* op) const;
106
107 /** Post an immediate completion with pre-stored results.
108 Used for sync errors and noop paths. */
109 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
110
111 // Timer service integration
112 void set_timer_service(timer_service* svc);
113 void update_timeout();
114
115 private:
116 static void on_timer_changed(void* ctx);
117 void post_deferred_completions(op_queue& ops);
118 std::size_t do_one(unsigned long timeout_ms);
119
120 timer_service* timer_svc_ = nullptr;
121 void* iocp_;
122 mutable long outstanding_work_;
123 mutable long stopped_;
124 long stop_event_posted_;
125 mutable long dispatch_required_;
126 unsigned long gqcs_timeout_ms_ = 500;
127 bool single_threaded_ = false;
128
129 mutable win_mutex dispatch_mutex_;
130 mutable op_queue completed_ops_;
131 std::unique_ptr<win_timers> timers_;
132 };
133
134 /*
135 ARCHITECTURE NOTE: Function Pointer Dispatch
136
137 All I/O handles are registered with the IOCP using key_io (0).
138 Dispatch happens via the function pointer stored in each scheduler_op.
139
140 When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
141 and call the function pointer directly - no virtual dispatch.
142
143 The completion_key enum values are used only for internal signals:
144 - key_io (0): Normal I/O completion, dispatch via func_
145 - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
146 - key_shutdown (2): Stop signal
147 - key_result_stored (3): Results pre-stored in OVERLAPPED
148 */
149
150 namespace iocp {
151
152 // Max timeout for GQCS to allow periodic re-checking of conditions.
153 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
154 inline constexpr unsigned long max_gqcs_timeout = 500;
155
156 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
157 {
158 win_scheduler const* key;
159 scheduler_context* next;
160 };
161
162 inline thread_local_ptr<scheduler_context> context_stack;
163
164 struct thread_context_guard
165 {
166 scheduler_context frame_;
167
168 3097x explicit thread_context_guard(win_scheduler const* ctx) noexcept
169 3097x : frame_{ctx, context_stack.get()}
170 {
171 3097x context_stack.set(&frame_);
172 3097x }
173
174 3097x ~thread_context_guard() noexcept
175 {
176 3097x context_stack.set(frame_.next);
177 3097x }
178 };
179
180 } // namespace iocp
181
182 442x inline win_scheduler::win_scheduler(
183 442x capy::execution_context& ctx, int concurrency_hint)
184 442x : iocp_(nullptr)
185 442x , outstanding_work_(0)
186 442x , stopped_(0)
187 442x , stop_event_posted_(0)
188
1/1
✓ Branch 4 → 5 taken 442 times.
442x , dispatch_required_(0)
189 {
190 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
191
2/3
✓ Branch 7 → 8 taken 442 times.
✗ Branch 7 → 9 not taken.
✓ Branch 10 → 11 taken 442 times.
442x iocp_ = ::CreateIoCompletionPort(
192 INVALID_HANDLE_VALUE, nullptr, 0,
193 static_cast<DWORD>(
194 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
195
196
1/2
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 15 taken 442 times.
442x if (iocp_ == nullptr)
197 detail::throw_system_error(make_err(::GetLastError()));
198
199 // Create timer wakeup mechanism (tries NT native, falls back to thread)
200
1/1
✓ Branch 15 → 16 taken 442 times.
442x timers_ = make_win_timers(iocp_, &dispatch_required_);
201
202 // Connect timer service to scheduler
203
2/2
✓ Branch 18 → 19 taken 442 times.
✓ Branch 19 → 20 taken 442 times.
442x set_timer_service(&get_timer_service(ctx, *this));
204
205 // Initialize resolver service
206
1/1
✓ Branch 20 → 21 taken 442 times.
442x ctx.make_service<win_resolver_service>(*this);
207 442x }
208
209 884x inline win_scheduler::~win_scheduler()
210 {
211
1/2
✓ Branch 2 → 3 taken 442 times.
✗ Branch 2 → 4 not taken.
442x if (iocp_ != nullptr)
212 442x ::CloseHandle(iocp_);
213 884x }
214
215 inline void
216 442x win_scheduler::shutdown()
217 {
218
1/2
✓ Branch 3 → 4 taken 442 times.
✗ Branch 3 → 6 not taken.
442x if (timers_)
219 442x timers_->stop();
220
221 // Drain timer heap before the work-counting loop. The timer_service
222 // was registered after this scheduler (nested make_service from our
223 // constructor), so execution_context::shutdown() calls us first.
224 // Asio avoids this by owning timer queues directly inside the
225 // scheduler; we bridge the gap by shutting down the timer service
226 // early. The subsequent call from execution_context is a no-op.
227
1/2
✓ Branch 6 → 7 taken 442 times.
✗ Branch 6 → 8 not taken.
442x if (timer_svc_)
228 442x timer_svc_->shutdown();
229
230
2/2
✓ Branch 32 → 9 taken 15 times.
✓ Branch 32 → 33 taken 442 times.
914x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
231 {
232 15x op_queue ops;
233 {
234 15x std::lock_guard<win_mutex> lock(dispatch_mutex_);
235 15x ops.splice(completed_ops_);
236 15x }
237
238
1/2
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 15 times.
15x if (!ops.empty())
239 {
240 while (auto* h = ops.pop())
241 {
242 ::InterlockedDecrement(&outstanding_work_);
243 h->destroy();
244 }
245 }
246 else
247 {
248 DWORD bytes;
249 ULONG_PTR key;
250 LPOVERLAPPED overlapped;
251
1/1
✓ Branch 20 → 21 taken 15 times.
15x ::GetQueuedCompletionStatus(
252 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
253
2/2
✓ Branch 21 → 22 taken 14 times.
✓ Branch 21 → 28 taken 1 time.
15x if (overlapped)
254 {
255 14x ::InterlockedDecrement(&outstanding_work_);
256
2/2
✓ Branch 24 → 25 taken 10 times.
✓ Branch 24 → 26 taken 4 times.
14x if (key == key_posted)
257 {
258 10x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
259
1/1
✓ Branch 25 → 28 taken 10 times.
10x op->destroy();
260 }
261 else
262 {
263 4x auto* op = overlapped_to_op(overlapped);
264
1/1
✓ Branch 27 → 28 taken 4 times.
4x op->destroy();
265 }
266 }
267 }
268 }
269 442x }
270
271 inline void
272 8075x win_scheduler::post(std::coroutine_handle<> h) const
273 {
274 struct post_handler final : scheduler_op
275 {
276 std::coroutine_handle<> h_;
277
278 8075x static void do_complete(
279 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
280 {
281 8075x auto* self = static_cast<post_handler*>(base);
282
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 8069 times.
8075x if (!owner)
283 {
284 // Shutdown path: destroy the coroutine frame synchronously.
285 //
286 // Bounded destruction invariant: the chain triggered by
287 // coro.destroy() is at most two levels deep:
288 // 1. task frame destroyed → ~io_awaitable_promise_base()
289 // destroys stored continuation (if != noop_coroutine)
290 // 2. continuation (trampoline) destroyed → final_suspend
291 // returns suspend_never, no further continuation
292 //
293 // If a future refactor adds deeper continuation chains,
294 // this would reintroduce re-entrant stack overflow risk.
295 #ifndef NDEBUG
296 static thread_local int destroy_depth = 0;
297 6x ++destroy_depth;
298
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
299 #endif
300 6x auto coro = self->h_;
301
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
302
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
303 #ifndef NDEBUG
304 6x --destroy_depth;
305 #endif
306 6x return;
307 }
308 8069x auto coro = self->h_;
309
1/2
✓ Branch 10 → 11 taken 8069 times.
✗ Branch 10 → 12 not taken.
8069x delete self;
310 std::atomic_thread_fence(std::memory_order_acquire);
311
1/1
✓ Branch 13 → 14 taken 8069 times.
8069x coro.resume();
312 }
313
314 8075x explicit post_handler(std::coroutine_handle<> coro)
315 8075x : scheduler_op(&do_complete)
316 8075x , h_(coro)
317 {
318 8075x }
319 };
320
321 8075x auto* ph = new post_handler(h);
322 8075x ::InterlockedIncrement(&outstanding_work_);
323
324
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 8075 times.
8075x if (!::PostQueuedCompletionStatus(
325 8075x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
326 {
327 std::lock_guard<win_mutex> lock(dispatch_mutex_);
328 completed_ops_.push(ph);
329 ::InterlockedExchange(&dispatch_required_, 1);
330 }
331 8075x }
332
333 inline void
334 3555x win_scheduler::post(scheduler_op* h) const
335 {
336 3555x ::InterlockedIncrement(&outstanding_work_);
337
338
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 3555 times.
3555x if (!::PostQueuedCompletionStatus(
339 3555x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
340 {
341 std::lock_guard<win_mutex> lock(dispatch_mutex_);
342 completed_ops_.push(h);
343 ::InterlockedExchange(&dispatch_required_, 1);
344 }
345 3555x }
346
347 inline bool
348 9934x win_scheduler::running_in_this_thread() const noexcept
349 {
350
2/2
✓ Branch 6 → 3 taken 3284 times.
✓ Branch 6 → 7 taken 6650 times.
9934x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
351
1/2
✓ Branch 3 → 4 taken 3284 times.
✗ Branch 3 → 5 not taken.
3284x if (c->key == this)
352 3284x return true;
353 6650x return false;
354 }
355
356 inline void
357 409579x win_scheduler::work_started() noexcept
358 {
359 409579x ::InterlockedIncrement(&outstanding_work_);
360 409579x }
361
362 inline void
363 421195x win_scheduler::work_finished() noexcept
364 {
365
2/2
✓ Branch 4 → 5 taken 3057 times.
✓ Branch 4 → 6 taken 418138 times.
842390x if (::InterlockedDecrement(&outstanding_work_) == 0)
366 3057x stop();
367 421195x }
368
369 inline void
370 398402x win_scheduler::on_pending(overlapped_op* op) const
371 {
372 // CAS: try to set ready_ from 0 to 1.
373 // If the old value was 1, GQCS already grabbed this op and stored
374 // results — we need to re-post so do_one() can dispatch it.
375
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 398402 times.
796804x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
376 {
377 if (!::PostQueuedCompletionStatus(
378 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
379 {
380 std::lock_guard<win_mutex> lock(dispatch_mutex_);
381 completed_ops_.push(op);
382 ::InterlockedExchange(&dispatch_required_, 1);
383 }
384 }
385 398402x }
386
387 inline void
388 14x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
389 {
390 // Sync completion: pack results into op and post for dispatch.
391 14x op->ready_ = 1;
392 14x op->dwError = error;
393 14x op->bytes_transferred = bytes;
394
395
2/4
✓ Branch 2 → 3 taken 14 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 14 times.
28x if (!::PostQueuedCompletionStatus(
396 14x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
397 {
398 std::lock_guard<win_mutex> lock(dispatch_mutex_);
399 completed_ops_.push(op);
400 ::InterlockedExchange(&dispatch_required_, 1);
401 }
402 14x }
403
404 inline void
405 6130x win_scheduler::stop()
406 {
407
2/2
✓ Branch 4 → 5 taken 3064 times.
✓ Branch 4 → 13 taken 3066 times.
12260x if (::InterlockedExchange(&stopped_, 1) == 0)
408 {
409
1/2
✓ Branch 7 → 8 taken 3064 times.
✗ Branch 7 → 13 not taken.
6128x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
410 {
411
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 3064 times.
3064x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
412 {
413 // PQCS failure is non-fatal: stopped_ is already set.
414 // The run() loop will notice via the GQCS timeout
415 // (gqcs_timeout_ms_, default 500ms) and exit.
416 ::InterlockedExchange(&dispatch_required_, 1);
417 }
418 }
419 }
420 6130x }
421
422 inline bool
423 2855x win_scheduler::stopped() const noexcept
424 {
425 // equivalent to atomic read
426 5710x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
427 }
428
429 inline void
430 2804x win_scheduler::restart()
431 {
432 2804x ::InterlockedExchange(&stopped_, 0);
433 2804x ::InterlockedExchange(&stop_event_posted_, 0);
434 2804x }
435
436 inline std::size_t
437 3076x win_scheduler::run()
438 {
439
2/2
✓ Branch 4 → 5 taken 18 times.
✓ Branch 4 → 7 taken 3058 times.
6152x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
440 {
441
1/1
✓ Branch 5 → 6 taken 18 times.
18x stop();
442 18x return 0;
443 }
444
445 3058x iocp::thread_context_guard ctx(this);
446
447 3058x std::size_t n = 0;
448 for (;;)
449 {
450
3/3
✓ Branch 9 → 10 taken 410017 times.
✓ Branch 10 → 11 taken 18 times.
✓ Branch 10 → 12 taken 409999 times.
410017x if (!do_one(INFINITE))
451 18x break;
452
1/2
✓ Branch 13 → 14 taken 409999 times.
✗ Branch 13 → 15 not taken.
409999x if (n != (std::numeric_limits<std::size_t>::max)())
453 409999x ++n;
454
2/2
✓ Branch 17 → 18 taken 3040 times.
✓ Branch 17 → 20 taken 406959 times.
819998x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
455 {
456
1/1
✓ Branch 18 → 19 taken 3040 times.
3040x stop();
457 3040x break;
458 }
459 }
460 3058x return n;
461 3058x }
462
463 inline std::size_t
464 2x win_scheduler::run_one()
465 {
466
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
467 {
468 stop();
469 return 0;
470 }
471
472 2x iocp::thread_context_guard ctx(this);
473
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
474 2x }
475
476 inline std::size_t
477 39x win_scheduler::wait_one(long usec)
478 {
479
2/2
✓ Branch 4 → 5 taken 8 times.
✓ Branch 4 → 7 taken 31 times.
78x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
480 {
481
1/1
✓ Branch 5 → 6 taken 8 times.
8x stop();
482 8x return 0;
483 }
484
485 31x iocp::thread_context_guard ctx(this);
486 31x unsigned long timeout_ms = INFINITE;
487
1/2
✓ Branch 8 → 9 taken 31 times.
✗ Branch 8 → 13 not taken.
31x if (usec >= 0)
488 {
489 31x auto ms = (static_cast<long long>(usec) + 999) / 1000;
490
1/2
✓ Branch 9 → 10 taken 31 times.
✗ Branch 9 → 11 not taken.
31x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
491 : static_cast<unsigned long>(ms);
492 }
493
1/1
✓ Branch 13 → 14 taken 31 times.
31x return do_one(timeout_ms);
494 31x }
495
496 inline std::size_t
497 5x win_scheduler::poll()
498 {
499
2/2
✓ Branch 4 → 5 taken 1 time.
✓ Branch 4 → 7 taken 4 times.
10x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
500 {
501
1/1
✓ Branch 5 → 6 taken 1 time.
1x stop();
502 1x return 0;
503 }
504
505 4x iocp::thread_context_guard ctx(this);
506
507 4x std::size_t n = 0;
508
3/3
✓ Branch 12 → 13 taken 9 times.
✓ Branch 13 → 9 taken 5 times.
✓ Branch 13 → 14 taken 4 times.
9x while (do_one(0))
509
1/2
✓ Branch 10 → 11 taken 5 times.
✗ Branch 10 → 12 not taken.
5x if (n != (std::numeric_limits<std::size_t>::max)())
510 5x ++n;
511 4x return n;
512 4x }
513
514 inline std::size_t
515 4x win_scheduler::poll_one()
516 {
517
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
518 {
519
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
520 2x return 0;
521 }
522
523 2x iocp::thread_context_guard ctx(this);
524
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
525 2x }
526
527 inline void
528 291x win_scheduler::post_deferred_completions(op_queue& ops)
529 {
530
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 291 times.
291x while (auto h = ops.pop())
531 {
532 if (::PostQueuedCompletionStatus(
533 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
534 continue;
535
536 // Out of resources, put the failed op and remaining ops back
537 ops.push(h);
538 std::lock_guard<win_mutex> lock(dispatch_mutex_);
539 completed_ops_.splice(ops);
540 ::InterlockedExchange(&dispatch_required_, 1);
541 return;
542 }
543 }
544
545 inline std::size_t
546 413162x win_scheduler::do_one(unsigned long timeout_ms)
547 {
548 for (;;)
549 {
550 // Check if we need to process timers or deferred ops
551
2/2
✓ Branch 4 → 5 taken 291 times.
✓ Branch 4 → 13 taken 412871 times.
826324x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
552 {
553 291x op_queue local_ops;
554 {
555 291x std::lock_guard<win_mutex> lock(dispatch_mutex_);
556 291x local_ops.splice(completed_ops_);
557 291x }
558
1/1
✓ Branch 8 → 9 taken 291 times.
291x post_deferred_completions(local_ops);
559
560
1/2
✓ Branch 9 → 10 taken 291 times.
✗ Branch 9 → 11 not taken.
291x if (timer_svc_)
561
1/1
✓ Branch 10 → 11 taken 291 times.
291x timer_svc_->process_expired();
562
563
1/1
✓ Branch 11 → 12 taken 291 times.
291x update_timeout();
564 }
565
566 413162x DWORD bytes = 0;
567 413162x ULONG_PTR key = 0;
568 413162x LPOVERLAPPED overlapped = nullptr;
569
1/1
✓ Branch 13 → 14 taken 413162 times.
413162x ::SetLastError(0);
570
571
1/1
✓ Branch 17 → 18 taken 413162 times.
413162x BOOL result = ::GetQueuedCompletionStatus(
572 iocp_, &bytes, &key, &overlapped,
573
2/2
✓ Branch 14 → 15 taken 413116 times.
✓ Branch 14 → 16 taken 46 times.
413162x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
574 : gqcs_timeout_ms_);
575
1/1
✓ Branch 18 → 19 taken 413162 times.
413162x DWORD dwError = ::GetLastError();
576
577 // Handle based on completion key
578
2/2
✓ Branch 19 → 20 taken 410036 times.
✓ Branch 19 → 39 taken 3126 times.
413162x if (overlapped)
579 {
580
2/2
✓ Branch 20 → 21 taken 2036 times.
✓ Branch 20 → 22 taken 408000 times.
410036x DWORD err = result ? 0 : dwError;
581
582
2/3
✓ Branch 23 → 24 taken 398416 times.
✓ Branch 23 → 35 taken 11620 times.
✗ Branch 23 → 38 not taken.
410036x switch (key)
583 {
584 398416x case key_io:
585 case key_result_stored:
586 {
587 398416x auto* ov_op = overlapped_to_op(overlapped);
588
589 // If key_result_stored, results are pre-stored in op fields
590
2/2
✓ Branch 25 → 26 taken 14 times.
✓ Branch 25 → 27 taken 398402 times.
398416x if (key == key_result_stored)
591 {
592 14x bytes = ov_op->bytes_transferred;
593 14x err = ov_op->dwError;
594 }
595
596 // Store GQCS results so on_pending() re-post has valid data
597 398416x ov_op->store_result(bytes, err);
598
599 // CAS: try to set ready_ from 0 to 1.
600 // If old value was 1, the initiator already returned
601 // (on_pending/on_completion set it) — safe to dispatch.
602 // If old value was 0, the initiator hasn't returned yet —
603 // skip dispatch; on_pending() will re-post.
604
1/2
✓ Branch 30 → 31 taken 398416 times.
✗ Branch 30 → 34 not taken.
796832x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
605 {
606
1/1
✓ Branch 31 → 32 taken 398416 times.
398416x ov_op->complete(this, bytes, err);
607 398416x work_finished();
608 410061x return 1;
609 }
610 3069x continue;
611 }
612
613 11620x case key_posted:
614 {
615 // Posted scheduler_op*: overlapped is actually a scheduler_op*
616 11620x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
617
1/1
✓ Branch 35 → 36 taken 11620 times.
11620x op->complete(this, bytes, err);
618 11620x work_finished();
619 11620x return 1;
620 }
621
622 default:
623 continue;
624 }
625 }
626
627 // Signal completions (no OVERLAPPED)
628
2/2
✓ Branch 39 → 40 taken 3089 times.
✓ Branch 39 → 53 taken 37 times.
3126x if (result)
629 {
630
2/3
✓ Branch 40 → 41 taken 291 times.
✓ Branch 40 → 42 taken 2798 times.
✗ Branch 40 → 52 not taken.
3089x switch (key)
631 {
632 291x case key_wake_dispatch:
633 // Timer wakeup - loop to check dispatch_required_
634 291x continue;
635
636 2798x case key_shutdown:
637 2798x ::InterlockedExchange(&stop_event_posted_, 0);
638
2/2
✓ Branch 45 → 46 taken 20 times.
✓ Branch 45 → 51 taken 2778 times.
2798x if (stopped())
639 {
640 // Re-post for other waiting threads
641
1/2
✓ Branch 48 → 49 taken 20 times.
✗ Branch 48 → 50 not taken.
40x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
642 {
643
1/1
✓ Branch 49 → 50 taken 20 times.
20x ::PostQueuedCompletionStatus(
644 iocp_, 0, key_shutdown, nullptr);
645 }
646 20x return 0;
647 }
648 2778x continue;
649
650 default:
651 continue;
652 }
653 }
654
655 // Timeout or error
656
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 37 times.
37x if (dwError != WAIT_TIMEOUT)
657 detail::throw_system_error(make_err(dwError));
658
2/2
✓ Branch 56 → 57 taken 5 times.
✓ Branch 56 → 58 taken 32 times.
37x if (timeout_ms != INFINITE)
659 5x return 0;
660 // PQCS-failure fallback: stop() sets stopped_ and
661 // dispatch_required_ but if the key_shutdown post failed,
662 // no completion is ever dequeued. Catch it here on the
663 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
664
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 32 times.
32x if (stopped())
665 return 0;
666 3101x }
667 }
668
669 inline void
670 1185x win_scheduler::on_timer_changed(void* ctx)
671 {
672 1185x static_cast<win_scheduler*>(ctx)->update_timeout();
673 1185x }
674
675 inline void
676 442x win_scheduler::set_timer_service(timer_service* svc)
677 {
678 442x timer_svc_ = svc;
679 // Pass 'this' as context - callback routes to correct instance
680 442x svc->set_on_earliest_changed(
681 442x timer_service::callback{this, &on_timer_changed});
682
1/2
✓ Branch 5 → 6 taken 442 times.
✗ Branch 5 → 8 not taken.
442x if (timers_)
683 442x timers_->start();
684 442x }
685
686 inline void
687 1476x win_scheduler::update_timeout()
688 {
689
3/6
✓ Branch 2 → 3 taken 1476 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 1476 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 1476 times.
✗ Branch 7 → 11 not taken.
1476x if (timer_svc_ && timers_)
690 1476x timers_->update_timeout(timer_svc_->nearest_expiry());
691 1476x }
692
693 } // namespace boost::corosio::detail
694
695 #endif // BOOST_COROSIO_HAS_IOCP
696
697 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
698