include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp

78.7% Lines (122/155) 100.0% List of functions (13/13) 65.1% Branches (71/109)
win_wait_reactor.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_wait_reactor::events_for_wait(boost::corosio::wait_type) :104 10x 60.0% 33.3% 60.0% boost::corosio::detail::win_wait_reactor::ready_for_wait(boost::corosio::wait_type, short) :114 5x 55.6% 33.3% 60.0% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&) :144 6x 100.0% 100.0% 54.2% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&)::{lambda()#1}::operator()() const :148 6x 100.0% 100.0% 100.0% boost::corosio::detail::win_wait_reactor::~win_wait_reactor() :151 6x 100.0% 100.0% boost::corosio::detail::win_wait_reactor::make_wakeup_pair() :158 6x 63.6% 58.1% 64.7% boost::corosio::detail::win_wait_reactor::close_wakeup_pair() :218 6x 100.0% 50.0% 100.0% boost::corosio::detail::win_wait_reactor::wake_self() :233 53x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::register_wait(unsigned long long, boost::corosio::wait_type, boost::corosio::detail::overlapped_op*) :248 6x 75.0% 75.0% 61.5% boost::corosio::detail::win_wait_reactor::cancel_wait(boost::corosio::detail::overlapped_op*) :269 41x 100.0% 100.0% 75.0% boost::corosio::detail::win_wait_reactor::stop() :279 18x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::run() :289 6x 79.4% 69.2% 67.6% boost::corosio::detail::win_wait_reactor::run()::{lambda(boost::corosio::detail::win_wait_reactor::entry const&)#1}::operator()(boost::corosio::detail::win_wait_reactor::entry const&) const :311 3x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IOCP
16
17 // This header is included from the bottom of win_scheduler.hpp after
18 // the scheduler class is fully defined. Including it directly would
19 // circle back into a still-incomplete win_scheduler when the dtor's
20 // unique_ptr<win_wait_reactor>::reset() is parsed. Diagnose that
21 // rather than emitting a confusing "incomplete type" error far away.
22 #ifndef BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
23 #error "Include <boost/corosio/native/detail/iocp/win_scheduler.hpp> \
24 instead of including this header directly."
25 #endif
26
27 #include <boost/corosio/wait_type.hpp>
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
30 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
31
32 #include <Ws2tcpip.h>
33
34 #include <algorithm>
35 #include <atomic>
36 #include <cstddef>
37 #include <mutex>
38 #include <thread>
39 #include <vector>
40
41 namespace boost::corosio::detail {
42
43 /** Auxiliary select-based reactor for IOCP wait operations.
44
45 IOCP has no native primitive for socket readiness without I/O.
46 For cases where a zero-byte WSARecv won't work (datagram-read,
47 acceptor-read, error-wait), this reactor runs a dedicated thread
48 using WSAPoll to detect readiness and posts a synthetic completion
49 to the owning IOCP scheduler via win_scheduler::on_completion().
50
51 The same dispatch path used by overlapped I/O then delivers the
52 completion to the user's coroutine, so the public API is uniform
53 across backends.
54
55 Per-op lifecycle:
56 1. Caller sets up an overlapped_op (h, ex, ec_out, cancelled flag).
57 2. Caller calls register_wait(fd, w, op) and returns
58 std::noop_coroutine. The op is parked in the reactor's table.
59 3. Reactor thread polls. When the fd is ready, the op is removed
60 from the table and posted to the scheduler. The error code
61 delivered to the completion is: ec={} on success; the SO_ERROR
62 value if error revents fired and SO_ERROR is set; or
63 WSAECONNABORTED as a synthesized fallback for wait_type::error
64 when error revents fired but SO_ERROR returned zero.
65 4. On socket cancel(), the user's thread calls cancel_wait(op),
66 which queues a cancel request. The reactor thread removes the
67 op from the table and posts a completion; invoke_handler sees
68 op.cancelled==true and yields capy::cond::canceled.
69
70 Thread-safe: register_wait, cancel_wait, and stop may be called
71 from any thread.
72 */
73 class win_wait_reactor
74 {
75 public:
76 explicit win_wait_reactor(win_scheduler& sched);
77 ~win_wait_reactor();
78
79 win_wait_reactor(win_wait_reactor const&) = delete;
80 win_wait_reactor& operator=(win_wait_reactor const&) = delete;
81
82 /// Park an overlapped_op until @p fd is ready for @p w.
83 void register_wait(SOCKET fd, wait_type w, overlapped_op* op);
84
85 /// Remove a parked op and post a completion. Idempotent.
86 void cancel_wait(overlapped_op* op);
87
88 /// Stop the reactor thread and drain remaining ops as cancelled.
89 void stop();
90
91 private:
92 struct entry
93 {
94 SOCKET fd = INVALID_SOCKET;
95 wait_type w = wait_type::read;
96 overlapped_op* op = nullptr;
97 };
98
99 void run();
100 void wake_self() noexcept;
101 void make_wakeup_pair();
102 void close_wakeup_pair() noexcept;
103
104 10x static SHORT events_for_wait(wait_type w) noexcept
105 {
106
1/3
✓ Branch 2 → 3 taken 10 times.
✗ Branch 2 → 4 not taken.
✗ Branch 2 → 5 not taken.
10x switch (w)
107 {
108 10x case wait_type::read: return POLLRDNORM;
109 case wait_type::write: return POLLWRNORM;
110 default: return POLLPRI;
111 }
112 }
113
114 5x static bool ready_for_wait(wait_type w, SHORT revents) noexcept
115 {
116 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
117
1/3
✓ Branch 2 → 3 taken 5 times.
✗ Branch 2 → 4 not taken.
✗ Branch 2 → 5 not taken.
5x switch (w)
118 {
119 5x case wait_type::read:
120 5x return (revents & (POLLRDNORM | POLLRDBAND | err_bits)) != 0;
121 case wait_type::write:
122 return (revents & (POLLWRNORM | POLLWRBAND | err_bits)) != 0;
123 default:
124 return (revents & (POLLPRI | err_bits)) != 0;
125 }
126 }
127
128 win_scheduler& sched_;
129
130 SOCKET wakeup_read_ = INVALID_SOCKET;
131 SOCKET wakeup_write_ = INVALID_SOCKET;
132
133 std::mutex mutex_;
134 std::vector<entry> pending_register_;
135 std::vector<overlapped_op*> pending_cancel_;
136 std::atomic<bool> stop_{false};
137 std::atomic<bool> wake_pending_{false};
138
139 std::vector<entry> registered_; // reactor-thread-only
140
141 std::thread thread_;
142 };
143
144 6x inline win_wait_reactor::win_wait_reactor(win_scheduler& sched)
145 6x : sched_(sched)
146 {
147
1/1
✓ Branch 9 → 10 taken 6 times.
6x make_wakeup_pair();
148
1/1
✓ Branch 10 → 11 taken 6 times.
12x thread_ = std::thread([this] { run(); });
149 6x }
150
151 6x inline win_wait_reactor::~win_wait_reactor()
152 {
153 6x stop();
154 6x close_wakeup_pair();
155 6x }
156
157 inline void
158 6x win_wait_reactor::make_wakeup_pair()
159 {
160 // Build a pair of connected loopback sockets to use as a wakeup
161 // channel. Winsock has no socketpair(2), so we listen on
162 // 127.0.0.1:0, connect a peer, then accept it.
163
1/1
✓ Branch 2 → 3 taken 6 times.
6x SOCKET listener = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
164
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 6 times.
6x if (listener == INVALID_SOCKET)
165 return;
166
167 6x sockaddr_in addr{};
168 6x addr.sin_family = AF_INET;
169
1/1
✓ Branch 5 → 6 taken 6 times.
6x addr.sin_addr.s_addr = ::htonl(INADDR_LOOPBACK);
170 6x addr.sin_port = 0;
171
172 6x int len = sizeof(addr);
173
1/1
✓ Branch 6 → 7 taken 6 times.
6x if (::bind(listener, reinterpret_cast<sockaddr*>(&addr), len) ==
174 6x SOCKET_ERROR ||
175
4/7
✓ Branch 7 → 8 taken 6 times.
✗ Branch 7 → 12 not taken.
✓ Branch 8 → 9 taken 6 times.
✓ Branch 9 → 10 taken 6 times.
✗ Branch 9 → 12 not taken.
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 17 taken 6 times.
12x ::listen(listener, 1) == SOCKET_ERROR ||
176
2/3
✓ Branch 10 → 11 taken 6 times.
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 13 taken 6 times.
6x ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len) ==
177 SOCKET_ERROR)
178 {
179 ::closesocket(listener);
180 return;
181 }
182
183
1/1
✓ Branch 17 → 18 taken 6 times.
6x wakeup_write_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
184
1/2
✗ Branch 18 → 19 not taken.
✓ Branch 18 → 21 taken 6 times.
6x if (wakeup_write_ == INVALID_SOCKET)
185 {
186 ::closesocket(listener);
187 return;
188 }
189
190
1/1
✓ Branch 21 → 22 taken 6 times.
6x if (::connect(
191
1/2
✗ Branch 22 → 23 not taken.
✓ Branch 22 → 26 taken 6 times.
6x wakeup_write_, reinterpret_cast<sockaddr*>(&addr), len) ==
192 SOCKET_ERROR)
193 {
194 ::closesocket(wakeup_write_);
195 wakeup_write_ = INVALID_SOCKET;
196 ::closesocket(listener);
197 return;
198 }
199
200
1/1
✓ Branch 26 → 27 taken 6 times.
6x wakeup_read_ = ::accept(listener, nullptr, nullptr);
201
1/1
✓ Branch 27 → 28 taken 6 times.
6x ::closesocket(listener);
202
203
1/2
✗ Branch 28 → 29 not taken.
✓ Branch 28 → 31 taken 6 times.
6x if (wakeup_read_ == INVALID_SOCKET)
204 {
205 ::closesocket(wakeup_write_);
206 wakeup_write_ = INVALID_SOCKET;
207 return;
208 }
209
210 // The drain loop in run() calls recv() until it returns <= 0.
211 // With a blocking socket that second recv() would block instead
212 // of returning WSAEWOULDBLOCK, deadlocking the reactor thread.
213 6x u_long non_blocking = 1;
214
1/1
✓ Branch 31 → 32 taken 6 times.
6x ::ioctlsocket(wakeup_read_, FIONBIO, &non_blocking);
215 }
216
217 inline void
218 6x win_wait_reactor::close_wakeup_pair() noexcept
219 {
220
1/2
✓ Branch 2 → 3 taken 6 times.
✗ Branch 2 → 5 not taken.
6x if (wakeup_read_ != INVALID_SOCKET)
221 {
222 6x ::closesocket(wakeup_read_);
223 6x wakeup_read_ = INVALID_SOCKET;
224 }
225
1/2
✓ Branch 5 → 6 taken 6 times.
✗ Branch 5 → 8 not taken.
6x if (wakeup_write_ != INVALID_SOCKET)
226 {
227 6x ::closesocket(wakeup_write_);
228 6x wakeup_write_ = INVALID_SOCKET;
229 }
230 6x }
231
232 inline void
233 53x win_wait_reactor::wake_self() noexcept
234 {
235 // Coalesce wakes: only send a byte if no wake is already pending.
236 53x bool expected = false;
237
2/2
✓ Branch 3 → 4 taken 17 times.
✓ Branch 3 → 5 taken 36 times.
53x if (!wake_pending_.compare_exchange_strong(
238 expected, true, std::memory_order_acq_rel))
239 17x return;
240
1/2
✓ Branch 5 → 6 taken 36 times.
✗ Branch 5 → 8 not taken.
36x if (wakeup_write_ != INVALID_SOCKET)
241 {
242 36x char b = 0;
243 36x ::send(wakeup_write_, &b, 1, 0);
244 }
245 }
246
247 inline void
248 6x win_wait_reactor::register_wait(
249 SOCKET fd, wait_type w, overlapped_op* op)
250 {
251 // If the op was already cancelled (e.g. pre-cancelled stop_token
252 // fired synchronously before this call), complete immediately
253 // instead of registering. Otherwise the reactor would park the
254 // op forever because the earlier cancel_wait() found nothing to
255 // cancel in registered_.
256
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 6 taken 6 times.
6x if (op->cancelled.load(std::memory_order_acquire))
257 {
258 sched_.on_completion(op, 0, 0);
259 return;
260 }
261 {
262
1/1
✓ Branch 6 → 7 taken 6 times.
6x std::lock_guard lock(mutex_);
263
1/1
✓ Branch 7 → 8 taken 6 times.
6x pending_register_.push_back(entry{fd, w, op});
264 6x }
265 6x wake_self();
266 }
267
268 inline void
269 41x win_wait_reactor::cancel_wait(overlapped_op* op)
270 {
271 {
272
1/1
✓ Branch 2 → 3 taken 41 times.
41x std::lock_guard lock(mutex_);
273
1/1
✓ Branch 3 → 4 taken 41 times.
41x pending_cancel_.push_back(op);
274 41x }
275 41x wake_self();
276 41x }
277
278 inline void
279 18x win_wait_reactor::stop()
280 {
281
2/2
✓ Branch 3 → 4 taken 12 times.
✓ Branch 3 → 5 taken 6 times.
18x if (stop_.exchange(true, std::memory_order_acq_rel))
282 12x return;
283 6x wake_self();
284
1/2
✓ Branch 7 → 8 taken 6 times.
✗ Branch 7 → 9 not taken.
6x if (thread_.joinable())
285 6x thread_.join();
286 }
287
288 inline void
289 6x win_wait_reactor::run()
290 {
291 6x std::vector<WSAPOLLFD> pollfds;
292
293
2/2
✓ Branch 93 → 3 taken 39 times.
✓ Branch 93 → 94 taken 6 times.
45x while (!stop_.load(std::memory_order_acquire))
294 {
295 // Drain pending register/cancel under the lock.
296 39x std::vector<entry> to_add;
297 39x std::vector<overlapped_op*> to_cancel;
298 {
299
1/1
✓ Branch 3 → 4 taken 39 times.
39x std::lock_guard lock(mutex_);
300 39x to_add.swap(pending_register_);
301 39x to_cancel.swap(pending_cancel_);
302 39x }
303
304
2/2
✓ Branch 13 → 9 taken 6 times.
✓ Branch 13 → 14 taken 39 times.
45x for (auto& e : to_add)
305
1/1
✓ Branch 10 → 11 taken 6 times.
6x registered_.push_back(e);
306
307
2/2
✓ Branch 29 → 16 taken 41 times.
✓ Branch 29 → 30 taken 39 times.
80x for (auto* op : to_cancel)
308 {
309
1/1
✓ Branch 19 → 20 taken 41 times.
41x auto it = std::find_if(
310 registered_.begin(), registered_.end(),
311 3x [op](entry const& e) { return e.op == op; });
312
2/2
✓ Branch 22 → 23 taken 1 time.
✓ Branch 22 → 27 taken 40 times.
41x if (it != registered_.end())
313 {
314 // The op's cancelled flag has already been set by
315 // request_cancel; invoke_handler will translate it.
316
1/1
✓ Branch 23 → 24 taken 1 time.
1x sched_.on_completion(op, 0, 0);
317
1/1
✓ Branch 25 → 26 taken 1 time.
1x registered_.erase(it);
318 }
319 // If not in registered_, the op already fired — no-op.
320 }
321
322 // Build the poll set. Slot 0 is the wakeup socket.
323 39x pollfds.clear();
324
1/1
✓ Branch 32 → 33 taken 39 times.
39x pollfds.reserve(registered_.size() + 1);
325
1/1
✓ Branch 33 → 34 taken 39 times.
39x pollfds.push_back({wakeup_read_, POLLRDNORM, 0});
326
2/2
✓ Branch 41 → 36 taken 10 times.
✓ Branch 41 → 42 taken 39 times.
49x for (auto& e : registered_)
327
1/1
✓ Branch 38 → 39 taken 10 times.
10x pollfds.push_back({e.fd, events_for_wait(e.w), 0});
328
329 // Bounded timeout rather than infinite: this is a safety net
330 // against a lost self-pipe wakeup (e.g. a failed/coalesced
331 // send in wake_self leaving wake_pending_ stuck true). On
332 // timeout the loop re-drains pending_register_/pending_cancel_
333 // and re-checks stop_, so a missed wakeup costs at most one
334 // poll interval of latency instead of a permanent hang. This
335 // mirrors the 500 ms GQCS safety timeout in win_scheduler.
336
1/1
✓ Branch 44 → 45 taken 39 times.
39x int n = ::WSAPoll(
337 pollfds.data(),
338 39x static_cast<ULONG>(pollfds.size()),
339 500 /* ms */);
340
1/2
✗ Branch 45 → 46 not taken.
✓ Branch 45 → 47 taken 39 times.
39x if (n == SOCKET_ERROR)
341 break;
342
343 // Drain the wakeup socket so it stops reporting readable.
344
2/2
✓ Branch 48 → 49 taken 36 times.
✓ Branch 48 → 55 taken 3 times.
39x if (pollfds[0].revents != 0)
345 {
346 char buf[64];
347 for (;;)
348 {
349
1/1
✓ Branch 49 → 50 taken 72 times.
72x int r = ::recv(wakeup_read_, buf, sizeof(buf), 0);
350
2/2
✓ Branch 50 → 51 taken 36 times.
✓ Branch 50 → 52 taken 36 times.
72x if (r <= 0)
351 36x break;
352 36x }
353 36x wake_pending_.store(false, std::memory_order_release);
354 }
355
356 // Walk events in reverse so erases don't invalidate later indices.
357
2/2
✓ Branch 81 → 56 taken 10 times.
✓ Branch 81 → 82 taken 39 times.
49x for (std::size_t i = pollfds.size(); i > 1; --i)
358 {
359 10x auto const& pfd = pollfds[i - 1];
360
2/2
✓ Branch 57 → 58 taken 5 times.
✓ Branch 57 → 59 taken 5 times.
10x if (pfd.revents == 0)
361 5x continue;
362
363 5x auto const& e = registered_[i - 2];
364
1/2
✗ Branch 61 → 62 not taken.
✓ Branch 61 → 63 taken 5 times.
5x if (!ready_for_wait(e.w, pfd.revents))
365 continue;
366
367 5x DWORD err = 0;
368 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
369
1/2
✗ Branch 63 → 64 not taken.
✓ Branch 63 → 74 taken 5 times.
5x if (pfd.revents & err_bits)
370 {
371 int so_err = 0;
372 int sz = sizeof(so_err);
373 if (::getsockopt(
374 e.fd, SOL_SOCKET, SO_ERROR,
375 reinterpret_cast<char*>(&so_err), &sz) == 0 &&
376 so_err != 0)
377 {
378 err = static_cast<DWORD>(so_err);
379 }
380 else if (e.w == wait_type::error)
381 {
382 // wait_type::error fires on the error condition;
383 // the contract is to report a non-zero error_code.
384 err = WSAECONNABORTED;
385 }
386 }
387
388
1/1
✓ Branch 74 → 75 taken 5 times.
5x sched_.on_completion(e.op, err, 0);
389
1/1
✓ Branch 78 → 79 taken 5 times.
5x registered_.erase(registered_.begin() + (i - 2));
390 }
391 39x }
392
393 // Drain remaining ops as cancelled on shutdown. This must cover
394 // both the active set and anything still queued by user threads
395 // that hasn't been moved into registered_ yet, otherwise those
396 // ops leak work_started credit and stall scheduler shutdown.
397 {
398
1/1
✓ Branch 94 → 95 taken 6 times.
6x std::lock_guard lock(mutex_);
399
1/2
✗ Branch 101 → 97 not taken.
✓ Branch 101 → 102 taken 6 times.
6x for (auto& e : pending_register_)
400 registered_.push_back(e);
401 6x pending_register_.clear();
402 6x pending_cancel_.clear();
403 6x }
404
1/2
✗ Branch 111 → 107 not taken.
✓ Branch 111 → 112 taken 6 times.
6x for (auto& e : registered_)
405 sched_.on_completion(e.op, ERROR_OPERATION_ABORTED, 0);
406 6x registered_.clear();
407 6x }
408
409 } // namespace boost::corosio::detail
410
411 #endif // BOOST_COROSIO_HAS_IOCP
412
413 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
414