include/boost/corosio/native/detail/select/select_op.hpp

78.0% Lines (117/150) 89.3% Functions (25/28) 38.2% Branches (26/68)
include/boost/corosio/native/detail/select/select_op.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/scheduler_op.hpp>
28 #include <boost/corosio/detail/endpoint_convert.hpp>
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <fcntl.h>
33
34 #include <atomic>
35 #include <cstddef>
36 #include <memory>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/select.h>
42 #include <sys/socket.h>
43 #include <sys/uio.h>
44
45 /*
46 select Operation State
47 ======================
48
49 Each async I/O operation has a corresponding select_op-derived struct that
50 holds the operation's state while it's in flight. The socket impl owns
51 fixed slots for each operation type (conn_, rd_, wr_), so only one
52 operation of each type can be pending per socket at a time.
53
54 This mirrors the epoll_op design for consistency across backends.
55
56 Completion vs Cancellation Race
57 -------------------------------
58 The `registered` atomic uses a tri-state (unregistered, registering,
59 registered) to handle two races: (1) between register_fd() and the
60 reactor seeing an event, and (2) between reactor completion and cancel().
61
62 The `registering` state closes the window where an event could arrive
63 after register_fd() but before the state was marked `registered`. The reactor
64 and cancel() both treat `registering` the same as `registered` when claiming.
65
66 Whoever atomically exchanges to unregistered "claims" the operation
67 and is responsible for completing it. The loser sees unregistered and
68 does nothing. The initiating thread uses compare_exchange to transition
69 from registering to registered; if this fails, the reactor or cancel
70 already claimed the op.
71
72 Impl Lifetime Management
73 ------------------------
74 When cancel() posts an op to the scheduler's ready queue, the socket impl
75 might be destroyed before the scheduler processes the op. The `impl_ptr`
76 member holds a shared_ptr to the impl, keeping it alive until the op
77 completes.
78
79 EOF Detection
80 -------------
81 For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83
84 SIGPIPE Prevention
85 ------------------
86 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 SIGPIPE when the peer has closed.
88 */
89
90 namespace boost::corosio::detail {
91
92 // Forward declarations for cancellation support
93 class select_socket;
94 class select_acceptor;
95
/** Registration state of an async operation with the reactor.

    A tri-state is used (rather than a flag) to handle the race between
    register_fd() and run_reactor() seeing an event: the initiator sets
    `registering` before calling register_fd(), so an event delivered
    during the registration window is not dropped.

    NOTE(review): std::uint8_t is declared in <cstdint>, which this
    header does not include directly in the lines visible here; confirm
    it arrives transitively or add the include.
*/
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with the reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
109
110 struct select_op : scheduler_op
111 {
112 struct canceller
113 {
114 select_op* op;
115 void operator()() const noexcept;
116 };
117
118 std::coroutine_handle<> h;
119 capy::executor_ref ex;
120 15418 std::error_code* ec_out = nullptr;
121 15418 std::size_t* bytes_out = nullptr;
122
123 15418 int fd = -1;
124 15418 int errn = 0;
125 15418 std::size_t bytes_transferred = 0;
126
127 15418 std::atomic<bool> cancelled{false};
128 15418 std::atomic<select_registration_state> registered{
129 select_registration_state::unregistered};
130 std::optional<std::stop_callback<canceller>> stop_cb;
131
132 // Prevents use-after-free when socket is closed with pending ops.
133 std::shared_ptr<void> impl_ptr;
134
135 // For stop_token cancellation - pointer to owning socket/acceptor impl.
136 15418 select_socket* socket_impl_ = nullptr;
137 15418 select_acceptor* acceptor_impl_ = nullptr;
138
139 46254 select_op() = default;
140
141 735820 void reset() noexcept
142 {
143 735820 fd = -1;
144 735820 errn = 0;
145 735820 bytes_transferred = 0;
146 735820 cancelled.store(false, std::memory_order_relaxed);
147 735820 registered.store(
148 select_registration_state::unregistered, std::memory_order_relaxed);
149 735820 impl_ptr.reset();
150 735820 socket_impl_ = nullptr;
151 735820 acceptor_impl_ = nullptr;
152 735820 }
153
154 732417 void operator()() override
155 {
156 732417 stop_cb.reset();
157
158
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 732417 times.
732417 if (ec_out)
159 {
160
2/2
✓ Branch 0 taken 57 times.
✓ Branch 1 taken 732360 times.
732417 if (cancelled.load(std::memory_order_acquire))
161 57 *ec_out = capy::error::canceled;
162
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 732359 times.
732360 else if (errn != 0)
163 1 *ec_out = make_err(errn);
164
4/4
✓ Branch 0 taken 365184 times.
✓ Branch 1 taken 367175 times.
✓ Branch 2 taken 365179 times.
✓ Branch 3 taken 5 times.
732359 else if (is_read_operation() && bytes_transferred == 0)
165 5 *ec_out = capy::error::eof;
166 else
167 732354 *ec_out = {};
168 732417 }
169
170
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 732417 times.
732417 if (bytes_out)
171 732417 *bytes_out = bytes_transferred;
172
173 // Move to stack before destroying the frame
174 732417 capy::executor_ref saved_ex(ex);
175 732417 std::coroutine_handle<> saved_h(h);
176 732417 impl_ptr.reset();
177 732417 dispatch_coro(saved_ex, saved_h).resume();
178 732417 }
179
180 367174 virtual bool is_read_operation() const noexcept
181 {
182 367174 return false;
183 }
184 virtual void cancel() noexcept = 0;
185
186 void destroy() override
187 {
188 stop_cb.reset();
189 impl_ptr.reset();
190 }
191
192 46460 void request_cancel() noexcept
193 {
194 46460 cancelled.store(true, std::memory_order_release);
195 46460 }
196
197 // NOLINTNEXTLINE(performance-unnecessary-value-param)
198 void start(std::stop_token token)
199 {
200 cancelled.store(false, std::memory_order_release);
201 stop_cb.reset();
202 socket_impl_ = nullptr;
203 acceptor_impl_ = nullptr;
204
205 if (token.stop_possible())
206 stop_cb.emplace(token, canceller{this});
207 }
208
209 // NOLINTNEXTLINE(performance-unnecessary-value-param)
210 734118 void start(std::stop_token token, select_socket* impl)
211 {
212 734118 cancelled.store(false, std::memory_order_release);
213 734118 stop_cb.reset();
214 734118 socket_impl_ = impl;
215 734118 acceptor_impl_ = nullptr;
216
217
2/2
✓ Branch 0 taken 734096 times.
✓ Branch 1 taken 22 times.
734118 if (token.stop_possible())
218 22 stop_cb.emplace(token, canceller{this});
219 734118 }
220
221 // NOLINTNEXTLINE(performance-unnecessary-value-param)
222 1702 void start(std::stop_token token, select_acceptor* impl)
223 {
224 1702 cancelled.store(false, std::memory_order_release);
225 1702 stop_cb.reset();
226 1702 socket_impl_ = nullptr;
227 1702 acceptor_impl_ = impl;
228
229
1/2
✓ Branch 0 taken 1702 times.
✗ Branch 1 not taken.
1702 if (token.stop_possible())
230 stop_cb.emplace(token, canceller{this});
231 1702 }
232
233 735779 void complete(int err, std::size_t bytes) noexcept
234 {
235 735779 errn = err;
236 735779 bytes_transferred = bytes;
237 735779 }
238
239 virtual void perform_io() noexcept {}
240 };
241
242 struct select_connect_op final : select_op
243 {
244 endpoint target_endpoint;
245
246 1701 void reset() noexcept
247 {
248 1701 select_op::reset();
249 1701 target_endpoint = endpoint{};
250 1701 }
251
252 1701 void perform_io() noexcept override
253 {
254 // connect() completion status is retrieved via SO_ERROR, not return value
255 1701 int err = 0;
256 1701 socklen_t len = sizeof(err);
257
2/4
✓ Branch 0 taken 1701 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1701 times.
1701 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
258 err = errno;
259 1701 complete(err, 0);
260 1701 }
261
262 // Defined in sockets.cpp where select_socket is complete
263 void operator()() override;
264 void cancel() noexcept override;
265 };
266
267 5122 struct select_read_op final : select_op
268 {
269 static constexpr std::size_t max_buffers = 16;
270 iovec iovecs[max_buffers];
271 5122 int iovec_count = 0;
272 5122 bool empty_buffer_read = false;
273
274 365185 bool is_read_operation() const noexcept override
275 {
276 365185 return !empty_buffer_read;
277 }
278
279 365238 void reset() noexcept
280 {
281 365238 select_op::reset();
282 365238 iovec_count = 0;
283 365238 empty_buffer_read = false;
284 365238 }
285
286 32252 void perform_io() noexcept override
287 {
288
1/2
✓ Branch 0 taken 32252 times.
✗ Branch 1 not taken.
32252 ssize_t n = ::readv(fd, iovecs, iovec_count);
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 32252 times.
32252 if (n >= 0)
290 32252 complete(0, static_cast<std::size_t>(n));
291 else
292 complete(errno, 0);
293 32252 }
294
295 void cancel() noexcept override;
296 };
297
298 5122 struct select_write_op final : select_op
299 {
300 static constexpr std::size_t max_buffers = 16;
301 iovec iovecs[max_buffers];
302 5122 int iovec_count = 0;
303
304 367179 void reset() noexcept
305 {
306 367179 select_op::reset();
307 367179 iovec_count = 0;
308 367179 }
309
310 void perform_io() noexcept override
311 {
312 msghdr msg{};
313 msg.msg_iov = iovecs;
314 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
315
316 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
317 if (n >= 0)
318 complete(0, static_cast<std::size_t>(n));
319 else
320 complete(errno, 0);
321 }
322
323 void cancel() noexcept override;
324 };
325
326 52 struct select_accept_op final : select_op
327 {
328 52 int accepted_fd = -1;
329 52 io_object::implementation* peer_impl = nullptr;
330 52 io_object::implementation** impl_out = nullptr;
331
332 1702 void reset() noexcept
333 {
334 1702 select_op::reset();
335 1702 accepted_fd = -1;
336 1702 peer_impl = nullptr;
337 1702 impl_out = nullptr;
338 1702 }
339
340 1699 void perform_io() noexcept override
341 {
342 1699 sockaddr_in addr{};
343 1699 socklen_t addrlen = sizeof(addr);
344
345 // Note: select backend uses accept() + fcntl instead of accept4()
346 // for broader POSIX compatibility
347
1/2
✓ Branch 0 taken 1699 times.
✗ Branch 1 not taken.
1699 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
348
349
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1699 times.
1699 if (new_fd >= 0)
350 {
351 // Reject fds that exceed select()'s FD_SETSIZE limit.
352 // Better to fail now than during later async operations.
353
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1699 times.
1699 if (new_fd >= FD_SETSIZE)
354 {
355 ::close(new_fd);
356 complete(EINVAL, 0);
357 return;
358 }
359
360 // Set non-blocking and close-on-exec flags.
361 // A non-blocking socket is essential for the async reactor;
362 // if we can't configure it, fail rather than risk blocking.
363
1/2
✓ Branch 0 taken 1699 times.
✗ Branch 1 not taken.
1699 int flags = ::fcntl(new_fd, F_GETFL, 0);
364
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1699 times.
1699 if (flags == -1)
365 {
366 int err = errno;
367 ::close(new_fd);
368 complete(err, 0);
369 return;
370 }
371
372
2/4
✓ Branch 0 taken 1699 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1699 times.
1699 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
373 {
374 int err = errno;
375 ::close(new_fd);
376 complete(err, 0);
377 return;
378 }
379
380
2/4
✓ Branch 0 taken 1699 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1699 times.
1699 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
381 {
382 int err = errno;
383 ::close(new_fd);
384 complete(err, 0);
385 return;
386 }
387
388 1699 accepted_fd = new_fd;
389 1699 complete(0, 0);
390 1699 }
391 else
392 {
393 complete(errno, 0);
394 }
395 1699 }
396
397 // Defined in acceptors.cpp where select_acceptor is complete
398 void operator()() override;
399 void cancel() noexcept override;
400 };
401
402 } // namespace boost::corosio::detail
403
404 #endif // BOOST_COROSIO_HAS_SELECT
405
406 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
407