include/boost/corosio/native/detail/select/select_socket_service.hpp

75.7% Lines (327/432) 94.7% Functions (36/38) 44.3% Branches (108/244)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** State for select socket service.

    Bundles everything shared between the service and its socket impls:
    the scheduler reference, the mutex guarding the containers, the
    intrusive list of live sockets, and the owning shared_ptr map
    (keyed by raw pointer for O(1) lookup/removal; see file comment).
*/
class select_socket_state
{
public:
    // Caches a reference to the select scheduler obtained from the
    // execution context; the scheduler must outlive this state.
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;  // reactor used for fd registration/posting
    std::mutex mutex_;         // guards socket_list_ and socket_ptrs_
    intrusive_list<select_socket> socket_list_;
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    // Non-copyable: owns per-context socket state.
    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    // Closes every live socket at context shutdown; the shared_ptr map
    // is deliberately retained (see definition for the rationale).
    void shutdown() override;

    // Impl lifecycle: the service owns impls via shared_ptr (see file
    // comment for the lifetime rules).
    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    /// The select reactor this service registers fds with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    // Thin forwarders to the scheduler's completion queue and
    // outstanding-work accounting.
    void post(select_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};

// Backward compatibility alias
using select_sockets = select_socket_service;
134
// Invoked by the stop_token callback machinery; forwards to the op's
// virtual cancel() so each op type can deregister its fd correctly.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
140
141 inline void
142 select_connect_op::cancel() noexcept
143 {
144 if (socket_impl_)
145 socket_impl_->cancel_single_op(*this);
146 else
147 request_cancel();
148 }
149
150 inline void
151 21 select_read_op::cancel() noexcept
152 {
153
1/2
✓ Branch 0 taken 21 times.
✗ Branch 1 not taken.
21 if (socket_impl_)
154 21 socket_impl_->cancel_single_op(*this);
155 else
156 request_cancel();
157 21 }
158
159 inline void
160 select_write_op::cancel() noexcept
161 {
162 if (socket_impl_)
163 socket_impl_->cancel_single_op(*this);
164 else
165 request_cancel();
166 }
167
/* Completion handler for an async connect, run from the scheduler's
   completion queue. Publishes the result to the awaiter's out
   parameters, caches the socket's endpoints on success, drops the
   keep-alive impl_ptr, then resumes the coroutine on its executor. */
inline void
select_connect_op::operator()()
{
    // Tear down the stop callback first so cancellation can no longer
    // race with this completion.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Report cancellation ahead of any syscall error.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
209
// Bind the impl to its owning service; the fd stays -1 until
// open_socket() assigns one.
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
214
/** Begin an async connect to `ep`.

    Tries ::connect() immediately on the non-blocking fd. On synchronous
    success or a hard error the op is completed and posted; on
    EINPROGRESS the op is registered with the select reactor for
    writability. Registration uses the registering -> registered state
    machine described in op.hpp to resolve races with the reactor and
    with cancel().

    @param h     Coroutine to resume on completion.
    @param ex    Executor used to resume `h`.
    @param ep    Remote endpoint (IPv4: converted via to_sockaddr_in).
    @param token Stop token; a stop request cancels the pending op.
    @param ec    Out: error code written by the completion handler.
    @return Always std::noop_coroutine(); completion is posted to the
            scheduler queue, never run inline.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // impl_ptr keeps this impl alive until the posted op runs.
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back; only the claimant may deregister/post.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Hard synchronous failure: complete with the saved errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
304
/** Begin an async scatter read into the caller's buffers.

    Tries ::readv() immediately on the non-blocking fd. Success, EOF
    (n == 0), and hard errors complete the op and post it; EAGAIN /
    EWOULDBLOCK registers the op with the select reactor for
    readability using the registering -> registered state machine
    (see op.hpp for the race rules).

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to resume `h`.
    @param param     Caller's buffer sequence (copied into iovecs).
    @param token     Stop token; a stop request cancels the pending op.
    @param ec        Out: error code written by the completion handler.
    @param bytes_out Out: number of bytes read.
    @return Always std::noop_coroutine(); completion is posted to the
            scheduler queue, never run inline.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes (flagged so
    // the completion isn't mistaken for EOF).
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        // Synchronous success.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        // Peer closed the connection (EOF on a non-empty buffer).
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back; only the claimant may deregister/post.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard synchronous failure: complete with the saved errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
407
/** Begin an async gather write from the caller's buffers.

    Tries ::sendmsg() immediately on the non-blocking fd (MSG_NOSIGNAL
    suppresses SIGPIPE on broken pipes). Success and hard errors
    complete the op and post it; EAGAIN / EWOULDBLOCK registers the op
    with the select reactor for writability using the registering ->
    registered state machine (see op.hpp for the race rules).

    @param h         Coroutine to resume on completion.
    @param ex        Executor used to resume `h`.
    @param param     Caller's buffer sequence (copied into iovecs).
    @param token     Stop token; a stop request cancels the pending op.
    @param ec        Out: error code written by the completion handler.
    @param bytes_out Out: number of bytes written.
    @return Always std::noop_coroutine(); completion is posted to the
            scheduler queue, never run inline.
*/
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Synchronous success.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back; only the claimant may deregister/post.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure; fall back to EIO if errno was not set (e.g. n == 0).
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
505
506 inline std::error_code
507 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
508 {
509 int how;
510
3/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✓ Branch 3 taken 1 time.
3 switch (what)
511 {
512 case tcp_socket::shutdown_receive:
513 1 how = SHUT_RD;
514 1 break;
515 case tcp_socket::shutdown_send:
516 1 how = SHUT_WR;
517 1 break;
518 case tcp_socket::shutdown_both:
519 1 how = SHUT_RDWR;
520 1 break;
521 default:
522 return make_err(EINVAL);
523 }
524
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
525 return make_err(errno);
526 3 return {};
527 3 }
528
529 inline std::error_code
530 5 select_socket::set_no_delay(bool value) noexcept
531 {
532 5 int flag = value ? 1 : 0;
533
2/4
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
534 return make_err(errno);
535 5 return {};
536 5 }
537
538 inline bool
539 5 select_socket::no_delay(std::error_code& ec) const noexcept
540 {
541 5 int flag = 0;
542 5 socklen_t len = sizeof(flag);
543
2/4
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
544 {
545 ec = make_err(errno);
546 return false;
547 }
548 5 ec = {};
549 5 return flag != 0;
550 5 }
551
552 inline std::error_code
553 4 select_socket::set_keep_alive(bool value) noexcept
554 {
555 4 int flag = value ? 1 : 0;
556
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
557 return make_err(errno);
558 4 return {};
559 4 }
560
561 inline bool
562 4 select_socket::keep_alive(std::error_code& ec) const noexcept
563 {
564 4 int flag = 0;
565 4 socklen_t len = sizeof(flag);
566
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
567 {
568 ec = make_err(errno);
569 return false;
570 }
571 4 ec = {};
572 4 return flag != 0;
573 4 }
574
575 inline std::error_code
576 1 select_socket::set_receive_buffer_size(int size) noexcept
577 {
578
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
579 return make_err(errno);
580 1 return {};
581 1 }
582
583 inline int
584 3 select_socket::receive_buffer_size(std::error_code& ec) const noexcept
585 {
586 3 int size = 0;
587 3 socklen_t len = sizeof(size);
588
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
589 {
590 ec = make_err(errno);
591 return 0;
592 }
593 3 ec = {};
594 3 return size;
595 3 }
596
597 inline std::error_code
598 1 select_socket::set_send_buffer_size(int size) noexcept
599 {
600
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
601 return make_err(errno);
602 1 return {};
603 1 }
604
605 inline int
606 3 select_socket::send_buffer_size(std::error_code& ec) const noexcept
607 {
608 3 int size = 0;
609 3 socklen_t len = sizeof(size);
610
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
611 {
612 ec = make_err(errno);
613 return 0;
614 }
615 3 ec = {};
616 3 return size;
617 3 }
618
619 inline std::error_code
620 8 select_socket::set_linger(bool enabled, int timeout) noexcept
621 {
622
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 1 time.
8 if (timeout < 0)
623 1 return make_err(EINVAL);
624 struct ::linger lg;
625 7 lg.l_onoff = enabled ? 1 : 0;
626 7 lg.l_linger = timeout;
627
2/4
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 7 times.
7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
628 return make_err(errno);
629 7 return {};
630 8 }
631
632 inline tcp_socket::linger_options
633 3 select_socket::linger(std::error_code& ec) const noexcept
634 {
635 3 struct ::linger lg{};
636 3 socklen_t len = sizeof(lg);
637
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
638 {
639 ec = make_err(errno);
640 return {};
641 }
642 3 ec = {};
643 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
644 3 }
645
/* Cancel all pending ops (connect, read, write) on this socket.

   Each op is claimed atomically (exchange registered -> unregistered);
   only the claimant deregisters the fd and posts the op, so the
   reactor and cancel can never both post the same op. impl_ptr keeps
   this socket alive until the posted op completes (see file comment).
   The weak_from_this().lock() guards against running while the impl
   is being destroyed. */
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            // We claimed the op: drop fd interest and post it so the
            // waiting coroutine resumes with the cancelled flag set.
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
670
/* Cancel one specific pending op (invoked by its stop_token callback).

   Same claim protocol as cancel(): atomically move the op to
   unregistered, and only the claimant deregisters the fd and posts
   the op. The event mask is recovered from which member op this is
   (connect and write wait for writability, read for readability). */
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op completes.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
699
/* Close the socket: cancel all pending ops, deregister and close the
   fd, and reset the cached endpoints.

   The cancel pass uses the same atomic claim protocol as cancel() so
   pending ops are posted (with impl_ptr keeping this impl alive) and
   their coroutines can resume. The fd is then fully deregistered from
   the reactor before ::close, and fd_ is set to -1 so a second call
   is a no-op on the descriptor. Safe to call during destruction: the
   weak_from_this().lock() failing just skips the cancel pass. */
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                // We claimed the op: drop fd interest and post it.
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining read/write interest before closing.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
736
// Construct the service, obtaining (and if needed creating) the
// context's select scheduler and bundling it into the shared state.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
744
745 483 inline select_socket_service::~select_socket_service() {}
746
// Service shutdown: close every live socket, cancelling its pending
// ops, while holding the state mutex.
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
760
761 inline io_object::implementation*
762 5122 select_socket_service::construct()
763 {
764 5122 auto impl = std::make_shared<select_socket>(*this);
765 5122 auto* raw = impl.get();
766
767 {
768
1/2
✓ Branch 0 taken 5122 times.
✗ Branch 1 not taken.
5122 std::lock_guard lock(state_->mutex_);
769 5122 state_->socket_list_.push_back(raw);
770
1/2
✓ Branch 0 taken 5122 times.
✗ Branch 1 not taken.
5122 state_->socket_ptrs_.emplace(raw, std::move(impl));
771 5122 }
772
773 5122 return raw;
774 5122 }
775
776 inline void
777 5122 select_socket_service::destroy(io_object::implementation* impl)
778 {
779 5122 auto* select_impl = static_cast<select_socket*>(impl);
780 5122 select_impl->close_socket();
781 5122 std::lock_guard lock(state_->mutex_);
782 5122 state_->socket_list_.remove(select_impl);
783
1/2
✓ Branch 0 taken 5122 times.
✗ Branch 1 not taken.
5122 state_->socket_ptrs_.erase(select_impl);
784 5122 }
785
786 inline std::error_code
787 1714 select_socket_service::open_socket(tcp_socket::implementation& impl)
788 {
789 1714 auto* select_impl = static_cast<select_socket*>(&impl);
790 1714 select_impl->close_socket();
791
792 1714 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
793
1/2
✓ Branch 0 taken 1714 times.
✗ Branch 1 not taken.
1714 if (fd < 0)
794 return make_err(errno);
795
796 // Set non-blocking and close-on-exec
797 1714 int flags = ::fcntl(fd, F_GETFL, 0);
798
1/2
✓ Branch 0 taken 1714 times.
✗ Branch 1 not taken.
1714 if (flags == -1)
799 {
800 int errn = errno;
801 ::close(fd);
802 return make_err(errn);
803 }
804
1/2
✓ Branch 0 taken 1714 times.
✗ Branch 1 not taken.
1714 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
805 {
806 int errn = errno;
807 ::close(fd);
808 return make_err(errn);
809 }
810
1/2
✓ Branch 0 taken 1714 times.
✗ Branch 1 not taken.
1714 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
811 {
812 int errn = errno;
813 ::close(fd);
814 return make_err(errn);
815 }
816
817 // Check fd is within select() limits
818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1714 times.
1714 if (fd >= FD_SETSIZE)
819 {
820 ::close(fd);
821 return make_err(EMFILE); // Too many open files
822 }
823
824 1714 select_impl->fd_ = fd;
825 1714 return {};
826 1714 }
827
// Close a socket handle on behalf of the io_object layer; delegates
// to close_socket(), which cancels pending ops and releases the fd.
inline void
select_socket_service::close(io_object::handle& h)
{
    static_cast<select_socket*>(h.get())->close_socket();
}
833
// Forward a completed/cancelled op to the scheduler's completion queue.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}
839
// Tell the scheduler an async operation is now outstanding.
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
845
// Tell the scheduler an outstanding async operation has finished.
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
851
852 } // namespace boost::corosio::detail
853
854 #endif // BOOST_COROSIO_HAS_SELECT
855
856 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
857