include/boost/corosio/native/detail/iocp/win_scheduler.hpp

82.1% Lines (224/273) 93.3% List of functions (28/30) 70.5% Branches (110/156)
win_scheduler.hpp
f(x) Functions (30)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_scheduler::native_handle() const :75 1727x 100.0% 100.0% boost::corosio::detail::win_scheduler::configure_iocp(unsigned int) :87 0 0.0% 0.0% boost::corosio::detail::win_scheduler::configure_single_threaded(bool) :97 0 0.0% 0.0% boost::corosio::detail::iocp::thread_context_guard::thread_context_guard(boost::corosio::detail::win_scheduler const*) :168 2704x 100.0% 100.0% boost::corosio::detail::iocp::thread_context_guard::~thread_context_guard() :174 2704x 100.0% 100.0% boost::corosio::detail::win_scheduler::win_scheduler(boost::capy::execution_context&, int) :182 431x 92.9% 72.7% 56.7% boost::corosio::detail::win_scheduler::~win_scheduler() :209 862x 100.0% 100.0% boost::corosio::detail::win_scheduler::shutdown() :216 431x 83.3% 66.7% 79.4% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const :272 7112x 60.0% 50.0% 53.8% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :278 7112x 100.0% 70.0% 87.5% boost::corosio::detail::win_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :314 7112x 100.0% 100.0% boost::corosio::detail::win_scheduler::post(boost::corosio::detail::scheduler_op*) const :334 3860x 55.6% 50.0% 45.5% boost::corosio::detail::win_scheduler::running_in_this_thread() const :348 8969x 100.0% 75.0% 85.7% boost::corosio::detail::win_scheduler::work_started() :357 481630x 100.0% 100.0% boost::corosio::detail::win_scheduler::work_finished() :363 492588x 100.0% 100.0% 100.0% boost::corosio::detail::win_scheduler::on_pending(boost::corosio::detail::overlapped_op*) const :370 470924x 33.3% 16.7% 26.7% boost::corosio::detail::win_scheduler::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) const :388 14x 63.6% 50.0% 41.7% 
boost::corosio::detail::win_scheduler::stop() :405 5356x 83.3% 66.7% 75.0% boost::corosio::detail::win_scheduler::stopped() const :423 2454x 100.0% 100.0% boost::corosio::detail::win_scheduler::restart() :430 2426x 100.0% 100.0% boost::corosio::detail::win_scheduler::run() :437 2687x 100.0% 90.9% 92.0% boost::corosio::detail::win_scheduler::run_one() :464 2x 71.4% 50.0% 71.4% boost::corosio::detail::win_scheduler::wait_one(long) :477 39x 100.0% 75.0% 84.2% boost::corosio::detail::win_scheduler::poll() :497 5x 100.0% 87.5% 88.9% boost::corosio::detail::win_scheduler::poll_one() :515 4x 100.0% 100.0% 85.7% boost::corosio::detail::win_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :528 974x 20.0% 20.0% 31.2% boost::corosio::detail::win_scheduler::do_one(unsigned long) :546 481921x 86.7% 82.9% 88.1% boost::corosio::detail::win_scheduler::on_timer_changed(void*) :670 1679x 100.0% 100.0% boost::corosio::detail::win_scheduler::set_timer_service(boost::corosio::detail::timer_service*) :676 431x 100.0% 50.0% 100.0% boost::corosio::detail::win_scheduler::update_timeout() :687 2653x 100.0% 50.0% 90.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/detail/scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/native/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41
42 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
43
44 namespace boost::corosio::detail {
45
46 // Forward declarations
47 struct overlapped_op;
48 class win_timers;
49
50 class BOOST_COROSIO_DECL win_scheduler final
51 : public scheduler
52 , public capy::execution_context::service
53 {
54 public:
55 using key_type = scheduler;
56
57 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
58 ~win_scheduler();
59 win_scheduler(win_scheduler const&) = delete;
60 win_scheduler& operator=(win_scheduler const&) = delete;
61
62 void shutdown() override;
63 void post(std::coroutine_handle<> h) const override;
64 void post(scheduler_op* h) const override;
65 bool running_in_this_thread() const noexcept override;
66 void stop() override;
67 bool stopped() const noexcept override;
68 void restart() override;
69 std::size_t run() override;
70 std::size_t run_one() override;
71 std::size_t wait_one(long usec) override;
72 std::size_t poll() override;
73 std::size_t poll_one() override;
74
75 1727x void* native_handle() const noexcept
76 {
77 1727x return iocp_;
78 }
79
80 void work_started() noexcept override;
81 void work_finished() noexcept override;
82
83 /** Apply runtime IOCP configuration.
84
85 @param gqcs_timeout_ms Max GQCS blocking time in milliseconds.
86 */
87 void configure_iocp(unsigned gqcs_timeout_ms) noexcept
88 {
89 gqcs_timeout_ms_ = gqcs_timeout_ms;
90 }
91
92 /** Enable or disable single-threaded (lockless) mode.
93
94 When enabled, the dispatch mutex becomes a no-op.
95 Cross-thread post() is undefined behavior.
96 */
97 void configure_single_threaded(bool v) noexcept
98 {
99 single_threaded_ = v;
100 dispatch_mutex_.set_enabled(!v);
101 }
102
103 /** Signal that an overlapped I/O operation is now pending.
104 Coordinates with do_one() via the ready_ CAS protocol. */
105 void on_pending(overlapped_op* op) const;
106
107 /** Post an immediate completion with pre-stored results.
108 Used for sync errors and noop paths. */
109 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
110
111 // Timer service integration
112 void set_timer_service(timer_service* svc);
113 void update_timeout();
114
115 private:
116 static void on_timer_changed(void* ctx);
117 void post_deferred_completions(op_queue& ops);
118 std::size_t do_one(unsigned long timeout_ms);
119
120 timer_service* timer_svc_ = nullptr;
121 void* iocp_;
122 mutable long outstanding_work_;
123 mutable long stopped_;
124 long stop_event_posted_;
125 mutable long dispatch_required_;
126 unsigned long gqcs_timeout_ms_ = 500;
127 bool single_threaded_ = false;
128
129 mutable win_mutex dispatch_mutex_;
130 mutable op_queue completed_ops_;
131 std::unique_ptr<win_timers> timers_;
132 };
133
/*
    ARCHITECTURE NOTE: Function Pointer Dispatch

    All I/O handles are registered with the IOCP using key_io (0).
    Dispatch happens via the function pointer stored in each scheduler_op.

    When GQCS returns with an OVERLAPPED*, we recover the operation from it
    (overlapped_to_op() for key_io and key_result_stored, a plain cast to
    scheduler_op* for key_posted) and call the function pointer directly -
    no virtual dispatch.

    The completion_key enum values are used only for internal signals:
    - key_io (0): Normal I/O completion, dispatch via func_
    - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
    - key_shutdown (2): Stop signal
    - key_result_stored (3): Results pre-stored in OVERLAPPED
    - key_posted: The OVERLAPPED* is really a scheduler_op* queued by post()
*/
149
150 namespace iocp {
151
152 // Max timeout for GQCS to allow periodic re-checking of conditions.
153 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
154 inline constexpr unsigned long max_gqcs_timeout = 500;
155
156 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
157 {
158 win_scheduler const* key;
159 scheduler_context* next;
160 };
161
162 inline thread_local_ptr<scheduler_context> context_stack;
163
164 struct thread_context_guard
165 {
166 scheduler_context frame_;
167
168 2704x explicit thread_context_guard(win_scheduler const* ctx) noexcept
169 2704x : frame_{ctx, context_stack.get()}
170 {
171 2704x context_stack.set(&frame_);
172 2704x }
173
174 2704x ~thread_context_guard() noexcept
175 {
176 2704x context_stack.set(frame_.next);
177 2704x }
178 };
179
180 } // namespace iocp
181
182 431x inline win_scheduler::win_scheduler(
183 431x capy::execution_context& ctx, int concurrency_hint)
184 431x : iocp_(nullptr)
185 431x , outstanding_work_(0)
186 431x , stopped_(0)
187 431x , stop_event_posted_(0)
188
1/1
✓ Branch 4 → 5 taken 431 times.
431x , dispatch_required_(0)
189 {
190 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
191
2/3
✓ Branch 7 → 8 taken 431 times.
✗ Branch 7 → 9 not taken.
✓ Branch 10 → 11 taken 431 times.
431x iocp_ = ::CreateIoCompletionPort(
192 INVALID_HANDLE_VALUE, nullptr, 0,
193 static_cast<DWORD>(
194 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
195
196
1/2
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 15 taken 431 times.
431x if (iocp_ == nullptr)
197 detail::throw_system_error(make_err(::GetLastError()));
198
199 // Create timer wakeup mechanism (tries NT native, falls back to thread)
200
1/1
✓ Branch 15 → 16 taken 431 times.
431x timers_ = make_win_timers(iocp_, &dispatch_required_);
201
202 // Connect timer service to scheduler
203
2/2
✓ Branch 18 → 19 taken 431 times.
✓ Branch 19 → 20 taken 431 times.
431x set_timer_service(&get_timer_service(ctx, *this));
204
205 // Initialize resolver service
206
1/1
✓ Branch 20 → 21 taken 431 times.
431x ctx.make_service<win_resolver_service>(*this);
207 431x }
208
209 862x inline win_scheduler::~win_scheduler()
210 {
211
1/2
✓ Branch 2 → 3 taken 431 times.
✗ Branch 2 → 4 not taken.
431x if (iocp_ != nullptr)
212 431x ::CloseHandle(iocp_);
213 862x }
214
215 inline void
216 431x win_scheduler::shutdown()
217 {
218
1/2
✓ Branch 3 → 4 taken 431 times.
✗ Branch 3 → 6 not taken.
431x if (timers_)
219 431x timers_->stop();
220
221 // Drain timer heap before the work-counting loop. The timer_service
222 // was registered after this scheduler (nested make_service from our
223 // constructor), so execution_context::shutdown() calls us first.
224 // Asio avoids this by owning timer queues directly inside the
225 // scheduler; we bridge the gap by shutting down the timer service
226 // early. The subsequent call from execution_context is a no-op.
227
1/2
✓ Branch 6 → 7 taken 431 times.
✗ Branch 6 → 8 not taken.
431x if (timer_svc_)
228 431x timer_svc_->shutdown();
229
230
2/2
✓ Branch 32 → 9 taken 15 times.
✓ Branch 32 → 33 taken 431 times.
892x while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
231 {
232 15x op_queue ops;
233 {
234 15x std::lock_guard<win_mutex> lock(dispatch_mutex_);
235 15x ops.splice(completed_ops_);
236 15x }
237
238
1/2
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 20 taken 15 times.
15x if (!ops.empty())
239 {
240 while (auto* h = ops.pop())
241 {
242 ::InterlockedDecrement(&outstanding_work_);
243 h->destroy();
244 }
245 }
246 else
247 {
248 DWORD bytes;
249 ULONG_PTR key;
250 LPOVERLAPPED overlapped;
251
1/1
✓ Branch 20 → 21 taken 15 times.
15x ::GetQueuedCompletionStatus(
252 iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_);
253
2/2
✓ Branch 21 → 22 taken 14 times.
✓ Branch 21 → 28 taken 1 time.
15x if (overlapped)
254 {
255 14x ::InterlockedDecrement(&outstanding_work_);
256
2/2
✓ Branch 24 → 25 taken 10 times.
✓ Branch 24 → 26 taken 4 times.
14x if (key == key_posted)
257 {
258 10x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
259
1/1
✓ Branch 25 → 28 taken 10 times.
10x op->destroy();
260 }
261 else
262 {
263 4x auto* op = overlapped_to_op(overlapped);
264
1/1
✓ Branch 27 → 28 taken 4 times.
4x op->destroy();
265 }
266 }
267 }
268 }
269 431x }
270
271 inline void
272 7112x win_scheduler::post(std::coroutine_handle<> h) const
273 {
274 struct post_handler final : scheduler_op
275 {
276 std::coroutine_handle<> h_;
277
278 7112x static void do_complete(
279 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
280 {
281 7112x auto* self = static_cast<post_handler*>(base);
282
2/2
✓ Branch 2 → 3 taken 6 times.
✓ Branch 2 → 10 taken 7106 times.
7112x if (!owner)
283 {
284 // Shutdown path: destroy the coroutine frame synchronously.
285 //
286 // Bounded destruction invariant: the chain triggered by
287 // coro.destroy() is at most two levels deep:
288 // 1. task frame destroyed → ~io_awaitable_promise_base()
289 // destroys stored continuation (if != noop_coroutine)
290 // 2. continuation (trampoline) destroyed → final_suspend
291 // returns suspend_never, no further continuation
292 //
293 // If a future refactor adds deeper continuation chains,
294 // this would reintroduce re-entrant stack overflow risk.
295 #ifndef NDEBUG
296 static thread_local int destroy_depth = 0;
297 6x ++destroy_depth;
298
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x BOOST_COROSIO_ASSERT(destroy_depth <= 2);
299 #endif
300 6x auto coro = self->h_;
301
1/2
✓ Branch 6 → 7 taken 6 times.
✗ Branch 6 → 8 not taken.
6x delete self;
302
1/1
✓ Branch 8 → 9 taken 6 times.
6x coro.destroy();
303 #ifndef NDEBUG
304 6x --destroy_depth;
305 #endif
306 6x return;
307 }
308 7106x auto coro = self->h_;
309
1/2
✓ Branch 10 → 11 taken 7106 times.
✗ Branch 10 → 12 not taken.
7106x delete self;
310 std::atomic_thread_fence(std::memory_order_acquire);
311
1/1
✓ Branch 13 → 14 taken 7106 times.
7106x coro.resume();
312 }
313
314 7112x explicit post_handler(std::coroutine_handle<> coro)
315 7112x : scheduler_op(&do_complete)
316 7112x , h_(coro)
317 {
318 7112x }
319 };
320
321 7112x auto* ph = new post_handler(h);
322 7112x ::InterlockedIncrement(&outstanding_work_);
323
324
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 7112 times.
7112x if (!::PostQueuedCompletionStatus(
325 7112x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
326 {
327 std::lock_guard<win_mutex> lock(dispatch_mutex_);
328 completed_ops_.push(ph);
329 ::InterlockedExchange(&dispatch_required_, 1);
330 }
331 7112x }
332
333 inline void
334 3860x win_scheduler::post(scheduler_op* h) const
335 {
336 3860x ::InterlockedIncrement(&outstanding_work_);
337
338
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 3860 times.
3860x if (!::PostQueuedCompletionStatus(
339 3860x iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
340 {
341 std::lock_guard<win_mutex> lock(dispatch_mutex_);
342 completed_ops_.push(h);
343 ::InterlockedExchange(&dispatch_required_, 1);
344 }
345 3860x }
346
347 inline bool
348 8969x win_scheduler::running_in_this_thread() const noexcept
349 {
350
2/2
✓ Branch 6 → 3 taken 3282 times.
✓ Branch 6 → 7 taken 5687 times.
8969x for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
351
1/2
✓ Branch 3 → 4 taken 3282 times.
✗ Branch 3 → 5 not taken.
3282x if (c->key == this)
352 3282x return true;
353 5687x return false;
354 }
355
356 inline void
357 481630x win_scheduler::work_started() noexcept
358 {
359 481630x ::InterlockedIncrement(&outstanding_work_);
360 481630x }
361
362 inline void
363 492588x win_scheduler::work_finished() noexcept
364 {
365
2/2
✓ Branch 4 → 5 taken 2668 times.
✓ Branch 4 → 6 taken 489920 times.
985176x if (::InterlockedDecrement(&outstanding_work_) == 0)
366 2668x stop();
367 492588x }
368
369 inline void
370 470924x win_scheduler::on_pending(overlapped_op* op) const
371 {
372 // CAS: try to set ready_ from 0 to 1.
373 // If the old value was 1, GQCS already grabbed this op and stored
374 // results — we need to re-post so do_one() can dispatch it.
375
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 470924 times.
941848x if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
376 {
377 if (!::PostQueuedCompletionStatus(
378 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
379 {
380 std::lock_guard<win_mutex> lock(dispatch_mutex_);
381 completed_ops_.push(op);
382 ::InterlockedExchange(&dispatch_required_, 1);
383 }
384 }
385 470924x }
386
387 inline void
388 14x win_scheduler::on_completion(overlapped_op* op, DWORD error, DWORD bytes) const
389 {
390 // Sync completion: pack results into op and post for dispatch.
391 14x op->ready_ = 1;
392 14x op->dwError = error;
393 14x op->bytes_transferred = bytes;
394
395
2/4
✓ Branch 2 → 3 taken 14 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 14 times.
28x if (!::PostQueuedCompletionStatus(
396 14x iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
397 {
398 std::lock_guard<win_mutex> lock(dispatch_mutex_);
399 completed_ops_.push(op);
400 ::InterlockedExchange(&dispatch_required_, 1);
401 }
402 14x }
403
404 inline void
405 5356x win_scheduler::stop()
406 {
407
2/2
✓ Branch 4 → 5 taken 2675 times.
✓ Branch 4 → 13 taken 2681 times.
10712x if (::InterlockedExchange(&stopped_, 1) == 0)
408 {
409
1/2
✓ Branch 7 → 8 taken 2675 times.
✗ Branch 7 → 13 not taken.
5350x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
410 {
411
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 2675 times.
2675x if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
412 {
413 // PQCS failure is non-fatal: stopped_ is already set.
414 // The run() loop will notice via the GQCS timeout
415 // (gqcs_timeout_ms_, default 500ms) and exit.
416 ::InterlockedExchange(&dispatch_required_, 1);
417 }
418 }
419 }
420 5356x }
421
422 inline bool
423 2454x win_scheduler::stopped() const noexcept
424 {
425 // equivalent to atomic read
426 4908x return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
427 }
428
429 inline void
430 2426x win_scheduler::restart()
431 {
432 2426x ::InterlockedExchange(&stopped_, 0);
433 2426x ::InterlockedExchange(&stop_event_posted_, 0);
434 2426x }
435
436 inline std::size_t
437 2687x win_scheduler::run()
438 {
439
2/2
✓ Branch 4 → 5 taken 22 times.
✓ Branch 4 → 7 taken 2665 times.
5374x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
440 {
441
1/1
✓ Branch 5 → 6 taken 22 times.
22x stop();
442 22x return 0;
443 }
444
445 2665x iocp::thread_context_guard ctx(this);
446
447 2665x std::size_t n = 0;
448 for (;;)
449 {
450
3/3
✓ Branch 9 → 10 taken 481877 times.
✓ Branch 10 → 11 taken 14 times.
✓ Branch 10 → 12 taken 481863 times.
481877x if (!do_one(INFINITE))
451 14x break;
452
1/2
✓ Branch 13 → 14 taken 481863 times.
✗ Branch 13 → 15 not taken.
481863x if (n != (std::numeric_limits<std::size_t>::max)())
453 481863x ++n;
454
2/2
✓ Branch 17 → 18 taken 2651 times.
✓ Branch 17 → 20 taken 479212 times.
963726x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
455 {
456
1/1
✓ Branch 18 → 19 taken 2651 times.
2651x stop();
457 2651x break;
458 }
459 }
460 2665x return n;
461 2665x }
462
463 inline std::size_t
464 2x win_scheduler::run_one()
465 {
466
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
467 {
468 stop();
469 return 0;
470 }
471
472 2x iocp::thread_context_guard ctx(this);
473
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(INFINITE);
474 2x }
475
476 inline std::size_t
477 39x win_scheduler::wait_one(long usec)
478 {
479
2/2
✓ Branch 4 → 5 taken 8 times.
✓ Branch 4 → 7 taken 31 times.
78x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
480 {
481
1/1
✓ Branch 5 → 6 taken 8 times.
8x stop();
482 8x return 0;
483 }
484
485 31x iocp::thread_context_guard ctx(this);
486 31x unsigned long timeout_ms = INFINITE;
487
1/2
✓ Branch 8 → 9 taken 31 times.
✗ Branch 8 → 13 not taken.
31x if (usec >= 0)
488 {
489 31x auto ms = (static_cast<long long>(usec) + 999) / 1000;
490
1/2
✓ Branch 9 → 10 taken 31 times.
✗ Branch 9 → 11 not taken.
31x timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
491 : static_cast<unsigned long>(ms);
492 }
493
1/1
✓ Branch 13 → 14 taken 31 times.
31x return do_one(timeout_ms);
494 31x }
495
496 inline std::size_t
497 5x win_scheduler::poll()
498 {
499
2/2
✓ Branch 4 → 5 taken 1 time.
✓ Branch 4 → 7 taken 4 times.
10x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
500 {
501
1/1
✓ Branch 5 → 6 taken 1 time.
1x stop();
502 1x return 0;
503 }
504
505 4x iocp::thread_context_guard ctx(this);
506
507 4x std::size_t n = 0;
508
3/3
✓ Branch 12 → 13 taken 9 times.
✓ Branch 13 → 9 taken 5 times.
✓ Branch 13 → 14 taken 4 times.
9x while (do_one(0))
509
1/2
✓ Branch 10 → 11 taken 5 times.
✗ Branch 10 → 12 not taken.
5x if (n != (std::numeric_limits<std::size_t>::max)())
510 5x ++n;
511 4x return n;
512 4x }
513
514 inline std::size_t
515 4x win_scheduler::poll_one()
516 {
517
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8x if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
518 {
519
1/1
✓ Branch 5 → 6 taken 2 times.
2x stop();
520 2x return 0;
521 }
522
523 2x iocp::thread_context_guard ctx(this);
524
1/1
✓ Branch 8 → 9 taken 2 times.
2x return do_one(0);
525 2x }
526
527 inline void
528 974x win_scheduler::post_deferred_completions(op_queue& ops)
529 {
530
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 974 times.
974x while (auto h = ops.pop())
531 {
532 if (::PostQueuedCompletionStatus(
533 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
534 continue;
535
536 // Out of resources, put the failed op and remaining ops back
537 ops.push(h);
538 std::lock_guard<win_mutex> lock(dispatch_mutex_);
539 completed_ops_.splice(ops);
540 ::InterlockedExchange(&dispatch_required_, 1);
541 return;
542 }
543 }
544
545 inline std::size_t
546 485308x win_scheduler::do_one(unsigned long timeout_ms)
547 {
548 for (;;)
549 {
550 // Check if we need to process timers or deferred ops
551
2/2
✓ Branch 4 → 5 taken 974 times.
✓ Branch 4 → 13 taken 484334 times.
970616x if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
552 {
553 974x op_queue local_ops;
554 {
555 974x std::lock_guard<win_mutex> lock(dispatch_mutex_);
556 974x local_ops.splice(completed_ops_);
557 974x }
558
1/1
✓ Branch 8 → 9 taken 974 times.
974x post_deferred_completions(local_ops);
559
560
1/2
✓ Branch 9 → 10 taken 974 times.
✗ Branch 9 → 11 not taken.
974x if (timer_svc_)
561
1/1
✓ Branch 10 → 11 taken 974 times.
974x timer_svc_->process_expired();
562
563
1/1
✓ Branch 11 → 12 taken 974 times.
974x update_timeout();
564 }
565
566 485308x DWORD bytes = 0;
567 485308x ULONG_PTR key = 0;
568 485308x LPOVERLAPPED overlapped = nullptr;
569
1/1
✓ Branch 13 → 14 taken 485308 times.
485308x ::SetLastError(0);
570
571
1/1
✓ Branch 17 → 18 taken 485308 times.
485308x BOOL result = ::GetQueuedCompletionStatus(
572 iocp_, &bytes, &key, &overlapped,
573
2/2
✓ Branch 14 → 15 taken 485262 times.
✓ Branch 14 → 16 taken 46 times.
485308x timeout_ms < gqcs_timeout_ms_ ? timeout_ms
574 : gqcs_timeout_ms_);
575
1/1
✓ Branch 18 → 19 taken 485308 times.
485308x DWORD dwError = ::GetLastError();
576
577 // Handle based on completion key
578
2/2
✓ Branch 19 → 20 taken 481900 times.
✓ Branch 19 → 39 taken 3408 times.
485308x if (overlapped)
579 {
580
2/2
✓ Branch 20 → 21 taken 1840 times.
✓ Branch 20 → 22 taken 480060 times.
481900x DWORD err = result ? 0 : dwError;
581
582
2/3
✓ Branch 23 → 24 taken 470938 times.
✓ Branch 23 → 35 taken 10962 times.
✗ Branch 23 → 38 not taken.
481900x switch (key)
583 {
584 470938x case key_io:
585 case key_result_stored:
586 {
587 470938x auto* ov_op = overlapped_to_op(overlapped);
588
589 // If key_result_stored, results are pre-stored in op fields
590
2/2
✓ Branch 25 → 26 taken 14 times.
✓ Branch 25 → 27 taken 470924 times.
470938x if (key == key_result_stored)
591 {
592 14x bytes = ov_op->bytes_transferred;
593 14x err = ov_op->dwError;
594 }
595
596 // Store GQCS results so on_pending() re-post has valid data
597 470938x ov_op->store_result(bytes, err);
598
599 // CAS: try to set ready_ from 0 to 1.
600 // If old value was 1, the initiator already returned
601 // (on_pending/on_completion set it) — safe to dispatch.
602 // If old value was 0, the initiator hasn't returned yet —
603 // skip dispatch; on_pending() will re-post.
604
1/2
✓ Branch 30 → 31 taken 470938 times.
✗ Branch 30 → 34 not taken.
941876x if (::InterlockedCompareExchange(&ov_op->ready_, 1, 0) == 1)
605 {
606
1/1
✓ Branch 31 → 32 taken 470938 times.
470938x ov_op->complete(this, bytes, err);
607 470938x work_finished();
608 481921x return 1;
609 }
610 3374x continue;
611 }
612
613 10962x case key_posted:
614 {
615 // Posted scheduler_op*: overlapped is actually a scheduler_op*
616 10962x auto* op = reinterpret_cast<scheduler_op*>(overlapped);
617
1/1
✓ Branch 35 → 36 taken 10962 times.
10962x op->complete(this, bytes, err);
618 10962x work_finished();
619 10962x return 1;
620 }
621
622 default:
623 continue;
624 }
625 }
626
627 // Signal completions (no OVERLAPPED)
628
2/2
✓ Branch 39 → 40 taken 3390 times.
✓ Branch 39 → 53 taken 18 times.
3408x if (result)
629 {
630
2/3
✓ Branch 40 → 41 taken 974 times.
✓ Branch 40 → 42 taken 2416 times.
✗ Branch 40 → 52 not taken.
3390x switch (key)
631 {
632 974x case key_wake_dispatch:
633 // Timer wakeup - loop to check dispatch_required_
634 974x continue;
635
636 2416x case key_shutdown:
637 2416x ::InterlockedExchange(&stop_event_posted_, 0);
638
2/2
✓ Branch 45 → 46 taken 16 times.
✓ Branch 45 → 51 taken 2400 times.
2416x if (stopped())
639 {
640 // Re-post for other waiting threads
641
1/2
✓ Branch 48 → 49 taken 16 times.
✗ Branch 48 → 50 not taken.
32x if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
642 {
643
1/1
✓ Branch 49 → 50 taken 16 times.
16x ::PostQueuedCompletionStatus(
644 iocp_, 0, key_shutdown, nullptr);
645 }
646 16x return 0;
647 }
648 2400x continue;
649
650 default:
651 continue;
652 }
653 }
654
655 // Timeout or error
656
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 18 times.
18x if (dwError != WAIT_TIMEOUT)
657 detail::throw_system_error(make_err(dwError));
658
2/2
✓ Branch 56 → 57 taken 5 times.
✓ Branch 56 → 58 taken 13 times.
18x if (timeout_ms != INFINITE)
659 5x return 0;
660 // PQCS-failure fallback: stop() sets stopped_ and
661 // dispatch_required_ but if the key_shutdown post failed,
662 // no completion is ever dequeued. Catch it here on the
663 // periodic 500 ms GQCS timeout so run()/run_one() can exit.
664
1/2
✗ Branch 59 → 60 not taken.
✓ Branch 59 → 61 taken 13 times.
13x if (stopped())
665 return 0;
666 3387x }
667 }
668
669 inline void
670 1679x win_scheduler::on_timer_changed(void* ctx)
671 {
672 1679x static_cast<win_scheduler*>(ctx)->update_timeout();
673 1679x }
674
675 inline void
676 431x win_scheduler::set_timer_service(timer_service* svc)
677 {
678 431x timer_svc_ = svc;
679 // Pass 'this' as context - callback routes to correct instance
680 431x svc->set_on_earliest_changed(
681 431x timer_service::callback{this, &on_timer_changed});
682
1/2
✓ Branch 5 → 6 taken 431 times.
✗ Branch 5 → 8 not taken.
431x if (timers_)
683 431x timers_->start();
684 431x }
685
686 inline void
687 2653x win_scheduler::update_timeout()
688 {
689
3/6
✓ Branch 2 → 3 taken 2653 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 2653 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 2653 times.
✗ Branch 7 → 11 not taken.
2653x if (timer_svc_ && timers_)
690 2653x timers_->update_timeout(timer_svc_->nearest_expiry());
691 2653x }
692
693 } // namespace boost::corosio::detail
694
695 #endif // BOOST_COROSIO_HAS_IOCP
696
697 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
698