include/boost/corosio/native/detail/epoll/epoll_tcp_acceptor_service.hpp

76.7% Lines (112/146) 100.0% List of functions (20/20)
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_acceptor_service::scheduler() const :75 2284x 100.0% 100.0% boost::corosio::detail::epoll_accept_op::cancel() :92 6x 80.0% 75.0% boost::corosio::detail::epoll_accept_op::operator()() :101 6343x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::epoll_tcp_acceptor(boost::corosio::detail::epoll_tcp_acceptor_service&) :106 1146x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :113 6343x 52.3% 47.0% boost::corosio::detail::epoll_tcp_acceptor::cancel() :227 2x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::close_socket() :233 4582x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(boost::capy::execution_context&) :238 339x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() :247 678x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::shutdown() :250 339x 80.0% 90.0% boost::corosio::detail::epoll_tcp_acceptor_service::construct() :263 1146x 100.0% 78.0% boost::corosio::detail::epoll_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :276 1146x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :286 2291x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :292 1145x 93.3% 93.0% boost::corosio::detail::epoll_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :321 1144x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :328 1141x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :335 11x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_started() :341 6341x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_finished() :347 9x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::tcp_service() const :353 6334x 100.0% 78.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <netinet/in.h>
34 #include <sys/epoll.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /// State for epoll acceptor service.
41 using epoll_tcp_acceptor_state =
42 reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43
44 /** epoll acceptor service implementation.
45
46 Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 Uses key_type = tcp_acceptor_service for service lookup.
48 */
49 class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : public tcp_acceptor_service
51 {
52 public:
53 explicit epoll_tcp_acceptor_service(capy::execution_context& ctx);
54 ~epoll_tcp_acceptor_service() override;
55
56 epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
57 epoll_tcp_acceptor_service&
58 operator=(epoll_tcp_acceptor_service const&) = delete;
59
60 void shutdown() override;
61
62 io_object::implementation* construct() override;
63 void destroy(io_object::implementation*) override;
64 void close(io_object::handle&) override;
65 std::error_code open_acceptor_socket(
66 tcp_acceptor::implementation& impl,
67 int family,
68 int type,
69 int protocol) override;
70 std::error_code
71 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 std::error_code
73 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74
75 2284x epoll_scheduler& scheduler() const noexcept
76 {
77 2284x return state_->sched_;
78 }
79 void post(scheduler_op* op);
80 void work_started() noexcept;
81 void work_finished() noexcept;
82
83 /** Get the TCP service for creating peer sockets during accept. */
84 epoll_tcp_service* tcp_service() const noexcept;
85
86 private:
87 capy::execution_context& ctx_;
88 std::unique_ptr<epoll_tcp_acceptor_state> state_;
89 };
90
91 inline void
92 6x epoll_accept_op::cancel() noexcept
93 {
94 6x if (acceptor_impl_)
95 6x acceptor_impl_->cancel_single_op(*this);
96 else
97 request_cancel();
98 6x }
99
100 inline void
101 6343x epoll_accept_op::operator()()
102 {
103 6343x complete_accept_op<epoll_tcp_socket>(*this);
104 6343x }
105
106 1146x inline epoll_tcp_acceptor::epoll_tcp_acceptor(
107 1146x epoll_tcp_acceptor_service& svc) noexcept
108 1146x : reactor_acceptor(svc)
109 {
110 1146x }
111
112 inline std::coroutine_handle<>
113 6343x epoll_tcp_acceptor::accept(
114 std::coroutine_handle<> h,
115 capy::executor_ref ex,
116 std::stop_token token,
117 std::error_code* ec,
118 io_object::implementation** impl_out)
119 {
120 6343x auto& op = acc_;
121 6343x op.reset();
122 6343x op.h = h;
123 6343x op.ex = ex;
124 6343x op.ec_out = ec;
125 6343x op.impl_out = impl_out;
126 6343x op.fd = fd_;
127 6343x op.start(token, this);
128
129 6343x sockaddr_storage peer_storage{};
130 6343x socklen_t addrlen = sizeof(peer_storage);
131 int accepted;
132 do
133 {
134 6343x accepted = ::accept4(
135 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
136 SOCK_NONBLOCK | SOCK_CLOEXEC);
137 }
138 6343x while (accepted < 0 && errno == EINTR);
139
140 6343x if (accepted >= 0)
141 {
142 {
143 2x std::lock_guard lock(desc_state_.mutex);
144 2x desc_state_.read_ready = false;
145 2x }
146
147 2x if (svc_.scheduler().try_consume_inline_budget())
148 {
149 auto* socket_svc = svc_.tcp_service();
150 if (socket_svc)
151 {
152 auto& impl =
153 static_cast<epoll_tcp_socket&>(*socket_svc->construct());
154 impl.set_socket(accepted);
155
156 impl.desc_state_.fd = accepted;
157 {
158 std::lock_guard lock(impl.desc_state_.mutex);
159 impl.desc_state_.read_op = nullptr;
160 impl.desc_state_.write_op = nullptr;
161 impl.desc_state_.connect_op = nullptr;
162 }
163 socket_svc->scheduler().register_descriptor(
164 accepted, &impl.desc_state_);
165
166 impl.set_endpoints(
167 local_endpoint_, from_sockaddr(peer_storage));
168
169 *ec = {};
170 if (impl_out)
171 *impl_out = &impl;
172 }
173 else
174 {
175 ::close(accepted);
176 *ec = make_err(ENOENT);
177 if (impl_out)
178 *impl_out = nullptr;
179 }
180 return dispatch_coro(ex, h);
181 }
182
183 2x op.accepted_fd = accepted;
184 2x op.peer_storage = peer_storage;
185 2x op.complete(0, 0);
186 2x op.impl_ptr = shared_from_this();
187 2x svc_.post(&op);
188 2x return std::noop_coroutine();
189 }
190
191 6341x if (errno == EAGAIN || errno == EWOULDBLOCK)
192 {
193 6341x op.impl_ptr = shared_from_this();
194 6341x svc_.work_started();
195
196 6341x std::lock_guard lock(desc_state_.mutex);
197 6341x bool io_done = false;
198 6341x if (desc_state_.read_ready)
199 {
200 desc_state_.read_ready = false;
201 op.perform_io();
202 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
203 if (!io_done)
204 op.errn = 0;
205 }
206
207 6341x if (io_done || op.cancelled.load(std::memory_order_acquire))
208 {
209 svc_.post(&op);
210 svc_.work_finished();
211 }
212 else
213 {
214 6341x desc_state_.read_op = &op;
215 }
216 6341x return std::noop_coroutine();
217 6341x }
218
219 op.complete(errno, 0);
220 op.impl_ptr = shared_from_this();
221 svc_.post(&op);
222 // completion is always posted to scheduler queue, never inline.
223 return std::noop_coroutine();
224 }
225
226 inline void
227 2x epoll_tcp_acceptor::cancel() noexcept
228 {
229 2x do_cancel();
230 2x }
231
232 inline void
233 4582x epoll_tcp_acceptor::close_socket() noexcept
234 {
235 4582x do_close_socket();
236 4582x }
237
238 339x inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
239 339x capy::execution_context& ctx)
240 339x : ctx_(ctx)
241 339x , state_(
242 std::make_unique<epoll_tcp_acceptor_state>(
243 339x ctx.use_service<epoll_scheduler>()))
244 {
245 339x }
246
247 678x inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
248
249 inline void
250 339x epoll_tcp_acceptor_service::shutdown()
251 {
252 339x std::lock_guard lock(state_->mutex_);
253
254 339x while (auto* impl = state_->impl_list_.pop_front())
255 impl->close_socket();
256
257 // Don't clear impl_ptrs_ here — same rationale as
258 // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
259 // after scheduler shutdown has drained all queued ops.
260 339x }
261
262 inline io_object::implementation*
263 1146x epoll_tcp_acceptor_service::construct()
264 {
265 1146x auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
266 1146x auto* raw = impl.get();
267
268 1146x std::lock_guard lock(state_->mutex_);
269 1146x state_->impl_ptrs_.emplace(raw, std::move(impl));
270 1146x state_->impl_list_.push_back(raw);
271
272 1146x return raw;
273 1146x }
274
275 inline void
276 1146x epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
277 {
278 1146x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
279 1146x epoll_impl->close_socket();
280 1146x std::lock_guard lock(state_->mutex_);
281 1146x state_->impl_list_.remove(epoll_impl);
282 1146x state_->impl_ptrs_.erase(epoll_impl);
283 1146x }
284
285 inline void
286 2291x epoll_tcp_acceptor_service::close(io_object::handle& h)
287 {
288 2291x static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
289 2291x }
290
291 inline std::error_code
292 1145x epoll_tcp_acceptor_service::open_acceptor_socket(
293 tcp_acceptor::implementation& impl, int family, int type, int protocol)
294 {
295 1145x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
296 1145x epoll_impl->close_socket();
297
298 1145x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
299 1145x if (fd < 0)
300 return make_err(errno);
301
302 1145x if (family == AF_INET6)
303 {
304 8x int val = 0; // dual-stack default
305 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
306 }
307
308 1145x epoll_impl->fd_ = fd;
309
310 // Set up descriptor state but do NOT register with epoll yet
311 1145x epoll_impl->desc_state_.fd = fd;
312 {
313 1145x std::lock_guard lock(epoll_impl->desc_state_.mutex);
314 1145x epoll_impl->desc_state_.read_op = nullptr;
315 1145x }
316
317 1145x return {};
318 }
319
320 inline std::error_code
321 1144x epoll_tcp_acceptor_service::bind_acceptor(
322 tcp_acceptor::implementation& impl, endpoint ep)
323 {
324 1144x return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
325 }
326
327 inline std::error_code
328 1141x epoll_tcp_acceptor_service::listen_acceptor(
329 tcp_acceptor::implementation& impl, int backlog)
330 {
331 1141x return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
332 }
333
334 inline void
335 11x epoll_tcp_acceptor_service::post(scheduler_op* op)
336 {
337 11x state_->sched_.post(op);
338 11x }
339
340 inline void
341 6341x epoll_tcp_acceptor_service::work_started() noexcept
342 {
343 6341x state_->sched_.work_started();
344 6341x }
345
346 inline void
347 9x epoll_tcp_acceptor_service::work_finished() noexcept
348 {
349 9x state_->sched_.work_finished();
350 9x }
351
352 inline epoll_tcp_service*
353 6334x epoll_tcp_acceptor_service::tcp_service() const noexcept
354 {
355 6334x auto* svc = ctx_.find_service<detail::tcp_service>();
356 6334x return svc ? dynamic_cast<epoll_tcp_service*>(svc) : nullptr;
357 }
358
359 } // namespace boost::corosio::detail
360
361 #endif // BOOST_COROSIO_HAS_EPOLL
362
363 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
364