include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp

81.9% Lines (127/155) 100.0% List of functions (13/13) 69.7% Branches (76/109)
win_wait_reactor.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
boost::corosio::detail::win_wait_reactor::events_for_wait(boost::corosio::wait_type) :104 14x 80.0% 66.7% 80.0% boost::corosio::detail::win_wait_reactor::ready_for_wait(boost::corosio::wait_type, short) :114 5x 55.6% 33.3% 60.0% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&) :144 9x 100.0% 100.0% 54.2% boost::corosio::detail::win_wait_reactor::win_wait_reactor(boost::corosio::detail::win_scheduler&)::{lambda()#1}::operator()() const :148 9x 100.0% 100.0% 100.0% boost::corosio::detail::win_wait_reactor::~win_wait_reactor() :151 9x 100.0% 100.0% boost::corosio::detail::win_wait_reactor::make_wakeup_pair() :158 9x 63.6% 58.1% 64.7% boost::corosio::detail::win_wait_reactor::close_wakeup_pair() :218 9x 100.0% 50.0% 100.0% boost::corosio::detail::win_wait_reactor::wake_self() :233 71x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::register_wait(unsigned long long, boost::corosio::wait_type, boost::corosio::detail::overlapped_op*) :248 9x 100.0% 100.0% 76.9% boost::corosio::detail::win_wait_reactor::cancel_wait(boost::corosio::detail::overlapped_op*) :269 54x 100.0% 100.0% 75.0% boost::corosio::detail::win_wait_reactor::stop() :279 27x 100.0% 75.0% 100.0% boost::corosio::detail::win_wait_reactor::run() :289 9x 82.5% 75.0% 71.9% boost::corosio::detail::win_wait_reactor::run()::{lambda(boost::corosio::detail::win_wait_reactor::entry const&)#1}::operator()(boost::corosio::detail::win_wait_reactor::entry const&) const :311 4x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IOCP
16
17 // This header is included from the bottom of win_scheduler.hpp after
18 // the scheduler class is fully defined. Including it directly would
19 // circle back into a still-incomplete win_scheduler when the dtor's
20 // unique_ptr<win_wait_reactor>::reset() is parsed. Diagnose that
21 // rather than emitting a confusing "incomplete type" error far away.
22 #ifndef BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE
23 #error "Include <boost/corosio/native/detail/iocp/win_scheduler.hpp> \
24 instead of including this header directly."
25 #endif
26
27 #include <boost/corosio/wait_type.hpp>
28 #include <boost/corosio/native/detail/iocp/win_overlapped_op.hpp>
29 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
30 #include <boost/corosio/native/detail/iocp/win_windows.hpp>
31
32 #include <Ws2tcpip.h>
33
34 #include <algorithm>
35 #include <atomic>
36 #include <cstddef>
37 #include <mutex>
38 #include <thread>
39 #include <vector>
40
41 namespace boost::corosio::detail {
42
43 /** Auxiliary select-based reactor for IOCP wait operations.
44
45 IOCP has no native primitive for socket readiness without I/O.
46 For cases where a zero-byte WSARecv won't work (datagram-read,
47 acceptor-read, error-wait), this reactor runs a dedicated thread
48 using WSAPoll to detect readiness and posts a synthetic completion
49 to the owning IOCP scheduler via win_scheduler::on_completion().
50
51 The same dispatch path used by overlapped I/O then delivers the
52 completion to the user's coroutine, so the public API is uniform
53 across backends.
54
55 Per-op lifecycle:
56 1. Caller sets up an overlapped_op (h, ex, ec_out, cancelled flag).
57 2. Caller calls register_wait(fd, w, op) and returns
58 std::noop_coroutine. The op is parked in the reactor's table.
59 3. Reactor thread polls. When the fd is ready, the op is removed
60 from the table and posted to the scheduler. The error code
61 delivered to the completion is: ec={} on success; the SO_ERROR
62 value if error revents fired and SO_ERROR is set; or
63 WSAECONNABORTED as a synthesized fallback for wait_type::error
64 when error revents fired but SO_ERROR returned zero.
65 4. On socket cancel(), the user's thread calls cancel_wait(op),
66 which queues a cancel request. The reactor thread removes the
67 op from the table and posts a completion; invoke_handler sees
68 op.cancelled==true and yields capy::cond::canceled.
69
70 Thread-safe: register_wait, cancel_wait, and stop may be called
71 from any thread.
72 */
73 class win_wait_reactor
74 {
75 public:
76 explicit win_wait_reactor(win_scheduler& sched);
77 ~win_wait_reactor();
78
79 win_wait_reactor(win_wait_reactor const&) = delete;
80 win_wait_reactor& operator=(win_wait_reactor const&) = delete;
81
82 /// Park an overlapped_op until @p fd is ready for @p w.
83 void register_wait(SOCKET fd, wait_type w, overlapped_op* op);
84
85 /// Remove a parked op and post a completion. Idempotent.
86 void cancel_wait(overlapped_op* op);
87
88 /// Stop the reactor thread and drain remaining ops as cancelled.
89 void stop();
90
91 private:
92 struct entry
93 {
94 SOCKET fd = INVALID_SOCKET;
95 wait_type w = wait_type::read;
96 overlapped_op* op = nullptr;
97 };
98
99 void run();
100 void wake_self() noexcept;
101 void make_wakeup_pair();
102 void close_wakeup_pair() noexcept;
103
104 14x static SHORT events_for_wait(wait_type w) noexcept
105 {
106
2/3
✓ Branch 2 → 3 taken 13 times.
✗ Branch 2 → 4 not taken.
✓ Branch 2 → 5 taken 1 time.
14x switch (w)
107 {
108 13x case wait_type::read: return POLLRDNORM;
109 case wait_type::write: return POLLWRNORM;
110 1x default: return POLLPRI;
111 }
112 }
113
114 5x static bool ready_for_wait(wait_type w, SHORT revents) noexcept
115 {
116 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
117
1/3
✓ Branch 2 → 3 taken 5 times.
✗ Branch 2 → 4 not taken.
✗ Branch 2 → 5 not taken.
5x switch (w)
118 {
119 5x case wait_type::read:
120 5x return (revents & (POLLRDNORM | POLLRDBAND | err_bits)) != 0;
121 case wait_type::write:
122 return (revents & (POLLWRNORM | POLLWRBAND | err_bits)) != 0;
123 default:
124 return (revents & (POLLPRI | err_bits)) != 0;
125 }
126 }
127
128 win_scheduler& sched_;
129
130 SOCKET wakeup_read_ = INVALID_SOCKET;
131 SOCKET wakeup_write_ = INVALID_SOCKET;
132
133 std::mutex mutex_;
134 std::vector<entry> pending_register_;
135 std::vector<overlapped_op*> pending_cancel_;
136 std::atomic<bool> stop_{false};
137 std::atomic<bool> wake_pending_{false};
138
139 std::vector<entry> registered_; // reactor-thread-only
140
141 std::thread thread_;
142 };
143
144 9x inline win_wait_reactor::win_wait_reactor(win_scheduler& sched)
145 9x : sched_(sched)
146 {
147
1/1
✓ Branch 9 → 10 taken 9 times.
9x make_wakeup_pair();
148
1/1
✓ Branch 10 → 11 taken 9 times.
18x thread_ = std::thread([this] { run(); });
149 9x }
150
151 9x inline win_wait_reactor::~win_wait_reactor()
152 {
153 9x stop();
154 9x close_wakeup_pair();
155 9x }
156
157 inline void
158 9x win_wait_reactor::make_wakeup_pair()
159 {
160 // Build a pair of connected loopback sockets to use as a wakeup
161 // channel. Winsock has no socketpair(2), so we listen on
162 // 127.0.0.1:0, connect a peer, then accept it.
163
1/1
✓ Branch 2 → 3 taken 9 times.
9x SOCKET listener = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
164
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 5 taken 9 times.
9x if (listener == INVALID_SOCKET)
165 return;
166
167 9x sockaddr_in addr{};
168 9x addr.sin_family = AF_INET;
169
1/1
✓ Branch 5 → 6 taken 9 times.
9x addr.sin_addr.s_addr = ::htonl(INADDR_LOOPBACK);
170 9x addr.sin_port = 0;
171
172 9x int len = sizeof(addr);
173
1/1
✓ Branch 6 → 7 taken 9 times.
9x if (::bind(listener, reinterpret_cast<sockaddr*>(&addr), len) ==
174 9x SOCKET_ERROR ||
175
4/7
✓ Branch 7 → 8 taken 9 times.
✗ Branch 7 → 12 not taken.
✓ Branch 8 → 9 taken 9 times.
✓ Branch 9 → 10 taken 9 times.
✗ Branch 9 → 12 not taken.
✗ Branch 14 → 15 not taken.
✓ Branch 14 → 17 taken 9 times.
18x ::listen(listener, 1) == SOCKET_ERROR ||
176
2/3
✓ Branch 10 → 11 taken 9 times.
✗ Branch 11 → 12 not taken.
✓ Branch 11 → 13 taken 9 times.
9x ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len) ==
177 SOCKET_ERROR)
178 {
179 ::closesocket(listener);
180 return;
181 }
182
183
1/1
✓ Branch 17 → 18 taken 9 times.
9x wakeup_write_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
184
1/2
✗ Branch 18 → 19 not taken.
✓ Branch 18 → 21 taken 9 times.
9x if (wakeup_write_ == INVALID_SOCKET)
185 {
186 ::closesocket(listener);
187 return;
188 }
189
190
1/1
✓ Branch 21 → 22 taken 9 times.
9x if (::connect(
191
1/2
✗ Branch 22 → 23 not taken.
✓ Branch 22 → 26 taken 9 times.
9x wakeup_write_, reinterpret_cast<sockaddr*>(&addr), len) ==
192 SOCKET_ERROR)
193 {
194 ::closesocket(wakeup_write_);
195 wakeup_write_ = INVALID_SOCKET;
196 ::closesocket(listener);
197 return;
198 }
199
200
1/1
✓ Branch 26 → 27 taken 9 times.
9x wakeup_read_ = ::accept(listener, nullptr, nullptr);
201
1/1
✓ Branch 27 → 28 taken 9 times.
9x ::closesocket(listener);
202
203
1/2
✗ Branch 28 → 29 not taken.
✓ Branch 28 → 31 taken 9 times.
9x if (wakeup_read_ == INVALID_SOCKET)
204 {
205 ::closesocket(wakeup_write_);
206 wakeup_write_ = INVALID_SOCKET;
207 return;
208 }
209
210 // The drain loop in run() calls recv() until it returns <= 0.
211 // With a blocking socket that second recv() would block instead
212 // of returning WSAEWOULDBLOCK, deadlocking the reactor thread.
213 9x u_long non_blocking = 1;
214
1/1
✓ Branch 31 → 32 taken 9 times.
9x ::ioctlsocket(wakeup_read_, FIONBIO, &non_blocking);
215 }
216
217 inline void
218 9x win_wait_reactor::close_wakeup_pair() noexcept
219 {
220
1/2
✓ Branch 2 → 3 taken 9 times.
✗ Branch 2 → 5 not taken.
9x if (wakeup_read_ != INVALID_SOCKET)
221 {
222 9x ::closesocket(wakeup_read_);
223 9x wakeup_read_ = INVALID_SOCKET;
224 }
225
1/2
✓ Branch 5 → 6 taken 9 times.
✗ Branch 5 → 8 not taken.
9x if (wakeup_write_ != INVALID_SOCKET)
226 {
227 9x ::closesocket(wakeup_write_);
228 9x wakeup_write_ = INVALID_SOCKET;
229 }
230 9x }
231
232 inline void
233 71x win_wait_reactor::wake_self() noexcept
234 {
235 // Coalesce wakes: only send a byte if no wake is already pending.
236 71x bool expected = false;
237
2/2
✓ Branch 3 → 4 taken 14 times.
✓ Branch 3 → 5 taken 57 times.
71x if (!wake_pending_.compare_exchange_strong(
238 expected, true, std::memory_order_acq_rel))
239 14x return;
240
1/2
✓ Branch 5 → 6 taken 57 times.
✗ Branch 5 → 8 not taken.
57x if (wakeup_write_ != INVALID_SOCKET)
241 {
242 57x char b = 0;
243 57x ::send(wakeup_write_, &b, 1, 0);
244 }
245 }
246
247 inline void
248 9x win_wait_reactor::register_wait(
249 SOCKET fd, wait_type w, overlapped_op* op)
250 {
251 // If the op was already cancelled (e.g. pre-cancelled stop_token
252 // fired synchronously before this call), complete immediately
253 // instead of registering. Otherwise the reactor would park the
254 // op forever because the earlier cancel_wait() found nothing to
255 // cancel in registered_.
256
2/2
✓ Branch 3 → 4 taken 1 time.
✓ Branch 3 → 6 taken 8 times.
9x if (op->cancelled.load(std::memory_order_acquire))
257 {
258 1x sched_.on_completion(op, 0, 0);
259 1x return;
260 }
261 {
262
1/1
✓ Branch 6 → 7 taken 8 times.
8x std::lock_guard lock(mutex_);
263
1/1
✓ Branch 7 → 8 taken 8 times.
8x pending_register_.push_back(entry{fd, w, op});
264 8x }
265 8x wake_self();
266 }
267
268 inline void
269 54x win_wait_reactor::cancel_wait(overlapped_op* op)
270 {
271 {
272
1/1
✓ Branch 2 → 3 taken 54 times.
54x std::lock_guard lock(mutex_);
273
1/1
✓ Branch 3 → 4 taken 54 times.
54x pending_cancel_.push_back(op);
274 54x }
275 54x wake_self();
276 54x }
277
278 inline void
279 27x win_wait_reactor::stop()
280 {
281
2/2
✓ Branch 3 → 4 taken 18 times.
✓ Branch 3 → 5 taken 9 times.
27x if (stop_.exchange(true, std::memory_order_acq_rel))
282 18x return;
283 9x wake_self();
284
1/2
✓ Branch 7 → 8 taken 9 times.
✗ Branch 7 → 9 not taken.
9x if (thread_.joinable())
285 9x thread_.join();
286 }
287
288 inline void
289 9x win_wait_reactor::run()
290 {
291 9x std::vector<WSAPOLLFD> pollfds;
292
293
2/2
✓ Branch 93 → 3 taken 61 times.
✓ Branch 93 → 94 taken 8 times.
69x while (!stop_.load(std::memory_order_acquire))
294 {
295 // Drain pending register/cancel under the lock.
296 61x std::vector<entry> to_add;
297 61x std::vector<overlapped_op*> to_cancel;
298 {
299
1/1
✓ Branch 3 → 4 taken 61 times.
61x std::lock_guard lock(mutex_);
300 61x to_add.swap(pending_register_);
301 61x to_cancel.swap(pending_cancel_);
302 61x }
303
304
2/2
✓ Branch 13 → 9 taken 8 times.
✓ Branch 13 → 14 taken 61 times.
69x for (auto& e : to_add)
305
1/1
✓ Branch 10 → 11 taken 8 times.
8x registered_.push_back(e);
306
307
2/2
✓ Branch 29 → 16 taken 51 times.
✓ Branch 29 → 30 taken 61 times.
112x for (auto* op : to_cancel)
308 {
309
1/1
✓ Branch 19 → 20 taken 51 times.
51x auto it = std::find_if(
310 registered_.begin(), registered_.end(),
311 4x [op](entry const& e) { return e.op == op; });
312
2/2
✓ Branch 22 → 23 taken 2 times.
✓ Branch 22 → 27 taken 49 times.
51x if (it != registered_.end())
313 {
314 // The op's cancelled flag has already been set by
315 // request_cancel; invoke_handler will translate it.
316
1/1
✓ Branch 23 → 24 taken 2 times.
2x sched_.on_completion(op, 0, 0);
317
1/1
✓ Branch 25 → 26 taken 2 times.
2x registered_.erase(it);
318 }
319 // If not in registered_, the op already fired — no-op.
320 }
321
322 // Build the poll set. Slot 0 is the wakeup socket.
323 61x pollfds.clear();
324
1/1
✓ Branch 32 → 33 taken 61 times.
61x pollfds.reserve(registered_.size() + 1);
325
1/1
✓ Branch 33 → 34 taken 61 times.
61x pollfds.push_back({wakeup_read_, POLLRDNORM, 0});
326
2/2
✓ Branch 41 → 36 taken 14 times.
✓ Branch 41 → 42 taken 61 times.
75x for (auto& e : registered_)
327
1/1
✓ Branch 38 → 39 taken 14 times.
14x pollfds.push_back({e.fd, events_for_wait(e.w), 0});
328
329 // Bounded timeout rather than infinite: this is a safety net
330 // against a lost self-pipe wakeup (e.g. a failed/coalesced
331 // send in wake_self leaving wake_pending_ stuck true). On
332 // timeout the loop re-drains pending_register_/pending_cancel_
333 // and re-checks stop_, so a missed wakeup costs at most one
334 // poll interval of latency instead of a permanent hang. This
335 // mirrors the 500 ms GQCS safety timeout in win_scheduler.
336
1/1
✓ Branch 44 → 45 taken 61 times.
61x int n = ::WSAPoll(
337 pollfds.data(),
338 61x static_cast<ULONG>(pollfds.size()),
339 500 /* ms */);
340
2/2
✓ Branch 45 → 46 taken 1 time.
✓ Branch 45 → 47 taken 60 times.
61x if (n == SOCKET_ERROR)
341 1x break;
342
343 // Drain the wakeup socket so it stops reporting readable.
344
2/2
✓ Branch 48 → 49 taken 56 times.
✓ Branch 48 → 55 taken 4 times.
60x if (pollfds[0].revents != 0)
345 {
346 char buf[64];
347 for (;;)
348 {
349
1/1
✓ Branch 49 → 50 taken 112 times.
112x int r = ::recv(wakeup_read_, buf, sizeof(buf), 0);
350
2/2
✓ Branch 50 → 51 taken 56 times.
✓ Branch 50 → 52 taken 56 times.
112x if (r <= 0)
351 56x break;
352 56x }
353 56x wake_pending_.store(false, std::memory_order_release);
354 }
355
356 // Walk events in reverse so erases don't invalidate later indices.
357
2/2
✓ Branch 81 → 56 taken 13 times.
✓ Branch 81 → 82 taken 60 times.
73x for (std::size_t i = pollfds.size(); i > 1; --i)
358 {
359 13x auto const& pfd = pollfds[i - 1];
360
2/2
✓ Branch 57 → 58 taken 8 times.
✓ Branch 57 → 59 taken 5 times.
13x if (pfd.revents == 0)
361 8x continue;
362
363 5x auto const& e = registered_[i - 2];
364
1/2
✗ Branch 61 → 62 not taken.
✓ Branch 61 → 63 taken 5 times.
5x if (!ready_for_wait(e.w, pfd.revents))
365 continue;
366
367 5x DWORD err = 0;
368 5x constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL;
369
1/2
✗ Branch 63 → 64 not taken.
✓ Branch 63 → 74 taken 5 times.
5x if (pfd.revents & err_bits)
370 {
371 int so_err = 0;
372 int sz = sizeof(so_err);
373 if (::getsockopt(
374 e.fd, SOL_SOCKET, SO_ERROR,
375 reinterpret_cast<char*>(&so_err), &sz) == 0 &&
376 so_err != 0)
377 {
378 err = static_cast<DWORD>(so_err);
379 }
380 else if (e.w == wait_type::error)
381 {
382 // wait_type::error fires on the error condition;
383 // the contract is to report a non-zero error_code.
384 err = WSAECONNABORTED;
385 }
386 }
387
388
1/1
✓ Branch 74 → 75 taken 5 times.
5x sched_.on_completion(e.op, err, 0);
389
1/1
✓ Branch 78 → 79 taken 5 times.
5x registered_.erase(registered_.begin() + (i - 2));
390 }
391 62x }
392
393 // Drain remaining ops as cancelled on shutdown. This must cover
394 // both the active set and anything still queued by user threads
395 // that hasn't been moved into registered_ yet, otherwise those
396 // ops leak work_started credit and stall scheduler shutdown.
397 {
398
1/1
✓ Branch 94 → 95 taken 9 times.
9x std::lock_guard lock(mutex_);
399
1/2
✗ Branch 101 → 97 not taken.
✓ Branch 101 → 102 taken 9 times.
9x for (auto& e : pending_register_)
400 registered_.push_back(e);
401 9x pending_register_.clear();
402 9x pending_cancel_.clear();
403 9x }
404
2/2
✓ Branch 111 → 107 taken 1 time.
✓ Branch 111 → 112 taken 9 times.
10x for (auto& e : registered_)
405
1/1
✓ Branch 108 → 109 taken 1 time.
1x sched_.on_completion(e.op, ERROR_OPERATION_ABORTED, 0);
406 9x registered_.clear();
407 9x }
408
409 } // namespace boost::corosio::detail
410
411 #endif // BOOST_COROSIO_HAS_IOCP
412
413 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP
414