include/boost/corosio/native/detail/select/select_tcp_acceptor_service.hpp

54.6% Lines (112/205) 95.0% List of functions (19/20) 27.5% Branches (22/80)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_tcp_acceptor_service::scheduler() const :76 124x 100.0% 100.0% boost::corosio::detail::select_accept_op::cancel() :93 0 0.0% 0.0% 0.0% boost::corosio::detail::select_accept_op::operator()() :102 1789x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::select_tcp_acceptor(boost::corosio::detail::select_tcp_acceptor_service&) :107 134x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::accept(std::__1::coroutine_handle<void>, boost::capy::executor_ref, std::__1::stop_token, std::__1::error_code*, boost::corosio::io_object::implementation**) :114 1789x 27.2% 14.0% 25.0% boost::corosio::detail::select_tcp_acceptor::cancel() :266 2x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor::close_socket() :272 262x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::select_tcp_acceptor_service(boost::capy::execution_context&) :277 462x 100.0% 50.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::~select_tcp_acceptor_service() :286 693x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::shutdown() :289 231x 80.0% 50.0% 75.0% boost::corosio::detail::select_tcp_acceptor_service::construct() :302 67x 100.0% 50.0% 42.0% boost::corosio::detail::select_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :315 67x 100.0% 50.0% 50.0% boost::corosio::detail::select_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :325 131x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :331 64x 65.7% 58.3% 64.0% boost::corosio::detail::select_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :394 63x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :401 62x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :408 3x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_started() :414 1789x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::work_finished() :420 3x 100.0% 100.0% boost::corosio::detail::select_tcp_acceptor_service::tcp_service() const :426 1786x 100.0% 50.0% 71.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_tcp_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for select acceptor service.
42 using select_tcp_acceptor_state =
43 reactor_service_state<select_scheduler, select_tcp_acceptor>;
44
45 /** select acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL select_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit select_tcp_acceptor_service(capy::execution_context& ctx);
55 ~select_tcp_acceptor_service() override;
56
57 select_tcp_acceptor_service(select_tcp_acceptor_service const&) = delete;
58 select_tcp_acceptor_service&
59 operator=(select_tcp_acceptor_service const&) = delete;
60
61 void shutdown() override;
62
63 io_object::implementation* construct() override;
64 void destroy(io_object::implementation*) override;
65 void close(io_object::handle&) override;
66 std::error_code open_acceptor_socket(
67 tcp_acceptor::implementation& impl,
68 int family,
69 int type,
70 int protocol) override;
71 std::error_code
72 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 std::error_code
74 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75
76 124x select_scheduler& scheduler() const noexcept
77 {
78 124x return state_->sched_;
79 }
80 void post(scheduler_op* op);
81 void work_started() noexcept;
82 void work_finished() noexcept;
83
84 /** Get the TCP service for creating peer sockets during accept. */
85 select_tcp_service* tcp_service() const noexcept;
86
87 private:
88 capy::execution_context& ctx_;
89 std::unique_ptr<select_tcp_acceptor_state> state_;
90 };
91
92 inline void
93 select_accept_op::cancel() noexcept
94 {
95 if (acceptor_impl_)
96 acceptor_impl_->cancel_single_op(*this);
97 else
98 request_cancel();
99 }
100
101 inline void
102 1789x select_accept_op::operator()()
103 {
104 1789x complete_accept_op<select_tcp_socket>(*this);
105 1789x }
106
107 134x inline select_tcp_acceptor::select_tcp_acceptor(
108 select_tcp_acceptor_service& svc) noexcept
109 67x : reactor_acceptor(svc)
110 134x {
111 134x }
112
113 inline std::coroutine_handle<>
114 1789x select_tcp_acceptor::accept(
115 std::coroutine_handle<> h,
116 capy::executor_ref ex,
117 std::stop_token token,
118 std::error_code* ec,
119 io_object::implementation** impl_out)
120 {
121 1789x auto& op = acc_;
122 1789x op.reset();
123 1789x op.h = h;
124 1789x op.ex = ex;
125 1789x op.ec_out = ec;
126 1789x op.impl_out = impl_out;
127 1789x op.fd = fd_;
128 1789x op.start(token, this);
129
130 1789x sockaddr_storage peer_storage{};
131 1789x socklen_t addrlen = sizeof(peer_storage);
132 int accepted;
133 1789x do
134 {
135 1789x accepted =
136 1789x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
137 3578x }
138
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1789 times.
1789x while (accepted < 0 && errno == EINTR);
139
140
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1789 times.
1789x if (accepted >= 0)
141 {
142 if (accepted >= FD_SETSIZE)
143 {
144 ::close(accepted);
145 op.complete(EINVAL, 0);
146 op.impl_ptr = shared_from_this();
147 svc_.post(&op);
148 return std::noop_coroutine();
149 }
150
151 int flags = ::fcntl(accepted, F_GETFL, 0);
152 if (flags == -1)
153 {
154 int err = errno;
155 ::close(accepted);
156 op.complete(err, 0);
157 op.impl_ptr = shared_from_this();
158 svc_.post(&op);
159 return std::noop_coroutine();
160 }
161
162 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
163 {
164 int err = errno;
165 ::close(accepted);
166 op.complete(err, 0);
167 op.impl_ptr = shared_from_this();
168 svc_.post(&op);
169 return std::noop_coroutine();
170 }
171
172 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
173 {
174 int err = errno;
175 ::close(accepted);
176 op.complete(err, 0);
177 op.impl_ptr = shared_from_this();
178 svc_.post(&op);
179 return std::noop_coroutine();
180 }
181
182 {
183 std::lock_guard lock(desc_state_.mutex);
184 desc_state_.read_ready = false;
185 }
186
187 if (svc_.scheduler().try_consume_inline_budget())
188 {
189 auto* socket_svc = svc_.tcp_service();
190 if (socket_svc)
191 {
192 auto& impl =
193 static_cast<select_tcp_socket&>(*socket_svc->construct());
194 impl.set_socket(accepted);
195
196 impl.desc_state_.fd = accepted;
197 {
198 std::lock_guard lock(impl.desc_state_.mutex);
199 impl.desc_state_.read_op = nullptr;
200 impl.desc_state_.write_op = nullptr;
201 impl.desc_state_.connect_op = nullptr;
202 }
203 socket_svc->scheduler().register_descriptor(
204 accepted, &impl.desc_state_);
205
206 impl.set_endpoints(
207 local_endpoint_, from_sockaddr(peer_storage));
208
209 *ec = {};
210 if (impl_out)
211 *impl_out = &impl;
212 }
213 else
214 {
215 ::close(accepted);
216 *ec = make_err(ENOENT);
217 if (impl_out)
218 *impl_out = nullptr;
219 }
220 return dispatch_coro(ex, h);
221 }
222
223 op.accepted_fd = accepted;
224 op.peer_storage = peer_storage;
225 op.complete(0, 0);
226 op.impl_ptr = shared_from_this();
227 svc_.post(&op);
228 return std::noop_coroutine();
229 }
230
231
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1789 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1789x if (errno == EAGAIN || errno == EWOULDBLOCK)
232 {
233
1/2
✓ Branch 0 taken 1789 times.
✗ Branch 1 not taken.
1789x op.impl_ptr = shared_from_this();
234 1789x svc_.work_started();
235
236 1789x std::lock_guard lock(desc_state_.mutex);
237 1789x bool io_done = false;
238
1/2
✓ Branch 0 taken 1789 times.
✗ Branch 1 not taken.
1789x if (desc_state_.read_ready)
239 {
240 desc_state_.read_ready = false;
241 op.perform_io();
242 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
243 if (!io_done)
244 op.errn = 0;
245 }
246
247
2/4
✓ Branch 0 taken 1789 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1789 times.
1789x if (io_done || op.cancelled.load(std::memory_order_acquire))
248 {
249 svc_.post(&op);
250 svc_.work_finished();
251 }
252 else
253 {
254 1789x desc_state_.read_op = &op;
255 }
256 1789x return std::noop_coroutine();
257 1789x }
258
259 op.complete(errno, 0);
260 op.impl_ptr = shared_from_this();
261 svc_.post(&op);
262 return std::noop_coroutine();
263 1789x }
264
265 inline void
266 2x select_tcp_acceptor::cancel() noexcept
267 {
268 2x do_cancel();
269 2x }
270
271 inline void
272 262x select_tcp_acceptor::close_socket() noexcept
273 {
274 262x do_close_socket();
275 262x }
276
277 462x inline select_tcp_acceptor_service::select_tcp_acceptor_service(
278 capy::execution_context& ctx)
279 231x : ctx_(ctx)
280 231x , state_(
281
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x std::make_unique<select_tcp_acceptor_state>(
282
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x ctx.use_service<select_scheduler>()))
283 462x {
284 462x }
285
286 693x inline select_tcp_acceptor_service::~select_tcp_acceptor_service() {}
287
288 inline void
289 231x select_tcp_acceptor_service::shutdown()
290 {
291 231x std::lock_guard lock(state_->mutex_);
292
293
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 231 times.
231x while (auto* impl = state_->impl_list_.pop_front())
294 impl->close_socket();
295
296 // Don't clear impl_ptrs_ here — same rationale as
297 // select_tcp_service::shutdown(). Let ~state_ release ptrs
298 // after scheduler shutdown has drained all queued ops.
299 231x }
300
301 inline io_object::implementation*
302 67x select_tcp_acceptor_service::construct()
303 {
304 67x auto impl = std::make_shared<select_tcp_acceptor>(*this);
305 67x auto* raw = impl.get();
306
307
1/2
✓ Branch 0 taken 67 times.
✗ Branch 1 not taken.
67x std::lock_guard lock(state_->mutex_);
308
1/2
✓ Branch 0 taken 67 times.
✗ Branch 1 not taken.
67x state_->impl_ptrs_.emplace(raw, std::move(impl));
309 67x state_->impl_list_.push_back(raw);
310
311 67x return raw;
312 67x }
313
314 inline void
315 67x select_tcp_acceptor_service::destroy(io_object::implementation* impl)
316 {
317 67x auto* select_impl = static_cast<select_tcp_acceptor*>(impl);
318 67x select_impl->close_socket();
319 67x std::lock_guard lock(state_->mutex_);
320 67x state_->impl_list_.remove(select_impl);
321
1/2
✓ Branch 0 taken 67 times.
✗ Branch 1 not taken.
67x state_->impl_ptrs_.erase(select_impl);
322 67x }
323
324 inline void
325 131x select_tcp_acceptor_service::close(io_object::handle& h)
326 {
327 131x static_cast<select_tcp_acceptor*>(h.get())->close_socket();
328 131x }
329
330 inline std::error_code
331 64x select_tcp_acceptor_service::open_acceptor_socket(
332 tcp_acceptor::implementation& impl, int family, int type, int protocol)
333 {
334 64x auto* select_impl = static_cast<select_tcp_acceptor*>(&impl);
335 64x select_impl->close_socket();
336
337 64x int fd = ::socket(family, type, protocol);
338
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64x if (fd < 0)
339 return make_err(errno);
340
341 64x int flags = ::fcntl(fd, F_GETFL, 0);
342
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64x if (flags == -1)
343 {
344 int errn = errno;
345 ::close(fd);
346 return make_err(errn);
347 }
348
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
349 {
350 int errn = errno;
351 ::close(fd);
352 return make_err(errn);
353 }
354
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
355 {
356 int errn = errno;
357 ::close(fd);
358 return make_err(errn);
359 }
360
361
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64x if (fd >= FD_SETSIZE)
362 {
363 ::close(fd);
364 return make_err(EMFILE);
365 }
366
367
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 8 times.
64x if (family == AF_INET6)
368 {
369 8x int val = 0; // dual-stack default
370 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
371 8x }
372
373 #ifdef SO_NOSIGPIPE
374 {
375 64x int nosig = 1;
376 64x ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
377 }
378 #endif
379
380 64x select_impl->fd_ = fd;
381
382 // Set up descriptor state but do NOT register with reactor yet
383 // (registration happens in do_listen via reactor_acceptor base)
384 64x select_impl->desc_state_.fd = fd;
385 {
386 64x std::lock_guard lock(select_impl->desc_state_.mutex);
387 64x select_impl->desc_state_.read_op = nullptr;
388 64x }
389
390 64x return {};
391 64x }
392
393 inline std::error_code
394 63x select_tcp_acceptor_service::bind_acceptor(
395 tcp_acceptor::implementation& impl, endpoint ep)
396 {
397 63x return static_cast<select_tcp_acceptor*>(&impl)->do_bind(ep);
398 }
399
400 inline std::error_code
401 62x select_tcp_acceptor_service::listen_acceptor(
402 tcp_acceptor::implementation& impl, int backlog)
403 {
404 62x return static_cast<select_tcp_acceptor*>(&impl)->do_listen(backlog);
405 }
406
407 inline void
408 3x select_tcp_acceptor_service::post(scheduler_op* op)
409 {
410 3x state_->sched_.post(op);
411 3x }
412
413 inline void
414 1789x select_tcp_acceptor_service::work_started() noexcept
415 {
416 1789x state_->sched_.work_started();
417 1789x }
418
419 inline void
420 3x select_tcp_acceptor_service::work_finished() noexcept
421 {
422 3x state_->sched_.work_finished();
423 3x }
424
425 inline select_tcp_service*
426 1786x select_tcp_acceptor_service::tcp_service() const noexcept
427 {
428 1786x auto* svc = ctx_.find_service<detail::tcp_service>();
429
2/4
✓ Branch 0 taken 1786 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1786 times.
✗ Branch 3 not taken.
1786x return svc ? dynamic_cast<select_tcp_service*>(svc) : nullptr;
430 }
431
432 } // namespace boost::corosio::detail
433
434 #endif // BOOST_COROSIO_HAS_SELECT
435
436 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_ACCEPTOR_SERVICE_HPP
437