include/boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp

72.8% Lines (75/103) 89.5% List of functions (17/19) 47.2% Branches (17/36)
f(x) Functions (19)
Function Calls Lines Branches Blocks
boost::corosio::detail::kqueue_tcp_service::~kqueue_tcp_service() :119 1125x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::reset_linger(boost::corosio::detail::kqueue_tcp_socket*) :136 16956x 42.9% 16.7% 33.0% boost::corosio::detail::kqueue_tcp_service::pre_shutdown(boost::corosio::detail::kqueue_tcp_socket*) :147 0 0.0% 0.0% boost::corosio::detail::kqueue_tcp_service::pre_destroy(boost::corosio::detail::kqueue_tcp_socket*) :152 16956x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::kqueue_tcp_service(boost::capy::execution_context&) :158 750x 100.0% 100.0% boost::corosio::detail::kqueue_connect_op::cancel() :171 0 0.0% 0.0% 0.0% boost::corosio::detail::kqueue_read_op::cancel() :180 757x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_write_op::cancel() :189 1x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_op::operator()() :198 222116x 100.0% 100.0% boost::corosio::detail::kqueue_connect_op::operator()() :204 5618x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::kqueue_tcp_socket(boost::corosio::detail::kqueue_tcp_service&) :209 33912x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::~kqueue_tcp_socket() :214 33912x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::connect(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::__1::stop_token, std::__1::error_code*) :217 5618x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::read_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*) :228 385012x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::write_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*) :240 516758x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::set_option(int, int, void const*, unsigned long) :252 2128x 88.9% 58.3% 72.0% 
boost::corosio::detail::kqueue_tcp_socket::cancel() :266 118x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_socket::close_socket() :272 50798x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :279 5635x 65.8% 58.3% 64.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/corosio/detail/tcp_service.hpp>
20
21 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_socket.hpp>
22 #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp>
23 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
24
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <coroutine>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <netinet/tcp.h>
33 #include <sys/socket.h>
34 #include <unistd.h>
35
36 /*
37 kqueue Socket Implementation
38 ============================
39
40 Each I/O operation follows the same pattern:
41 1. Try the syscall speculatively (readv/writev) before suspending
42 2. On success, return via symmetric transfer (the "pump" fast path)
43 3. On budget exhaustion, post to the scheduler queue for fairness
44 4. On EAGAIN, register_op() parks the op in the descriptor_state
45
46 The speculative path avoids scheduler queue, mutex, and reactor
47 round-trips entirely. An inline budget limits consecutive inline
48 completions to prevent starvation of other connections.
49
50 Cancellation
51 ------------
52 See op.hpp for the completion/cancellation race handling via the
53 descriptor_state mutex. cancel() must complete pending operations (post
54 them with the cancelled flag set) so coroutines waiting on them can resume.
55 close_socket() calls cancel() first to ensure this.
56
57 Impl Lifetime with shared_ptr
58 -----------------------------
59 Socket impls use enable_shared_from_this. The service owns impls via
60 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
61 removal. When a user calls close(), we call cancel(), which posts pending
62 ops to the scheduler.
63
64 CRITICAL: The posted ops must keep the impl alive until they complete.
65 Otherwise the scheduler would process a freed op (use-after-free). The
66 cancel() method captures shared_from_this() into op.impl_ptr before
67 posting. When the op completes, impl_ptr is cleared, allowing the impl
68 to be destroyed if no other references exist.
69
70 Service Ownership
71 -----------------
72 kqueue_tcp_service owns all socket impls. destroy_impl() removes the
73 shared_ptr from the map, but the impl may survive if ops still hold
74 impl_ptr refs. shutdown() closes all sockets and clears the map; any
75 in-flight ops will complete and release their refs.
76 */
77
78 /*
79 Descriptor State and Speculative I/O
80 ====================================
81
82 Each kqueue_tcp_socket owns a descriptor_state that is persistently
83 registered with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR for
84 edge-triggered semantics). The descriptor_state tracks three operation
85 slots (read_op, write_op, connect_op) and two ready flags
86 (read_ready, write_ready) under a per-descriptor mutex.
87
88 Speculative I/O and the pump
89 ----------------------------
90 read_some() and write_some() attempt the syscall (readv/writev)
91 speculatively before suspending the caller. If data is available the
92 result is returned via symmetric transfer — no scheduler queue, no
93 mutex, no reactor round-trip. An inline budget limits consecutive
94 inline completions to prevent starvation of other connections.
95
96 When the speculative attempt returns EAGAIN, register_op() parks the
97 operation in its descriptor_state slot under the per-descriptor mutex.
98 If a cached ready flag is already set before parking, register_op() retries
99 the I/O once under the mutex. This eliminates the cached_initiator
100 coroutine frame that previously trampolined into do_read_io() /
101 do_write_io() after the caller suspended.
102
103 Ready-flag protocol
104 -------------------
105 When a kqueue event fires and no operation is pending for that
106 direction, the reactor sets the corresponding ready flag instead of
107 dropping the event. When register_op() finds the ready flag set, it
108 performs I/O immediately rather than parking. This prevents lost
109 wakeups under edge-triggered notification.
110 */
111
112 namespace boost::corosio::detail {
113
114 /** kqueue TCP service implementation.
115
116 Inherits from tcp_service to enable runtime polymorphism.
117 Uses key_type = tcp_service for service lookup.
118 */
119 class BOOST_COROSIO_DECL kqueue_tcp_service final
120 : public reactor_socket_service<
121 kqueue_tcp_service,
122 tcp_service,
123 kqueue_scheduler,
124 kqueue_tcp_socket>
125 {
126 using base_service = reactor_socket_service<
127 kqueue_tcp_service,
128 tcp_service,
129 kqueue_scheduler,
130 kqueue_tcp_socket>;
131 friend base_service;
132
133 // If the user enabled SO_LINGER, clear it before close so the
134 // destructor doesn't block and close() sends FIN instead of RST.
135 // RST doesn't reliably trigger EV_EOF on macOS kqueue.
136 16956x static void reset_linger(kqueue_tcp_socket* impl) noexcept
137 {
138
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 16956 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
16956x if (impl->user_set_linger_ && impl->fd_ >= 0)
139 {
140 struct ::linger lg;
141 lg.l_onoff = 0;
142 lg.l_linger = 0;
143 ::setsockopt(impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg));
144 }
145 16956x }
146
147 void pre_shutdown(kqueue_tcp_socket* impl) noexcept
148 {
149 reset_linger(impl);
150 }
151
152 16956x void pre_destroy(kqueue_tcp_socket* impl) noexcept
153 {
154 16956x reset_linger(impl);
155 16956x }
156
157 public:
158 750x explicit kqueue_tcp_service(capy::execution_context& ctx)
159 375x : reactor_socket_service(ctx)
160 750x {
161 750x }
162
163 std::error_code open_socket(
164 tcp_socket::implementation& impl,
165 int family,
166 int type,
167 int protocol) override;
168 };
169
170 inline void
171 kqueue_connect_op::cancel() noexcept
172 {
173 if (socket_impl_)
174 socket_impl_->cancel_single_op(*this);
175 else
176 request_cancel();
177 }
178
179 inline void
180 757x kqueue_read_op::cancel() noexcept
181 {
182
1/2
✓ Branch 0 taken 757 times.
✗ Branch 1 not taken.
757x if (socket_impl_)
183 757x socket_impl_->cancel_single_op(*this);
184 else
185 request_cancel();
186 757x }
187
188 inline void
189 1x kqueue_write_op::cancel() noexcept
190 {
191
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1x if (socket_impl_)
192 1x socket_impl_->cancel_single_op(*this);
193 else
194 request_cancel();
195 1x }
196
197 inline void
198 222116x kqueue_op::operator()()
199 {
200 222116x complete_io_op(*this);
201 222116x }
202
203 inline void
204 5618x kqueue_connect_op::operator()()
205 {
206 5618x complete_connect_op(*this);
207 5618x }
208
209 33912x inline kqueue_tcp_socket::kqueue_tcp_socket(kqueue_tcp_service& svc) noexcept
210 16956x : reactor_stream_socket(svc)
211 33912x {
212 16956x }
213
214 33912x inline kqueue_tcp_socket::~kqueue_tcp_socket() = default;
215
216 inline std::coroutine_handle<>
217 5618x kqueue_tcp_socket::connect(
218 std::coroutine_handle<> h,
219 capy::executor_ref ex,
220 endpoint ep,
221 std::stop_token token,
222 std::error_code* ec)
223 {
224 5618x return do_connect(h, ex, ep, token, ec);
225 }
226
227 inline std::coroutine_handle<>
228 385012x kqueue_tcp_socket::read_some(
229 std::coroutine_handle<> h,
230 capy::executor_ref ex,
231 buffer_param param,
232 std::stop_token token,
233 std::error_code* ec,
234 std::size_t* bytes_out)
235 {
236 385012x return do_read_some(h, ex, param, token, ec, bytes_out);
237 }
238
239 inline std::coroutine_handle<>
240 516758x kqueue_tcp_socket::write_some(
241 std::coroutine_handle<> h,
242 capy::executor_ref ex,
243 buffer_param param,
244 std::stop_token token,
245 std::error_code* ec,
246 std::size_t* bytes_out)
247 {
248 516758x return do_write_some(h, ex, param, token, ec, bytes_out);
249 }
250
251 inline std::error_code
252 2128x kqueue_tcp_socket::set_option(
253 int level, int optname, void const* data, std::size_t size) noexcept
254 {
255
2/4
✓ Branch 0 taken 2128 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2128 times.
2128x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
256 0)
257 return make_err(errno);
258
5/6
✓ Branch 0 taken 2115 times.
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 2107 times.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2107 times.
2128x if (level == SOL_SOCKET && optname == SO_LINGER &&
259 2107x size >= sizeof(struct ::linger))
260 2107x user_set_linger_ =
261 2107x static_cast<struct ::linger const*>(data)->l_onoff != 0;
262 2128x return {};
263 2128x }
264
265 inline void
266 118x kqueue_tcp_socket::cancel() noexcept
267 {
268 118x do_cancel();
269 118x }
270
271 inline void
272 50798x kqueue_tcp_socket::close_socket() noexcept
273 {
274 50798x do_close_socket();
275 50798x user_set_linger_ = false;
276 50798x }
277
278 inline std::error_code
279 5635x kqueue_tcp_service::open_socket(
280 tcp_socket::implementation& impl, int family, int type, int protocol)
281 {
282 5635x auto* kq_impl = static_cast<kqueue_tcp_socket*>(&impl);
283 5635x kq_impl->close_socket();
284
285 5635x int fd = ::socket(family, type, protocol);
286
1/2
✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
5635x if (fd < 0)
287 return make_err(errno);
288
289
2/2
✓ Branch 0 taken 5630 times.
✓ Branch 1 taken 5 times.
5635x if (family == AF_INET6)
290 {
291 5x int v6only = 1;
292 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only));
293 5x }
294
295 // Set non-blocking
296 5635x int flags = ::fcntl(fd, F_GETFL, 0);
297
1/2
✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
5635x if (flags == -1)
298 {
299 int errn = errno;
300 ::close(fd);
301 return make_err(errn);
302 }
303
1/2
✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
5635x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
304 {
305 int errn = errno;
306 ::close(fd);
307 return make_err(errn);
308 }
309
310 // Set close-on-exec
311
1/2
✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
5635x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
312 {
313 int errn = errno;
314 ::close(fd);
315 return make_err(errn);
316 }
317
318 // Suppress SIGPIPE on this socket; writev() has no MSG_NOSIGNAL
319 // equivalent, so SO_NOSIGPIPE is required on macOS/FreeBSD.
320 5635x int one = 1;
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5635 times.
5635x if (::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) != 0)
322 {
323 int errn = errno;
324 ::close(fd);
325 return make_err(errn);
326 }
327
328 5635x kq_impl->fd_ = fd;
329
330 // Register fd with kqueue (edge-triggered mode via EV_CLEAR)
331 5635x kq_impl->desc_state_.fd = fd;
332 {
333 5635x std::lock_guard lock(kq_impl->desc_state_.mutex);
334 5635x kq_impl->desc_state_.read_op = nullptr;
335 5635x kq_impl->desc_state_.write_op = nullptr;
336 5635x kq_impl->desc_state_.connect_op = nullptr;
337 5635x }
338 5635x scheduler().register_descriptor(fd, &kq_impl->desc_state_);
339
340 5635x return {};
341 5635x }
342
343 } // namespace boost::corosio::detail
344
345 #endif // BOOST_COROSIO_HAS_KQUEUE
346
347 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP
348