include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

86.2% Lines (50/58) 92.9% List of functions (13/14)
f(x) Functions (14)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 339x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :112 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :121 799x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :130 1x 80.0% 75.0% boost::corosio::detail::epoll_op::operator()() :139 60667x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :145 6336x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :150 19106x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :155 19106x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :158 6336x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :169 107530x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :181 178447x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :193 123x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :199 57248x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :205 6351x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54 simplicity is worth it.
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
99 339x explicit epoll_tcp_service(capy::execution_context& ctx)
100 339x : reactor_socket_service(ctx)
101 {
102 339x }
103
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109 };
110
111 inline void
112 epoll_connect_op::cancel() noexcept
113 {
114 if (socket_impl_)
115 socket_impl_->cancel_single_op(*this);
116 else
117 request_cancel();
118 }
119
120 inline void
121 799x epoll_read_op::cancel() noexcept
122 {
123 799x if (socket_impl_)
124 799x socket_impl_->cancel_single_op(*this);
125 else
126 request_cancel();
127 799x }
128
129 inline void
130 1x epoll_write_op::cancel() noexcept
131 {
132 1x if (socket_impl_)
133 1x socket_impl_->cancel_single_op(*this);
134 else
135 request_cancel();
136 1x }
137
138 inline void
139 60667x epoll_op::operator()()
140 {
141 60667x complete_io_op(*this);
142 60667x }
143
144 inline void
145 6336x epoll_connect_op::operator()()
146 {
147 6336x complete_connect_op(*this);
148 6336x }
149
150 19106x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151 19106x : reactor_stream_socket(svc)
152 {
153 19106x }
154
155 19106x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156
157 inline std::coroutine_handle<>
158 6336x epoll_tcp_socket::connect(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 endpoint ep,
162 std::stop_token token,
163 std::error_code* ec)
164 {
165 6336x return do_connect(h, ex, ep, token, ec);
166 }
167
168 inline std::coroutine_handle<>
169 107530x epoll_tcp_socket::read_some(
170 std::coroutine_handle<> h,
171 capy::executor_ref ex,
172 buffer_param param,
173 std::stop_token token,
174 std::error_code* ec,
175 std::size_t* bytes_out)
176 {
177 107530x return do_read_some(h, ex, param, token, ec, bytes_out);
178 }
179
180 inline std::coroutine_handle<>
181 178447x epoll_tcp_socket::write_some(
182 std::coroutine_handle<> h,
183 capy::executor_ref ex,
184 buffer_param param,
185 std::stop_token token,
186 std::error_code* ec,
187 std::size_t* bytes_out)
188 {
189 178447x return do_write_some(h, ex, param, token, ec, bytes_out);
190 }
191
192 inline void
193 123x epoll_tcp_socket::cancel() noexcept
194 {
195 123x do_cancel();
196 123x }
197
198 inline void
199 57248x epoll_tcp_socket::close_socket() noexcept
200 {
201 57248x do_close_socket();
202 57248x }
203
204 inline std::error_code
205 6351x epoll_tcp_service::open_socket(
206 tcp_socket::implementation& impl, int family, int type, int protocol)
207 {
208 6351x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209 6351x epoll_impl->close_socket();
210
211 6351x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212 6351x if (fd < 0)
213 return make_err(errno);
214
215 6351x if (family == AF_INET6)
216 {
217 5x int one = 1;
218 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219 }
220
221 6351x epoll_impl->fd_ = fd;
222
223 // Register fd with epoll (edge-triggered mode)
224 6351x epoll_impl->desc_state_.fd = fd;
225 {
226 6351x std::lock_guard lock(epoll_impl->desc_state_.mutex);
227 6351x epoll_impl->desc_state_.read_op = nullptr;
228 6351x epoll_impl->desc_state_.write_op = nullptr;
229 6351x epoll_impl->desc_state_.connect_op = nullptr;
230 6351x }
231 6351x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232
233 6351x return {};
234 }
235
236 } // namespace boost::corosio::detail
237
238 #endif // BOOST_COROSIO_HAS_EPOLL
239
240 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
241