include/boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp

73.3% Lines (77/105) 90.0% List of functions (18/20) 47.2% Branches (17/36)
kqueue_tcp_service.hpp
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::kqueue_tcp_service::~kqueue_tcp_service() :119 1311x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::reset_linger(boost::corosio::detail::kqueue_tcp_socket*) :136 16035x 42.9% 16.7% 33.0% boost::corosio::detail::kqueue_tcp_service::pre_shutdown(boost::corosio::detail::kqueue_tcp_socket*) :147 0 0.0% 0.0% boost::corosio::detail::kqueue_tcp_service::pre_destroy(boost::corosio::detail::kqueue_tcp_socket*) :152 16035x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::kqueue_tcp_service(boost::capy::execution_context&) :158 874x 100.0% 100.0% boost::corosio::detail::kqueue_connect_op::cancel() :174 0 0.0% 0.0% 0.0% boost::corosio::detail::kqueue_read_op::cancel() :183 656x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_write_op::cancel() :192 1x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_op::operator()() :201 213791x 100.0% 100.0% boost::corosio::detail::kqueue_connect_op::operator()() :207 5309x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::kqueue_tcp_socket(boost::corosio::detail::kqueue_tcp_service&) :212 32070x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::~kqueue_tcp_socket() :217 32070x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::connect(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::__1::stop_token, std::__1::error_code*) :220 5309x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::read_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*) :231 386255x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::write_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*) :243 484200x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::set_option(int, int, void const*, unsigned long) :255 1808x 88.9% 58.3% 72.0% boost::corosio::detail::kqueue_tcp_socket::cancel() :269 117x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::close_socket() :275 48039x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :282 5331x 65.8% 58.3% 64.0% boost::corosio::detail::kqueue_tcp_service::bind_socket(boost::corosio::tcp_socket::implementation&, boost::corosio::endpoint) :347 6x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/corosio/detail/tcp_service.hpp>
20
21 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_socket.hpp>
22 #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp>
23 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
24
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <coroutine>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <netinet/tcp.h>
33 #include <sys/socket.h>
34 #include <unistd.h>
35
36 /*
37 kqueue Socket Implementation
38 ============================
39
40 Each I/O operation follows the same pattern:
41 1. Try the syscall speculatively (readv/writev) before suspending
42 2. On success, return via symmetric transfer (the "pump" fast path)
43 3. On budget exhaustion, post to the scheduler queue for fairness
44 4. On EAGAIN, register_op() parks the op in the descriptor_state
45
46 The speculative path avoids scheduler queue, mutex, and reactor
47 round-trips entirely. An inline budget limits consecutive inline
48 completions to prevent starvation of other connections.
49
50 Cancellation
51 ------------
52 See op.hpp for the completion/cancellation race handling via the
53 descriptor_state mutex. cancel() must complete pending operations (post
54 them with cancelled flag) so coroutines waiting on them can resume.
55 close_socket() calls cancel() first to ensure this.
56
57 Impl Lifetime with shared_ptr
58 -----------------------------
59 Socket impls use enable_shared_from_this. The service owns impls via
60 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
61 removal. When a user calls close(), we call cancel() which posts pending
62 ops to the scheduler.
63
64 CRITICAL: The posted ops must keep the impl alive until they complete.
65 Otherwise the scheduler would process a freed op (use-after-free). The
66 cancel() method captures shared_from_this() into op.impl_ptr before
67 posting. When the op completes, impl_ptr is cleared, allowing the impl
68 to be destroyed if no other references exist.
69
70 Service Ownership
71 -----------------
72 kqueue_tcp_service owns all socket impls. destroy_impl() removes the
73 shared_ptr from the map, but the impl may survive if ops still hold
74 impl_ptr refs. shutdown() closes all sockets and clears the map; any
75 in-flight ops will complete and release their refs.
76 */
77
78 /*
79 kqueue socket implementation
80 ============================
81
82 Each kqueue_tcp_socket owns a descriptor_state that is persistently
83 registered with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR for
84 edge-triggered semantics). The descriptor_state tracks three operation
85 slots (read_op, write_op, connect_op) and two ready flags
86 (read_ready, write_ready) under a per-descriptor mutex.
87
88 Speculative I/O and the pump
89 ----------------------------
90 read_some() and write_some() attempt the syscall (readv/writev)
91 speculatively before suspending the caller. If data is available the
92 result is returned via symmetric transfer — no scheduler queue, no
93 mutex, no reactor round-trip. An inline budget limits consecutive
94 inline completions to prevent starvation of other connections.
95
96 When the speculative attempt returns EAGAIN, register_op() parks the
97 operation in its descriptor_state slot under the per-descriptor mutex.
98 If a cached ready flag fires before parking, register_op() retries
99 the I/O once under the mutex. This eliminates the cached_initiator
100 coroutine frame that previously trampolined into do_read_io() /
101 do_write_io() after the caller suspended.
102
103 Ready-flag protocol
104 -------------------
105 When a kqueue event fires and no operation is pending for that
106 direction, the reactor sets the corresponding ready flag instead of
107 dropping the event. When register_op() finds the ready flag set, it
108 performs I/O immediately rather than parking. This prevents lost
109 wakeups under edge-triggered notification.
110 */
111
112 namespace boost::corosio::detail {
113
114 /** kqueue TCP service implementation.
115
116 Inherits from tcp_service to enable runtime polymorphism.
117 Uses key_type = tcp_service for service lookup.
118 */
119 class BOOST_COROSIO_DECL kqueue_tcp_service final
120 : public reactor_socket_service<
121 kqueue_tcp_service,
122 tcp_service,
123 kqueue_scheduler,
124 kqueue_tcp_socket>
125 {
126 using base_service = reactor_socket_service<
127 kqueue_tcp_service,
128 tcp_service,
129 kqueue_scheduler,
130 kqueue_tcp_socket>;
131 friend base_service;
132
133 // Clear SO_LINGER before close so the destructor doesn't block
134 // and close() sends FIN instead of RST. RST doesn't reliably
135 // trigger EV_EOF on macOS kqueue.
136 16035x static void reset_linger(kqueue_tcp_socket* impl) noexcept
137 {
138
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 16035 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
16035x if (impl->user_set_linger_ && impl->fd_ >= 0)
139 {
140 struct ::linger lg;
141 lg.l_onoff = 0;
142 lg.l_linger = 0;
143 ::setsockopt(impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg));
144 }
145 16035x }
146
147 void pre_shutdown(kqueue_tcp_socket* impl) noexcept
148 {
149 reset_linger(impl);
150 }
151
152 16035x void pre_destroy(kqueue_tcp_socket* impl) noexcept
153 {
154 16035x reset_linger(impl);
155 16035x }
156
157 public:
158 874x explicit kqueue_tcp_service(capy::execution_context& ctx)
159 437x : reactor_socket_service(ctx)
160 874x {
161 874x }
162
163 std::error_code open_socket(
164 tcp_socket::implementation& impl,
165 int family,
166 int type,
167 int protocol) override;
168
169 std::error_code
170 bind_socket(tcp_socket::implementation& impl, endpoint ep) override;
171 };
172
173 inline void
174 kqueue_connect_op::cancel() noexcept
175 {
176 if (socket_impl_)
177 socket_impl_->cancel_single_op(*this);
178 else
179 request_cancel();
180 }
181
182 inline void
183 656x kqueue_read_op::cancel() noexcept
184 {
185
1/2
✓ Branch 0 taken 656 times.
✗ Branch 1 not taken.
656x if (socket_impl_)
186 656x socket_impl_->cancel_single_op(*this);
187 else
188 request_cancel();
189 656x }
190
191 inline void
192 1x kqueue_write_op::cancel() noexcept
193 {
194
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1x if (socket_impl_)
195 1x socket_impl_->cancel_single_op(*this);
196 else
197 request_cancel();
198 1x }
199
200 inline void
201 213791x kqueue_op::operator()()
202 {
203 213791x complete_io_op(*this);
204 213791x }
205
206 inline void
207 5309x kqueue_connect_op::operator()()
208 {
209 5309x complete_connect_op(*this);
210 5309x }
211
212 32070x inline kqueue_tcp_socket::kqueue_tcp_socket(kqueue_tcp_service& svc) noexcept
213 16035x : reactor_stream_socket(svc)
214 32070x {
215 16035x }
216
217 32070x inline kqueue_tcp_socket::~kqueue_tcp_socket() = default;
218
219 inline std::coroutine_handle<>
220 5309x kqueue_tcp_socket::connect(
221 std::coroutine_handle<> h,
222 capy::executor_ref ex,
223 endpoint ep,
224 std::stop_token token,
225 std::error_code* ec)
226 {
227 5309x return do_connect(h, ex, ep, token, ec);
228 }
229
230 inline std::coroutine_handle<>
231 386255x kqueue_tcp_socket::read_some(
232 std::coroutine_handle<> h,
233 capy::executor_ref ex,
234 buffer_param param,
235 std::stop_token token,
236 std::error_code* ec,
237 std::size_t* bytes_out)
238 {
239 386255x return do_read_some(h, ex, param, token, ec, bytes_out);
240 }
241
242 inline std::coroutine_handle<>
243 484200x kqueue_tcp_socket::write_some(
244 std::coroutine_handle<> h,
245 capy::executor_ref ex,
246 buffer_param param,
247 std::stop_token token,
248 std::error_code* ec,
249 std::size_t* bytes_out)
250 {
251 484200x return do_write_some(h, ex, param, token, ec, bytes_out);
252 }
253
254 inline std::error_code
255 1808x kqueue_tcp_socket::set_option(
256 int level, int optname, void const* data, std::size_t size) noexcept
257 {
258
2/4
✓ Branch 0 taken 1808 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1808 times.
1808x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
259 0)
260 return make_err(errno);
261
5/6
✓ Branch 0 taken 1795 times.
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 1787 times.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1787 times.
1808x if (level == SOL_SOCKET && optname == SO_LINGER &&
262 1787x size >= sizeof(struct ::linger))
263 1787x user_set_linger_ =
264 1787x static_cast<struct ::linger const*>(data)->l_onoff != 0;
265 1808x return {};
266 1808x }
267
268 inline void
269 117x kqueue_tcp_socket::cancel() noexcept
270 {
271 117x do_cancel();
272 117x }
273
274 inline void
275 48039x kqueue_tcp_socket::close_socket() noexcept
276 {
277 48039x do_close_socket();
278 48039x user_set_linger_ = false;
279 48039x }
280
281 inline std::error_code
282 5331x kqueue_tcp_service::open_socket(
283 tcp_socket::implementation& impl, int family, int type, int protocol)
284 {
285 5331x auto* kq_impl = static_cast<kqueue_tcp_socket*>(&impl);
286 5331x kq_impl->close_socket();
287
288 5331x int fd = ::socket(family, type, protocol);
289
1/2
✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
5331x if (fd < 0)
290 return make_err(errno);
291
292
2/2
✓ Branch 0 taken 5325 times.
✓ Branch 1 taken 6 times.
5331x if (family == AF_INET6)
293 {
294 6x int v6only = 1;
295 6x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only));
296 6x }
297
298 // Set non-blocking
299 5331x int flags = ::fcntl(fd, F_GETFL, 0);
300
1/2
✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
5331x if (flags == -1)
301 {
302 int errn = errno;
303 ::close(fd);
304 return make_err(errn);
305 }
306
1/2
✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
5331x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
307 {
308 int errn = errno;
309 ::close(fd);
310 return make_err(errn);
311 }
312
313 // Set close-on-exec
314
1/2
✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
5331x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
315 {
316 int errn = errno;
317 ::close(fd);
318 return make_err(errn);
319 }
320
321 // Suppress SIGPIPE on this socket; writev() has no MSG_NOSIGNAL
322 // equivalent, so SO_NOSIGPIPE is required on macOS/FreeBSD.
323 5331x int one = 1;
324
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5331 times.
5331x if (::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) != 0)
325 {
326 int errn = errno;
327 ::close(fd);
328 return make_err(errn);
329 }
330
331 5331x kq_impl->fd_ = fd;
332
333 // Register fd with kqueue (edge-triggered mode via EV_CLEAR)
334 5331x kq_impl->desc_state_.fd = fd;
335 {
336 5331x std::lock_guard lock(kq_impl->desc_state_.mutex);
337 5331x kq_impl->desc_state_.read_op = nullptr;
338 5331x kq_impl->desc_state_.write_op = nullptr;
339 5331x kq_impl->desc_state_.connect_op = nullptr;
340 5331x }
341 5331x scheduler().register_descriptor(fd, &kq_impl->desc_state_);
342
343 5331x return {};
344 5331x }
345
346 inline std::error_code
347 6x kqueue_tcp_service::bind_socket(
348 tcp_socket::implementation& impl, endpoint ep)
349 {
350 6x return static_cast<kqueue_tcp_socket*>(&impl)->do_bind(ep);
351 }
352
353 } // namespace boost::corosio::detail
354
355 #endif // BOOST_COROSIO_HAS_KQUEUE
356
357 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
358