include/boost/corosio/native/detail/epoll/epoll_tcp_acceptor_service.hpp

76.0% Lines (111/146) 100.0% List of functions (20/20)
epoll_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_acceptor_service::scheduler() const :76 2273x 100.0% 100.0% boost::corosio::detail::epoll_accept_op::cancel() :93 6x 80.0% 75.0% boost::corosio::detail::epoll_accept_op::operator()() :102 4911x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::epoll_tcp_acceptor(boost::corosio::detail::epoll_tcp_acceptor_service&) :107 1143x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :114 4911x 51.5% 47.0% boost::corosio::detail::epoll_tcp_acceptor::cancel() :229 2x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor::close_socket() :235 4568x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::epoll_tcp_service&) :240 401x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() :249 802x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::shutdown() :252 401x 80.0% 90.0% boost::corosio::detail::epoll_tcp_acceptor_service::construct() :265 1143x 100.0% 78.0% boost::corosio::detail::epoll_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :278 1143x 100.0% 83.0% boost::corosio::detail::epoll_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :288 2284x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :294 1141x 93.3% 93.0% boost::corosio::detail::epoll_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :323 1140x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :330 1135x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :337 12x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_started() :343 4908x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::work_finished() :349 9x 100.0% 100.0% boost::corosio::detail::epoll_tcp_acceptor_service::tcp_service() const :355 4902x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <netinet/in.h>
34 #include <sys/epoll.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /// State for epoll acceptor service.
41 using epoll_tcp_acceptor_state =
42 reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43
44 /** epoll acceptor service implementation.
45
46 Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 Uses key_type = tcp_acceptor_service for service lookup.
48 */
49 class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : public tcp_acceptor_service
51 {
52 public:
53 explicit epoll_tcp_acceptor_service(
54 capy::execution_context& ctx, epoll_tcp_service& tcp_svc);
55 ~epoll_tcp_acceptor_service() override;
56
57 epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
58 epoll_tcp_acceptor_service&
59 operator=(epoll_tcp_acceptor_service const&) = delete;
60
61 void shutdown() override;
62
63 io_object::implementation* construct() override;
64 void destroy(io_object::implementation*) override;
65 void close(io_object::handle&) override;
66 std::error_code open_acceptor_socket(
67 tcp_acceptor::implementation& impl,
68 int family,
69 int type,
70 int protocol) override;
71 std::error_code
72 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 std::error_code
74 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75
76 2273x epoll_scheduler& scheduler() const noexcept
77 {
78 2273x return state_->sched_;
79 }
80 void post(scheduler_op* op);
81 void work_started() noexcept;
82 void work_finished() noexcept;
83
84 /** Get the TCP service for creating peer sockets during accept. */
85 epoll_tcp_service* tcp_service() const noexcept;
86
87 private:
88 epoll_tcp_service* tcp_svc_;
89 std::unique_ptr<epoll_tcp_acceptor_state> state_;
90 };
91
92 inline void
93 6x epoll_accept_op::cancel() noexcept
94 {
95 6x if (acceptor_impl_)
96 6x acceptor_impl_->cancel_single_op(*this);
97 else
98 request_cancel();
99 6x }
100
101 inline void
102 4911x epoll_accept_op::operator()()
103 {
104 4911x complete_accept_op<epoll_tcp_socket>(*this);
105 4911x }
106
107 1143x inline epoll_tcp_acceptor::epoll_tcp_acceptor(
108 1143x epoll_tcp_acceptor_service& svc) noexcept
109 1143x : reactor_acceptor(svc)
110 {
111 1143x }
112
113 inline std::coroutine_handle<>
114 4911x epoll_tcp_acceptor::accept(
115 std::coroutine_handle<> h,
116 capy::executor_ref ex,
117 std::stop_token token,
118 std::error_code* ec,
119 io_object::implementation** impl_out)
120 {
121 4911x auto& op = acc_;
122 4911x op.reset();
123 4911x op.h = h;
124 4911x op.ex = ex;
125 4911x op.ec_out = ec;
126 4911x op.impl_out = impl_out;
127 4911x op.fd = fd_;
128 4911x op.start(token, this);
129
130 4911x sockaddr_storage peer_storage{};
131 4911x socklen_t addrlen = sizeof(peer_storage);
132 int accepted;
133 do
134 {
135 4911x accepted = ::accept4(
136 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
137 SOCK_NONBLOCK | SOCK_CLOEXEC);
138 }
139 4911x while (accepted < 0 && errno == EINTR);
140
141 4911x if (accepted >= 0)
142 {
143 {
144 3x std::lock_guard lock(desc_state_.mutex);
145 3x desc_state_.read_ready = false;
146 3x }
147
148 3x if (svc_.scheduler().try_consume_inline_budget())
149 {
150 auto* socket_svc = svc_.tcp_service();
151 if (socket_svc)
152 {
153 auto& impl =
154 static_cast<epoll_tcp_socket&>(*socket_svc->construct());
155 impl.set_socket(accepted);
156
157 impl.desc_state_.fd = accepted;
158 {
159 std::lock_guard lock(impl.desc_state_.mutex);
160 impl.desc_state_.read_op = nullptr;
161 impl.desc_state_.write_op = nullptr;
162 impl.desc_state_.connect_op = nullptr;
163 }
164 socket_svc->scheduler().register_descriptor(
165 accepted, &impl.desc_state_);
166
167 impl.set_endpoints(
168 local_endpoint_, from_sockaddr(peer_storage));
169
170 *ec = {};
171 if (impl_out)
172 *impl_out = &impl;
173 }
174 else
175 {
176 ::close(accepted);
177 *ec = make_err(ENOENT);
178 if (impl_out)
179 *impl_out = nullptr;
180 }
181 op.cont_op.cont.h = h;
182 return dispatch_coro(ex, op.cont_op.cont);
183 }
184
185 3x op.accepted_fd = accepted;
186 3x op.peer_storage = peer_storage;
187 3x op.complete(0, 0);
188 3x op.impl_ptr = shared_from_this();
189 3x svc_.post(&op);
190 3x return std::noop_coroutine();
191 }
192
193 4908x if (errno == EAGAIN || errno == EWOULDBLOCK)
194 {
195 4908x op.impl_ptr = shared_from_this();
196 4908x svc_.work_started();
197
198 4908x std::lock_guard lock(desc_state_.mutex);
199 4908x bool io_done = false;
200 4908x if (desc_state_.read_ready)
201 {
202 desc_state_.read_ready = false;
203 op.perform_io();
204 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
205 if (!io_done)
206 op.errn = 0;
207 }
208
209 4908x if (io_done || op.cancelled.load(std::memory_order_acquire))
210 {
211 svc_.post(&op);
212 svc_.work_finished();
213 }
214 else
215 {
216 4908x desc_state_.read_op = &op;
217 }
218 4908x return std::noop_coroutine();
219 4908x }
220
221 op.complete(errno, 0);
222 op.impl_ptr = shared_from_this();
223 svc_.post(&op);
224 // completion is always posted to scheduler queue, never inline.
225 return std::noop_coroutine();
226 }
227
228 inline void
229 2x epoll_tcp_acceptor::cancel() noexcept
230 {
231 2x do_cancel();
232 2x }
233
234 inline void
235 4568x epoll_tcp_acceptor::close_socket() noexcept
236 {
237 4568x do_close_socket();
238 4568x }
239
240 401x inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
241 401x capy::execution_context& ctx, epoll_tcp_service& tcp_svc)
242 401x : tcp_svc_(&tcp_svc)
243 401x , state_(
244 std::make_unique<epoll_tcp_acceptor_state>(
245 401x ctx.use_service<epoll_scheduler>()))
246 {
247 401x }
248
249 802x inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
250
251 inline void
252 401x epoll_tcp_acceptor_service::shutdown()
253 {
254 401x std::lock_guard lock(state_->mutex_);
255
256 401x while (auto* impl = state_->impl_list_.pop_front())
257 impl->close_socket();
258
259 // Don't clear impl_ptrs_ here — same rationale as
260 // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
261 // after scheduler shutdown has drained all queued ops.
262 401x }
263
264 inline io_object::implementation*
265 1143x epoll_tcp_acceptor_service::construct()
266 {
267 1143x auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
268 1143x auto* raw = impl.get();
269
270 1143x std::lock_guard lock(state_->mutex_);
271 1143x state_->impl_ptrs_.emplace(raw, std::move(impl));
272 1143x state_->impl_list_.push_back(raw);
273
274 1143x return raw;
275 1143x }
276
277 inline void
278 1143x epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
279 {
280 1143x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
281 1143x epoll_impl->close_socket();
282 1143x std::lock_guard lock(state_->mutex_);
283 1143x state_->impl_list_.remove(epoll_impl);
284 1143x state_->impl_ptrs_.erase(epoll_impl);
285 1143x }
286
287 inline void
288 2284x epoll_tcp_acceptor_service::close(io_object::handle& h)
289 {
290 2284x static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
291 2284x }
292
293 inline std::error_code
294 1141x epoll_tcp_acceptor_service::open_acceptor_socket(
295 tcp_acceptor::implementation& impl, int family, int type, int protocol)
296 {
297 1141x auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
298 1141x epoll_impl->close_socket();
299
300 1141x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
301 1141x if (fd < 0)
302 return make_err(errno);
303
304 1141x if (family == AF_INET6)
305 {
306 8x int val = 0; // dual-stack default
307 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
308 }
309
310 1141x epoll_impl->fd_ = fd;
311
312 // Set up descriptor state but do NOT register with epoll yet
313 1141x epoll_impl->desc_state_.fd = fd;
314 {
315 1141x std::lock_guard lock(epoll_impl->desc_state_.mutex);
316 1141x epoll_impl->desc_state_.read_op = nullptr;
317 1141x }
318
319 1141x return {};
320 }
321
322 inline std::error_code
323 1140x epoll_tcp_acceptor_service::bind_acceptor(
324 tcp_acceptor::implementation& impl, endpoint ep)
325 {
326 1140x return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
327 }
328
329 inline std::error_code
330 1135x epoll_tcp_acceptor_service::listen_acceptor(
331 tcp_acceptor::implementation& impl, int backlog)
332 {
333 1135x return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
334 }
335
336 inline void
337 12x epoll_tcp_acceptor_service::post(scheduler_op* op)
338 {
339 12x state_->sched_.post(op);
340 12x }
341
342 inline void
343 4908x epoll_tcp_acceptor_service::work_started() noexcept
344 {
345 4908x state_->sched_.work_started();
346 4908x }
347
348 inline void
349 9x epoll_tcp_acceptor_service::work_finished() noexcept
350 {
351 9x state_->sched_.work_finished();
352 9x }
353
354 inline epoll_tcp_service*
355 4902x epoll_tcp_acceptor_service::tcp_service() const noexcept
356 {
357 4902x return tcp_svc_;
358 }
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_EPOLL
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
365