include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp

81.9% Lines (127/155) 100.0% List of functions (13/13) 69.7% Branches (76/109)
win_wait_reactor.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_wait_reactor::events_for_wait(boost::corosio::wait_type) :104 16x 80.0% 66.7% 80.0% boost::corosio::detail::win_wait_reactor::ready_for_wait(boost::corosio::wait_type, short) :114 5x 55.6% 33.3% 60.0% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&) :144 14x 100.0% 100.0% 54.2% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&)::{lambda()#1}::operator()() const :148 14x 100.0% 100.0% 100.0% boost::corosio::detail::win_wait_reactor::~win_wait_reactor() :151 14x 100.0% 100.0% boost::corosio::detail::win_wait_reactor::make_wakeup_pair() :158 14x 63.6% 58.1% 64.7% boost::corosio::detail::win_wait_reactor::close_wakeup_pair() :218 14x 100.0% 50.0% 100.0% boost::corosio::detail::win_wait_reactor::wake_self() :233 106x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::register_wait(unsigned long long, boost::corosio::wait_type, boost::corosio::detail::overlapped_op*) :248 14x 100.0% 100.0% 76.9% boost::corosio::detail::win_wait_reactor::cancel_wait(boost::corosio::detail::overlapped_op*) :269 79x 100.0% 100.0% 75.0% boost::corosio::detail::win_wait_reactor::stop() :279 42x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::run() :289 14x 82.5% 75.0% 71.9% boost::corosio::detail::win_wait_reactor::run()::{lambda(boost::corosio::detail::win_wait_reactor::entry const&)#1}::operator()(boost::corosio::detail::win_wait_reactor::entry const&) const :311 4x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IOCP
16
17 // This header is included from the bottom of win_scheduler.hpp after
18 // the scheduler class is fully defined. Including it directly would
19 // circle back into a still-incomplete win_scheduler when the dtor's
20 // unique_ptr<win_wait_reactor>::reset() is parsed. Diagnose that
21 // rather than emitting a confusing "incomplete type" error far away.
22 #ifndef BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
23 #error "Include <boost/corosio/native/detail/iocp/win_scheduler.hpp> \
24 instead of including this header directly."
25 #endif
26
27 #include <boost/corosio/wait_type.hpp>
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
30 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
31
32 #include <Ws2tcpip.h>
33
34 #include <algorithm>
35 #include <atomic>
36 #include <cstddef>
37 #include <mutex>
38 #include <thread>
39 #include <vector>
40
41 namespace boost::corosio::detail {
42
43 /** Auxiliary select-based reactor for IOCP wait operations.
44
45 IOCP has no native primitive for socket readiness without I/O.
46 For cases where a zero-byte WSARecv won't work (datagram-read,
47 acceptor-read, error-wait), this reactor runs a dedicated thread
48 using WSAPoll to detect readiness and posts a synthetic completion
49 to the owning IOCP scheduler via win_scheduler::on_completion().
50
51 The same dispatch path used by overlapped I/O then delivers the
52 completion to the user's coroutine, so the public API is uniform
53 across backends.
54
55 Per-op lifecycle:
56 1. Caller sets up an overlapped_op (h, ex, ec_out, cancelled flag).
57 2. Caller calls register_wait(fd, w, op) and returns
58 std::noop_coroutine. The op is parked in the reactor's table.
59 3. Reactor thread polls. When the fd is ready, the op is removed
60 from the table and posted to the scheduler. The error code
61 delivered to the completion is: ec={} on success; the SO_ERROR
62 value if error revents fired and SO_ERROR is set; or
63 WSAECONNABORTED as a synthesized fallback for wait_type::error
64 when error revents fired but SO_ERROR returned zero.
65 4. On socket cancel(), the user's thread calls cancel_wait(op),
66 which queues a cancel request. The reactor thread removes the
67 op from the table and posts a completion; invoke_handler sees
68 op.cancelled==true and yields capy::cond::canceled.
69
70 Thread-safe: register_wait, cancel_wait, and stop may be called
71 from any thread.
72 */
73 class win_wait_reactor
74 {
75 public:
76 explicit win_wait_reactor(win_scheduler& sched);
77 ~win_wait_reactor();
78
79 win_wait_reactor(win_wait_reactor const&) = delete;
80 win_wait_reactor& operator=(win_wait_reactor const&) = delete;
81
82 /// Park an overlapped_op until @p fd is ready for @p w.
83 void register_wait(SOCKET fd, wait_type w, overlapped_op* op);
84
85 /// Remove a parked op and post a completion. Idempotent.
86 void cancel_wait(overlapped_op* op);
87
88 /// Stop the reactor thread and drain remaining ops as cancelled.
89 void stop();
90
91 private:
92 struct entry
93 {
94 SOCKET fd = INVALID_SOCKET;
95 wait_type w = wait_type::read;
96 overlapped_op* op = nullptr;
97 };
98
99 void run();
100 void wake_self() noexcept;
101 void make_wakeup_pair();
102 void close_wakeup_pair() noexcept;
103
104 16x static SHORT events_for_wait(wait_type w) noexcept
105 {
106
2/3
✓ Branch 2 → 3 taken 10 times.
✗ Branch 2 → 4 not taken.
✓ Branch 2 → 5 taken 6 times.
16x switch (w)
107 {
108 10x case wait_type::read: return POLLRDNORM;
109 case wait_type::write: return POLLWRNORM;
110 6x default: return POLLPRI;
111 }
112 }
113
114 5x static bool ready_for_wait(wait_type w, SHORT revents) noexcept
115 {
116 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
117
1/3
✓ Branch 2 → 3 taken 5 times.
✗ Branch 2 → 4 not taken.
✗ Branch 2 → 5 not taken.
5x switch (w)
118 {
119 5x case wait_type::read:
120 5x return (revents & (POLLRDNORM | POLLRDBAND | err_bits)) != 0;
121 case wait_type::write:
122 return (revents & (POLLWRNORM | POLLWRBAND | err_bits)) != 0;
123 default:
124 return (revents & (POLLPRI | err_bits)) != 0;
125 }
126 }
127
128 win_scheduler& sched_;
129
130 SOCKET wakeup_read_ = INVALID_SOCKET;
131 SOCKET wakeup_write_ = INVALID_SOCKET;
132
133 std::mutex mutex_;
134 std::vector<entry> pending_register_;
135 std::vector<overlapped_op*> pending_cancel_;
136 std::atomic<bool> stop_{false};
137 std::atomic<bool> wake_pending_{false};
138
139 std::vector<entry> registered_; // reactor-thread-only
140
141 std::thread thread_;
142 };
143
144 14x inline win_wait_reactor::win_wait_reactor(win_scheduler& sched)
145 14x : sched_(sched)
146 {
147
1/1
✓ Branch 9 → 10 taken 14 times.
14x make_wakeup_pair();
148
1/1
✓ Branch 10 → 11 taken 14 times.
28x thread_ = std::thread([this] { run(); });
149 14x }
150
151 14x inline win_wait_reactor::~win_wait_reactor()
152 {
153 14x stop();
154 14x close_wakeup_pair();
155 14x }
156
157 inline void
158 14x win_wait_reactor::make_wakeup_pair()
159 {
160 // Build a pair of connected loopback sockets to use as a wakeup
161 // channel. Winsock has no socketpair(2), so we listen on
162 // 127.0.0.1:0, connect a peer, then accept it.
163
1/1
✓ Branch 2 → 3 taken 14 times.
14x SOCKET listener = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
164
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 14 times.
14x if (listener == INVALID_SOCKET)
165 return;
166
167 14x sockaddr_in addr{};
168 14x addr.sin_family = AF_INET;
169
1/1
✓ Branch 5 → 6 taken 14 times.
14x addr.sin_addr.s_addr = ::htonl(INADDR_LOOPBACK);
170 14x addr.sin_port = 0;
171
172 14x int len = sizeof(addr);
173
1/1
✓ Branch 6 → 7 taken 14 times.
14x if (::bind(listener, reinterpret_cast<sockaddr*>(&addr), len) ==
174 14x SOCKET_ERROR ||
175
4/7
✓ Branch 7 → 8 taken 14 times.
✗ Branch 7 → 12 not taken.
✓ Branch 8 → 9 taken 14 times.
✓ Branch 9 → 10 taken 14 times.
✗ Branch 9 → 12 not taken.
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 17 taken 14 times.
28x ::listen(listener, 1) == SOCKET_ERROR ||
176
2/3
✓ Branch 10 → 11 taken 14 times.
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 13 taken 14 times.
14x ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len) ==
177 SOCKET_ERROR)
178 {
179 ::closesocket(listener);
180 return;
181 }
182
183
1/1
✓ Branch 17 → 18 taken 14 times.
14x wakeup_write_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
184
1/2
✗ Branch 18 → 19 not taken.
✓ Branch 18 → 21 taken 14 times.
14x if (wakeup_write_ == INVALID_SOCKET)
185 {
186 ::closesocket(listener);
187 return;
188 }
189
190
1/1
✓ Branch 21 → 22 taken 14 times.
14x if (::connect(
191
1/2
✗ Branch 22 → 23 not taken.
✓ Branch 22 → 26 taken 14 times.
14x wakeup_write_, reinterpret_cast<sockaddr*>(&addr), len) ==
192 SOCKET_ERROR)
193 {
194 ::closesocket(wakeup_write_);
195 wakeup_write_ = INVALID_SOCKET;
196 ::closesocket(listener);
197 return;
198 }
199
200
1/1
✓ Branch 26 → 27 taken 14 times.
14x wakeup_read_ = ::accept(listener, nullptr, nullptr);
201
1/1
✓ Branch 27 → 28 taken 14 times.
14x ::closesocket(listener);
202
203
1/2
✗ Branch 28 → 29 not taken.
✓ Branch 28 → 31 taken 14 times.
14x if (wakeup_read_ == INVALID_SOCKET)
204 {
205 ::closesocket(wakeup_write_);
206 wakeup_write_ = INVALID_SOCKET;
207 return;
208 }
209
210 // The drain loop in run() calls recv() until it returns <= 0.
211 // With a blocking socket that second recv() would block instead
212 // of returning WSAEWOULDBLOCK, deadlocking the reactor thread.
213 14x u_long non_blocking = 1;
214
1/1
✓ Branch 31 → 32 taken 14 times.
14x ::ioctlsocket(wakeup_read_, FIONBIO, &non_blocking);
215 }
216
217 inline void
218 14x win_wait_reactor::close_wakeup_pair() noexcept
219 {
220
1/2
✓ Branch 2 → 3 taken 14 times.
✗ Branch 2 → 5 not taken.
14x if (wakeup_read_ != INVALID_SOCKET)
221 {
222 14x ::closesocket(wakeup_read_);
223 14x wakeup_read_ = INVALID_SOCKET;
224 }
225
1/2
✓ Branch 5 → 6 taken 14 times.
✗ Branch 5 → 8 not taken.
14x if (wakeup_write_ != INVALID_SOCKET)
226 {
227 14x ::closesocket(wakeup_write_);
228 14x wakeup_write_ = INVALID_SOCKET;
229 }
230 14x }
231
232 inline void
233 106x win_wait_reactor::wake_self() noexcept
234 {
235 // Coalesce wakes: only send a byte if no wake is already pending.
236 106x bool expected = false;
237
2/2
✓ Branch 3 → 4 taken 57 times.
✓ Branch 3 → 5 taken 49 times.
106x if (!wake_pending_.compare_exchange_strong(
238 expected, true, std::memory_order_acq_rel))
239 57x return;
240
1/2
✓ Branch 5 → 6 taken 49 times.
✗ Branch 5 → 8 not taken.
49x if (wakeup_write_ != INVALID_SOCKET)
241 {
242 49x char b = 0;
243 49x ::send(wakeup_write_, &b, 1, 0);
244 }
245 }
246
247 inline void
248 14x win_wait_reactor::register_wait(
249 SOCKET fd, wait_type w, overlapped_op* op)
250 {
251 // If the op was already cancelled (e.g. pre-cancelled stop_token
252 // fired synchronously before this call), complete immediately
253 // instead of registering. Otherwise the reactor would park the
254 // op forever because the earlier cancel_wait() found nothing to
255 // cancel in registered_.
256
2/2
✓ Branch 3 → 4 taken 1 time.
✓ Branch 3 → 6 taken 13 times.
14x if (op->cancelled.load(std::memory_order_acquire))
257 {
258 1x sched_.on_completion(op, 0, 0);
259 1x return;
260 }
261 {
262
1/1
✓ Branch 6 → 7 taken 13 times.
13x std::lock_guard lock(mutex_);
263
1/1
✓ Branch 7 → 8 taken 13 times.
13x pending_register_.push_back(entry{fd, w, op});
264 13x }
265 13x wake_self();
266 }
267
268 inline void
269 79x win_wait_reactor::cancel_wait(overlapped_op* op)
270 {
271 {
272
1/1
✓ Branch 2 → 3 taken 79 times.
79x std::lock_guard lock(mutex_);
273
1/1
✓ Branch 3 → 4 taken 79 times.
79x pending_cancel_.push_back(op);
274 79x }
275 79x wake_self();
276 79x }
277
278 inline void
279 42x win_wait_reactor::stop()
280 {
281
2/2
✓ Branch 3 → 4 taken 28 times.
✓ Branch 3 → 5 taken 14 times.
42x if (stop_.exchange(true, std::memory_order_acq_rel))
282 28x return;
283 14x wake_self();
284
1/2
✓ Branch 7 → 8 taken 14 times.
✗ Branch 7 → 9 not taken.
14x if (thread_.joinable())
285 14x thread_.join();
286 }
287
288 inline void
289 14x win_wait_reactor::run()
290 {
291 14x std::vector<WSAPOLLFD> pollfds;
292
293
2/2
✓ Branch 93 → 3 taken 50 times.
✓ Branch 93 → 94 taken 8 times.
58x while (!stop_.load(std::memory_order_acquire))
294 {
295 // Drain pending register/cancel under the lock.
296 50x std::vector<entry> to_add;
297 50x std::vector<overlapped_op*> to_cancel;
298 {
299
1/1
✓ Branch 3 → 4 taken 50 times.
50x std::lock_guard lock(mutex_);
300 50x to_add.swap(pending_register_);
301 50x to_cancel.swap(pending_cancel_);
302 50x }
303
304
2/2
✓ Branch 13 → 9 taken 13 times.
✓ Branch 13 → 14 taken 50 times.
63x for (auto& e : to_add)
305
1/1
✓ Branch 10 → 11 taken 13 times.
13x registered_.push_back(e);
306
307
2/2
✓ Branch 29 → 16 taken 47 times.
✓ Branch 29 → 30 taken 50 times.
97x for (auto* op : to_cancel)
308 {
309
1/1
✓ Branch 19 → 20 taken 47 times.
47x auto it = std::find_if(
310 registered_.begin(), registered_.end(),
311 4x [op](entry const& e) { return e.op == op; });
312
2/2
✓ Branch 22 → 23 taken 2 times.
✓ Branch 22 → 27 taken 45 times.
47x if (it != registered_.end())
313 {
314 // The op's cancelled flag has already been set by
315 // request_cancel; invoke_handler will translate it.
316
1/1
✓ Branch 23 → 24 taken 2 times.
2x sched_.on_completion(op, 0, 0);
317
1/1
✓ Branch 25 → 26 taken 2 times.
2x registered_.erase(it);
318 }
319 // If not in registered_, the op already fired — no-op.
320 }
321
322 // Build the poll set. Slot 0 is the wakeup socket.
323 50x pollfds.clear();
324
1/1
✓ Branch 32 → 33 taken 50 times.
50x pollfds.reserve(registered_.size() + 1);
325
1/1
✓ Branch 33 → 34 taken 50 times.
50x pollfds.push_back({wakeup_read_, POLLRDNORM, 0});
326
2/2
✓ Branch 41 → 36 taken 16 times.
✓ Branch 41 → 42 taken 50 times.
66x for (auto& e : registered_)
327
1/1
✓ Branch 38 → 39 taken 16 times.
16x pollfds.push_back({e.fd, events_for_wait(e.w), 0});
328
329 // Bounded timeout rather than infinite: this is a safety net
330 // against a lost self-pipe wakeup (e.g. a failed/coalesced
331 // send in wake_self leaving wake_pending_ stuck true). On
332 // timeout the loop re-drains pending_register_/pending_cancel_
333 // and re-checks stop_, so a missed wakeup costs at most one
334 // poll interval of latency instead of a permanent hang. This
335 // mirrors the 500 ms GQCS safety timeout in win_scheduler.
336
1/1
✓ Branch 44 → 45 taken 50 times.
50x int n = ::WSAPoll(
337 pollfds.data(),
338 50x static_cast<ULONG>(pollfds.size()),
339 500 /* ms */);
340
2/2
✓ Branch 45 → 46 taken 6 times.
✓ Branch 45 → 47 taken 44 times.
50x if (n == SOCKET_ERROR)
341 6x break;
342
343 // Drain the wakeup socket so it stops reporting readable.
344
2/2
✓ Branch 48 → 49 taken 43 times.
✓ Branch 48 → 55 taken 1 time.
44x if (pollfds[0].revents != 0)
345 {
346 char buf[64];
347 for (;;)
348 {
349
1/1
✓ Branch 49 → 50 taken 86 times.
86x int r = ::recv(wakeup_read_, buf, sizeof(buf), 0);
350
2/2
✓ Branch 50 → 51 taken 43 times.
✓ Branch 50 → 52 taken 43 times.
86x if (r <= 0)
351 43x break;
352 43x }
353 43x wake_pending_.store(false, std::memory_order_release);
354 }
355
356 // Walk events in reverse so erases don't invalidate later indices.
357
2/2
✓ Branch 81 → 56 taken 10 times.
✓ Branch 81 → 82 taken 44 times.
54x for (std::size_t i = pollfds.size(); i > 1; --i)
358 {
359 10x auto const& pfd = pollfds[i - 1];
360
2/2
✓ Branch 57 → 58 taken 5 times.
✓ Branch 57 → 59 taken 5 times.
10x if (pfd.revents == 0)
361 5x continue;
362
363 5x auto const& e = registered_[i - 2];
364
1/2
✗ Branch 61 → 62 not taken.
✓ Branch 61 → 63 taken 5 times.
5x if (!ready_for_wait(e.w, pfd.revents))
365 continue;
366
367 5x DWORD err = 0;
368 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
369
1/2
✗ Branch 63 → 64 not taken.
✓ Branch 63 → 74 taken 5 times.
5x if (pfd.revents & err_bits)
370 {
371 int so_err = 0;
372 int sz = sizeof(so_err);
373 if (::getsockopt(
374 e.fd, SOL_SOCKET, SO_ERROR,
375 reinterpret_cast<char*>(&so_err), &sz) == 0 &&
376 so_err != 0)
377 {
378 err = static_cast<DWORD>(so_err);
379 }
380 else if (e.w == wait_type::error)
381 {
382 // wait_type::error fires on the error condition;
383 // the contract is to report a non-zero error_code.
384 err = WSAECONNABORTED;
385 }
386 }
387
388
1/1
✓ Branch 74 → 75 taken 5 times.
5x sched_.on_completion(e.op, err, 0);
389
1/1
✓ Branch 78 → 79 taken 5 times.
5x registered_.erase(registered_.begin() + (i - 2));
390 }
391 56x }
392
393 // Drain remaining ops as cancelled on shutdown. This must cover
394 // both the active set and anything still queued by user threads
395 // that hasn't been moved into registered_ yet, otherwise those
396 // ops leak work_started credit and stall scheduler shutdown.
397 {
398
1/1
✓ Branch 94 → 95 taken 14 times.
14x std::lock_guard lock(mutex_);
399
1/2
✗ Branch 101 → 97 not taken.
✓ Branch 101 → 102 taken 14 times.
14x for (auto& e : pending_register_)
400 registered_.push_back(e);
401 14x pending_register_.clear();
402 14x pending_cancel_.clear();
403 14x }
404
2/2
✓ Branch 111 → 107 taken 6 times.
✓ Branch 111 → 112 taken 14 times.
20x for (auto& e : registered_)
405
1/1
✓ Branch 108 → 109 taken 6 times.
6x sched_.on_completion(e.op, ERROR_OPERATION_ABORTED, 0);
406 14x registered_.clear();
407 14x }
408
409 } // namespace boost::corosio::detail
410
411 #endif // BOOST_COROSIO_HAS_IOCP
412
413 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
414