include/boost/corosio/native/detail/iocp/win_scheduler.hpp

80.7% Lines (213/264) 100.0% Functions (28/28) 66.0% Branches (97/147)
include/boost/corosio/native/detail/iocp/win_scheduler.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_IOCP
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/native_scheduler.hpp>
22 #include <system_error>
23
24 #include <boost/corosio/detail/scheduler_op.hpp>
25 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
26 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
27
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_timers.hpp>
30 #include <boost/corosio/detail/timer_service.hpp>
31 #include <boost/corosio/native/detail/iocp/win_resolver_service.hpp>
32 #include <boost/corosio/detail/make_err.hpp>
33 #include <boost/corosio/detail/except.hpp>
34 #include <boost/corosio/detail/thread_local_ptr.hpp>
35
36 #include <atomic>
37 #include <chrono>
38 #include <cstdint>
39 #include <limits>
40 #include <memory>
41
42 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
43
44 namespace boost::corosio::detail {
45
46 // Forward declarations
47 struct overlapped_op;
48 class win_timers;
49
50 class BOOST_COROSIO_DECL win_scheduler final
51 : public native_scheduler
52 , public capy::execution_context::service
53 {
54 public:
55 using key_type = scheduler;
56
57 win_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
58 ~win_scheduler();
59 win_scheduler(win_scheduler const&) = delete;
60 win_scheduler& operator=(win_scheduler const&) = delete;
61
62 void shutdown() override;
63 void post(std::coroutine_handle<> h) const override;
64 void post(scheduler_op* h) const override;
65 bool running_in_this_thread() const noexcept override;
66 void stop() override;
67 bool stopped() const noexcept override;
68 void restart() override;
69 std::size_t run() override;
70 std::size_t run_one() override;
71 std::size_t wait_one(long usec) override;
72 std::size_t poll() override;
73 std::size_t poll_one() override;
74
75 290 void* native_handle() const noexcept
76 {
77 290 return iocp_;
78 }
79
80 void work_started() noexcept override;
81 void work_finished() noexcept override;
82
83 /** Signal that an overlapped I/O operation is now pending.
84 Coordinates with do_one() via the ready_ CAS protocol. */
85 void on_pending(overlapped_op* op) const;
86
87 /** Post an immediate completion with pre-stored results.
88 Used for sync errors and noop paths. */
89 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
90
91 // Timer service integration
92 void set_timer_service(timer_service* svc);
93 void update_timeout();
94
95 private:
96 static void on_timer_changed(void* ctx);
97 void post_deferred_completions(op_queue& ops);
98 std::size_t do_one(unsigned long timeout_ms);
99
100 void* iocp_;
101 mutable long outstanding_work_;
102 mutable long stopped_;
103 long shutdown_;
104 long stop_event_posted_;
105 mutable long dispatch_required_;
106
107 mutable win_mutex dispatch_mutex_;
108 mutable op_queue completed_ops_;
109 std::unique_ptr<win_timers> timers_;
110 };
111
112 /*
113 ARCHITECTURE NOTE: Function Pointer Dispatch
114
115 All I/O handles are registered with the IOCP using key_io (0).
116 Dispatch happens via the function pointer stored in each scheduler_op.
117
118 When GQCS returns with an OVERLAPPED*, we cast it to scheduler_op*
119 and call the function pointer directly - no virtual dispatch.
120
121 The completion_key enum values are used only for internal signals:
122 - key_io (0): Normal I/O completion, dispatch via func_
123 - key_wake_dispatch (1): Timer wakeup, check dispatch_required_
124 - key_shutdown (2): Stop signal
125 - key_result_stored (3): Results pre-stored in OVERLAPPED
126 */
127
128 namespace iocp {
129
130 // Max timeout for GQCS to allow periodic re-checking of conditions.
131 // Matches Asio's default_gqcs_timeout for pre-Vista compatibility.
132 inline constexpr unsigned long max_gqcs_timeout = 500;
133
134 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
135 {
136 win_scheduler const* key;
137 scheduler_context* next;
138 };
139
140 inline thread_local_ptr<scheduler_context> context_stack;
141
142 struct thread_context_guard
143 {
144 scheduler_context frame_;
145
146 2732 explicit thread_context_guard(win_scheduler const* ctx) noexcept
147 2732 : frame_{ctx, context_stack.get()}
148 {
149 2732 context_stack.set(&frame_);
150 2732 }
151
152 2732 ~thread_context_guard() noexcept
153 {
154 2732 context_stack.set(frame_.next);
155 2732 }
156 };
157
158 } // namespace iocp
159
160 290 inline win_scheduler::win_scheduler(
161 290 capy::execution_context& ctx, int concurrency_hint)
162 290 : iocp_(nullptr)
163 290 , outstanding_work_(0)
164 290 , stopped_(0)
165 290 , shutdown_(0)
166 290 , stop_event_posted_(0)
167
1/1
✓ Branch 4 → 5 taken 290 times.
290 , dispatch_required_(0)
168 {
169 // concurrency_hint < 0 means use system default (DWORD(~0) = max)
170
2/3
✓ Branch 7 → 8 taken 290 times.
✗ Branch 7 → 9 not taken.
✓ Branch 10 → 11 taken 290 times.
290 iocp_ = ::CreateIoCompletionPort(
171 INVALID_HANDLE_VALUE, nullptr, 0,
172 static_cast<DWORD>(
173 concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
174
175
1/2
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 15 taken 290 times.
290 if (iocp_ == nullptr)
176 detail::throw_system_error(make_err(::GetLastError()));
177
178 // Create timer wakeup mechanism (tries NT native, falls back to thread)
179
1/1
✓ Branch 15 → 16 taken 290 times.
290 timers_ = make_win_timers(iocp_, &dispatch_required_);
180
181 // Connect timer service to scheduler
182
2/2
✓ Branch 18 → 19 taken 290 times.
✓ Branch 19 → 20 taken 290 times.
290 set_timer_service(&get_timer_service(ctx, *this));
183
184 // Initialize resolver service
185
1/1
✓ Branch 20 → 21 taken 290 times.
290 ctx.make_service<win_resolver_service>(*this);
186 290 }
187
188 580 inline win_scheduler::~win_scheduler()
189 {
190
1/2
✓ Branch 2 → 3 taken 290 times.
✗ Branch 2 → 4 not taken.
290 if (iocp_ != nullptr)
191 290 ::CloseHandle(iocp_);
192 580 }
193
194 inline void
195 290 win_scheduler::shutdown()
196 {
197 290 ::InterlockedExchange(&shutdown_, 1);
198
199
1/2
✓ Branch 5 → 6 taken 290 times.
✗ Branch 5 → 8 not taken.
290 if (timers_)
200 290 timers_->stop();
201
202 for (;;)
203 {
204 290 op_queue ops;
205 {
206 290 std::lock_guard<win_mutex> lock(dispatch_mutex_);
207 290 ops.splice(completed_ops_);
208 290 }
209
210 290 bool drained_any = false;
211
212
1/2
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 17 taken 290 times.
290 while (auto* h = ops.pop())
213 {
214 h->destroy();
215 drained_any = true;
216 }
217
218 DWORD bytes;
219 ULONG_PTR key;
220 LPOVERLAPPED overlapped;
221
1/1
✓ Branch 17 → 18 taken 290 times.
290 ::GetQueuedCompletionStatus(iocp_, &bytes, &key, &overlapped, 0);
222
1/2
✗ Branch 18 → 19 not taken.
✓ Branch 18 → 24 taken 290 times.
290 if (overlapped)
223 {
224 if (key == key_posted)
225 {
226 auto* op = reinterpret_cast<scheduler_op*>(overlapped);
227 op->destroy();
228 }
229 else
230 {
231 auto* op = overlapped_to_op(overlapped);
232 op->destroy();
233 }
234 drained_any = true;
235 }
236
237
1/2
✓ Branch 24 → 25 taken 290 times.
✗ Branch 24 → 28 not taken.
290 if (!drained_any)
238 290 break;
239 }
240
241 290 ::InterlockedExchange(&outstanding_work_, 0);
242 290 }
243
244 inline void
245 9241 win_scheduler::post(std::coroutine_handle<> h) const
246 {
247 struct post_handler final : scheduler_op
248 {
249 std::coroutine_handle<> h_;
250
251 9241 static void do_complete(
252 void* owner, scheduler_op* base, std::uint32_t, std::uint32_t)
253 {
254 9241 auto* self = static_cast<post_handler*>(base);
255
1/2
✗ Branch 2 → 3 not taken.
✓ Branch 2 → 6 taken 9241 times.
9241 if (!owner)
256 {
257 delete self;
258 return;
259 }
260 9241 auto coro = self->h_;
261
1/2
✓ Branch 6 → 7 taken 9241 times.
✗ Branch 6 → 8 not taken.
9241 delete self;
262 std::atomic_thread_fence(std::memory_order_acquire);
263
1/1
✓ Branch 9 → 10 taken 9241 times.
9241 coro.resume();
264 }
265
266 9241 explicit post_handler(std::coroutine_handle<> coro)
267 9241 : scheduler_op(&do_complete)
268 9241 , h_(coro)
269 {
270 9241 }
271 };
272
273 9241 auto* ph = new post_handler(h);
274 9241 ::InterlockedIncrement(&outstanding_work_);
275
276
1/2
✗ Branch 7 → 8 not taken.
✓ Branch 7 → 14 taken 9241 times.
9241 if (!::PostQueuedCompletionStatus(
277 9241 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(ph)))
278 {
279 std::lock_guard<win_mutex> lock(dispatch_mutex_);
280 completed_ops_.push(ph);
281 ::InterlockedExchange(&dispatch_required_, 1);
282 }
283 9241 }
284
285 inline void
286 1815 win_scheduler::post(scheduler_op* h) const
287 {
288 1815 ::InterlockedIncrement(&outstanding_work_);
289
290
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 12 taken 1815 times.
1815 if (!::PostQueuedCompletionStatus(
291 1815 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
292 {
293 std::lock_guard<win_mutex> lock(dispatch_mutex_);
294 completed_ops_.push(h);
295 ::InterlockedExchange(&dispatch_required_, 1);
296 }
297 1815 }
298
299 inline bool
300 8994 win_scheduler::running_in_this_thread() const noexcept
301 {
302
2/2
✓ Branch 6 → 3 taken 3276 times.
✓ Branch 6 → 7 taken 5718 times.
8994 for (auto* c = iocp::context_stack.get(); c != nullptr; c = c->next)
303
1/2
✓ Branch 3 → 4 taken 3276 times.
✗ Branch 3 → 5 not taken.
3276 if (c->key == this)
304 3276 return true;
305 5718 return false;
306 }
307
308 inline void
309 483763 win_scheduler::work_started() noexcept
310 {
311 483763 ::InterlockedIncrement(&outstanding_work_);
312 483763 }
313
314 inline void
315 494819 win_scheduler::work_finished() noexcept
316 {
317
2/2
✓ Branch 4 → 5 taken 2704 times.
✓ Branch 4 → 6 taken 492115 times.
989638 if (::InterlockedDecrement(&outstanding_work_) == 0)
318 2704 stop();
319 494819 }
320
321 inline void
322 472972 win_scheduler::on_pending(overlapped_op* op) const
323 {
324 // CAS: try to set ready_ from 0 to 1.
325 // If the old value was 1, GQCS already grabbed this op and stored
326 // results — we need to re-post so do_one() can dispatch it.
327
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 16 taken 472972 times.
945944 if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
328 {
329 if (!::PostQueuedCompletionStatus(
330 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
331 {
332 std::lock_guard<win_mutex> lock(dispatch_mutex_);
333 completed_ops_.push(op);
334 ::InterlockedExchange(&dispatch_required_, 1);
335 }
336 }
337 472972 }
338
339 inline void
340 6 win_scheduler::on_completion(
341 overlapped_op* op, DWORD error, DWORD bytes) const
342 {
343 // Sync completion: pack results into op and post for dispatch.
344 6 op->ready_ = 1;
345 6 op->dwError = error;
346 6 op->bytes_transferred = bytes;
347
348
2/4
✓ Branch 2 → 3 taken 6 times.
✗ Branch 2 → 4 not taken.
✗ Branch 6 → 7 not taken.
✓ Branch 6 → 13 taken 6 times.
12 if (!::PostQueuedCompletionStatus(
349 6 iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
350 {
351 std::lock_guard<win_mutex> lock(dispatch_mutex_);
352 completed_ops_.push(op);
353 ::InterlockedExchange(&dispatch_required_, 1);
354 }
355 6 }
356
357 inline void
358 5439 win_scheduler::stop()
359 {
360
2/2
✓ Branch 4 → 5 taken 2711 times.
✓ Branch 4 → 13 taken 2728 times.
10878 if (::InterlockedExchange(&stopped_, 1) == 0)
361 {
362
1/2
✓ Branch 7 → 8 taken 2711 times.
✗ Branch 7 → 13 not taken.
5422 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
363 {
364
1/2
✗ Branch 9 → 10 not taken.
✓ Branch 9 → 13 taken 2711 times.
2711 if (!::PostQueuedCompletionStatus(iocp_, 0, key_shutdown, nullptr))
365 {
366 DWORD dwError = ::GetLastError();
367 detail::throw_system_error(make_err(dwError));
368 }
369 }
370 }
371 5439 }
372
373 inline bool
374 2538 win_scheduler::stopped() const noexcept
375 {
376 // equivalent to atomic read
377 5076 return ::InterlockedExchangeAdd(&stopped_, 0) != 0;
378 }
379
380 inline void
381 2535 win_scheduler::restart()
382 {
383 2535 ::InterlockedExchange(&stopped_, 0);
384 2535 ::InterlockedExchange(&stop_event_posted_, 0);
385 2535 }
386
387 inline std::size_t
388 2727 win_scheduler::run()
389 {
390
2/2
✓ Branch 4 → 5 taken 29 times.
✓ Branch 4 → 7 taken 2698 times.
5454 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
391 {
392
1/1
✓ Branch 5 → 6 taken 29 times.
29 stop();
393 29 return 0;
394 }
395
396 2698 iocp::thread_context_guard ctx(this);
397
398 2698 std::size_t n = 0;
399 for (;;)
400 {
401
3/3
✓ Branch 9 → 10 taken 484004 times.
✓ Branch 10 → 11 taken 5 times.
✓ Branch 10 → 12 taken 483999 times.
484004 if (!do_one(INFINITE))
402 5 break;
403
1/2
✓ Branch 13 → 14 taken 483999 times.
✗ Branch 13 → 15 not taken.
483999 if (n != (std::numeric_limits<std::size_t>::max)())
404 483999 ++n;
405
2/2
✓ Branch 17 → 18 taken 2693 times.
✓ Branch 17 → 20 taken 481306 times.
967998 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
406 {
407
1/1
✓ Branch 18 → 19 taken 2693 times.
2693 stop();
408 2693 break;
409 }
410 }
411 2698 return n;
412 2698 }
413
414 inline std::size_t
415 2 win_scheduler::run_one()
416 {
417
1/2
✗ Branch 4 → 5 not taken.
✓ Branch 4 → 7 taken 2 times.
4 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
418 {
419 stop();
420 return 0;
421 }
422
423 2 iocp::thread_context_guard ctx(this);
424
1/1
✓ Branch 8 → 9 taken 2 times.
2 return do_one(INFINITE);
425 2 }
426
427 inline std::size_t
428 36 win_scheduler::wait_one(long usec)
429 {
430
2/2
✓ Branch 4 → 5 taken 8 times.
✓ Branch 4 → 7 taken 28 times.
72 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
431 {
432
1/1
✓ Branch 5 → 6 taken 8 times.
8 stop();
433 8 return 0;
434 }
435
436 28 iocp::thread_context_guard ctx(this);
437 28 unsigned long timeout_ms = INFINITE;
438
1/2
✓ Branch 8 → 9 taken 28 times.
✗ Branch 8 → 13 not taken.
28 if (usec >= 0)
439 {
440 28 auto ms = (static_cast<long long>(usec) + 999) / 1000;
441
1/2
✓ Branch 9 → 10 taken 28 times.
✗ Branch 9 → 11 not taken.
28 timeout_ms = ms >= 0xFFFFFFFELL ? static_cast<unsigned long>(0xFFFFFFFE)
442 : static_cast<unsigned long>(ms);
443 }
444
1/1
✓ Branch 13 → 14 taken 28 times.
28 return do_one(timeout_ms);
445 28 }
446
447 inline std::size_t
448 3 win_scheduler::poll()
449 {
450
2/2
✓ Branch 4 → 5 taken 1 time.
✓ Branch 4 → 7 taken 2 times.
6 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
451 {
452
1/1
✓ Branch 5 → 6 taken 1 time.
1 stop();
453 1 return 0;
454 }
455
456 2 iocp::thread_context_guard ctx(this);
457
458 2 std::size_t n = 0;
459
3/3
✓ Branch 12 → 13 taken 5 times.
✓ Branch 13 → 9 taken 3 times.
✓ Branch 13 → 14 taken 2 times.
5 while (do_one(0))
460
1/2
✓ Branch 10 → 11 taken 3 times.
✗ Branch 10 → 12 not taken.
3 if (n != (std::numeric_limits<std::size_t>::max)())
461 3 ++n;
462 2 return n;
463 2 }
464
465 inline std::size_t
466 4 win_scheduler::poll_one()
467 {
468
2/2
✓ Branch 4 → 5 taken 2 times.
✓ Branch 4 → 7 taken 2 times.
8 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
469 {
470
1/1
✓ Branch 5 → 6 taken 2 times.
2 stop();
471 2 return 0;
472 }
473
474 2 iocp::thread_context_guard ctx(this);
475
1/1
✓ Branch 8 → 9 taken 2 times.
2 return do_one(0);
476 2 }
477
478 inline void
479 1036 win_scheduler::post_deferred_completions(op_queue& ops)
480 {
481
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 15 taken 1036 times.
1036 while (auto h = ops.pop())
482 {
483 if (::PostQueuedCompletionStatus(
484 iocp_, 0, key_posted, reinterpret_cast<LPOVERLAPPED>(h)))
485 continue;
486
487 // Out of resources, put the failed op and remaining ops back
488 ops.push(h);
489 std::lock_guard<win_mutex> lock(dispatch_mutex_);
490 completed_ops_.splice(ops);
491 ::InterlockedExchange(&dispatch_required_, 1);
492 return;
493 }
494 }
495
496 inline std::size_t
497 487595 win_scheduler::do_one(unsigned long timeout_ms)
498 {
499 for (;;)
500 {
501 // Check if we need to process timers or deferred ops
502
2/2
✓ Branch 4 → 5 taken 1036 times.
✓ Branch 4 → 13 taken 486559 times.
975190 if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
503 {
504 1036 op_queue local_ops;
505 {
506 1036 std::lock_guard<win_mutex> lock(dispatch_mutex_);
507 1036 local_ops.splice(completed_ops_);
508 1036 }
509
1/1
✓ Branch 8 → 9 taken 1036 times.
1036 post_deferred_completions(local_ops);
510
511
1/2
✓ Branch 9 → 10 taken 1036 times.
✗ Branch 9 → 11 not taken.
1036 if (timer_svc_)
512
1/1
✓ Branch 10 → 11 taken 1036 times.
1036 timer_svc_->process_expired();
513
514
1/1
✓ Branch 11 → 12 taken 1036 times.
1036 update_timeout();
515 }
516
517 487595 DWORD bytes = 0;
518 487595 ULONG_PTR key = 0;
519 487595 LPOVERLAPPED overlapped = nullptr;
520
1/1
✓ Branch 13 → 14 taken 487595 times.
487595 ::SetLastError(0);
521
522
1/1
✓ Branch 17 → 18 taken 487595 times.
487595 BOOL result = ::GetQueuedCompletionStatus(
523 iocp_, &bytes, &key, &overlapped,
524
2/2
✓ Branch 14 → 15 taken 43 times.
✓ Branch 14 → 16 taken 487552 times.
487595 timeout_ms < iocp::max_gqcs_timeout ? timeout_ms
525 : iocp::max_gqcs_timeout);
526
1/1
✓ Branch 18 → 19 taken 487595 times.
487595 DWORD dwError = ::GetLastError();
527
528 // Handle based on completion key
529
2/2
✓ Branch 19 → 20 taken 484034 times.
✓ Branch 19 → 39 taken 3561 times.
487595 if (overlapped)
530 {
531
2/2
✓ Branch 20 → 21 taken 1859 times.
✓ Branch 20 → 22 taken 482175 times.
484034 DWORD err = result ? 0 : dwError;
532
533
2/3
✓ Branch 23 → 24 taken 472978 times.
✓ Branch 23 → 35 taken 11056 times.
✗ Branch 23 → 38 not taken.
484034 switch (key)
534 {
535 472978 case key_io:
536 case key_result_stored:
537 {
538 472978 auto* ov_op = overlapped_to_op(overlapped);
539
540 // If key_result_stored, results are pre-stored in op fields
541
2/2
✓ Branch 25 → 26 taken 6 times.
✓ Branch 25 → 27 taken 472972 times.
472978 if (key == key_result_stored)
542 {
543 6 bytes = ov_op->bytes_transferred;
544 6 err = ov_op->dwError;
545 }
546
547 // Store GQCS results so on_pending() re-post has valid data
548 472978 ov_op->store_result(bytes, err);
549
550 // CAS: try to set ready_ from 0 to 1.
551 // If old value was 1, the initiator already returned
552 // (on_pending/on_completion set it) — safe to dispatch.
553 // If old value was 0, the initiator hasn't returned yet —
554 // skip dispatch; on_pending() will re-post.
555 472978 if (::InterlockedCompareExchange(
556
1/2
✓ Branch 30 → 31 taken 472978 times.
✗ Branch 30 → 34 not taken.
945956 &ov_op->ready_, 1, 0) == 1)
557 {
558
1/1
✓ Branch 31 → 32 taken 472978 times.
472978 ov_op->complete(this, bytes, err);
559 472978 work_finished();
560 484041 return 1;
561 }
562 3545 continue;
563 }
564
565 11056 case key_posted:
566 {
567 // Posted scheduler_op*: overlapped is actually a scheduler_op*
568 11056 auto* op = reinterpret_cast<scheduler_op*>(overlapped);
569
1/1
✓ Branch 35 → 36 taken 11056 times.
11056 op->complete(this, bytes, err);
570 11056 work_finished();
571 11056 return 1;
572 }
573
574 default:
575 continue;
576 }
577 }
578
579 // Signal completions (no OVERLAPPED)
580
2/2
✓ Branch 39 → 40 taken 3552 times.
✓ Branch 39 → 53 taken 9 times.
3561 if (result)
581 {
582
2/3
✓ Branch 40 → 41 taken 1036 times.
✓ Branch 40 → 42 taken 2516 times.
✗ Branch 40 → 52 not taken.
3552 switch (key)
583 {
584 1036 case key_wake_dispatch:
585 // Timer wakeup - loop to check dispatch_required_
586 1036 continue;
587
588 2516 case key_shutdown:
589 2516 ::InterlockedExchange(&stop_event_posted_, 0);
590
2/2
✓ Branch 45 → 46 taken 7 times.
✓ Branch 45 → 51 taken 2509 times.
2516 if (stopped())
591 {
592 // Re-post for other waiting threads
593
1/2
✓ Branch 48 → 49 taken 7 times.
✗ Branch 48 → 50 not taken.
14 if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
594 {
595
1/1
✓ Branch 49 → 50 taken 7 times.
7 ::PostQueuedCompletionStatus(
596 iocp_, 0, key_shutdown, nullptr);
597 }
598 7 return 0;
599 }
600 2509 continue;
601
602 default:
603 continue;
604 }
605 }
606
607 // Timeout or error
608
1/2
✗ Branch 53 → 54 not taken.
✓ Branch 53 → 56 taken 9 times.
9 if (dwError != WAIT_TIMEOUT)
609 detail::throw_system_error(make_err(dwError));
610
1/2
✗ Branch 56 → 57 not taken.
✓ Branch 56 → 58 taken 9 times.
9 if (timeout_ms != INFINITE)
611 return 0;
612 3554 }
613 }
614
615 inline void
616 1764 win_scheduler::on_timer_changed(void* ctx)
617 {
618 1764 static_cast<win_scheduler*>(ctx)->update_timeout();
619 1764 }
620
621 inline void
622 290 win_scheduler::set_timer_service(timer_service* svc)
623 {
624 290 timer_svc_ = svc;
625 // Pass 'this' as context - callback routes to correct instance
626 290 svc->set_on_earliest_changed(
627 290 timer_service::callback{this, &on_timer_changed});
628
1/2
✓ Branch 5 → 6 taken 290 times.
✗ Branch 5 → 8 not taken.
290 if (timers_)
629 290 timers_->start();
630 290 }
631
632 inline void
633 2800 win_scheduler::update_timeout()
634 {
635
3/6
✓ Branch 2 → 3 taken 2800 times.
✗ Branch 2 → 6 not taken.
✓ Branch 4 → 5 taken 2800 times.
✗ Branch 4 → 6 not taken.
✓ Branch 7 → 8 taken 2800 times.
✗ Branch 7 → 11 not taken.
2800 if (timer_svc_ && timers_)
636 2800 timers_->update_timeout(timer_svc_->nearest_expiry());
637 2800 }
638
639 } // namespace boost::corosio::detail
640
641 #endif // BOOST_COROSIO_HAS_IOCP
642
643 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP
644