include/boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp
73.3% Lines (77/105)
90.0% List of functions (18/20)
47.2% Branches (17/36)
Functions (20)
Function
Calls
Lines
Branches
Blocks
boost::corosio::detail::kqueue_tcp_service::~kqueue_tcp_service()
:119
1311x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::reset_linger(boost::corosio::detail::kqueue_tcp_socket*)
:136
16035x
42.9%
16.7%
33.0%
boost::corosio::detail::kqueue_tcp_service::pre_shutdown(boost::corosio::detail::kqueue_tcp_socket*)
:147
0
0.0%
–
0.0%
boost::corosio::detail::kqueue_tcp_service::pre_destroy(boost::corosio::detail::kqueue_tcp_socket*)
:152
16035x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::kqueue_tcp_service(boost::capy::execution_context&)
:158
874x
100.0%
–
100.0%
boost::corosio::detail::kqueue_connect_op::cancel()
:174
0
0.0%
0.0%
0.0%
boost::corosio::detail::kqueue_read_op::cancel()
:183
656x
80.0%
50.0%
75.0%
boost::corosio::detail::kqueue_write_op::cancel()
:192
1x
80.0%
50.0%
75.0%
boost::corosio::detail::kqueue_op::operator()()
:201
213791x
100.0%
–
100.0%
boost::corosio::detail::kqueue_connect_op::operator()()
:207
5309x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::kqueue_tcp_socket(boost::corosio::detail::kqueue_tcp_service&)
:212
32070x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::~kqueue_tcp_socket()
:217
32070x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::connect(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::__1::stop_token, std::__1::error_code*)
:220
5309x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::read_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*)
:231
386255x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::write_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*)
:243
484200x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::set_option(int, int, void const*, unsigned long)
:255
1808x
88.9%
58.3%
72.0%
boost::corosio::detail::kqueue_tcp_socket::cancel()
:269
117x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::close_socket()
:275
48039x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int)
:282
5331x
65.8%
58.3%
64.0%
boost::corosio::detail::kqueue_tcp_service::bind_socket(boost::corosio::tcp_socket::implementation&, boost::corosio::endpoint)
:347
6x
100.0%
–
100.0%
| Line | Branch | TLA | Hits | Source Code |
|---|---|---|---|---|
| 1 | // | |||
| 2 | // Copyright (c) 2026 Michael Vandeberg | |||
| 3 | // Copyright (c) 2026 Steve Gerbino | |||
| 4 | // | |||
| 5 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||
| 6 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||
| 7 | // | |||
| 8 | // Official repository: https://github.com/cppalliance/corosio | |||
| 9 | // | |||
| 10 | ||||
| 11 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 12 | #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 13 | ||||
| 14 | #include <boost/corosio/detail/platform.hpp> | |||
| 15 | ||||
| 16 | #if BOOST_COROSIO_HAS_KQUEUE | |||
| 17 | ||||
| 18 | #include <boost/corosio/detail/config.hpp> | |||
| 19 | #include <boost/corosio/detail/tcp_service.hpp> | |||
| 20 | ||||
| 21 | #include <boost/corosio/native/detail/kqueue/kqueue_tcp_socket.hpp> | |||
| 22 | #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp> | |||
| 23 | #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp> | |||
| 24 | ||||
| 25 | #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp> | |||
| 26 | ||||
| 27 | #include <coroutine> | |||
| 28 | ||||
| 29 | #include <errno.h> | |||
| 30 | #include <fcntl.h> | |||
| 31 | #include <netinet/in.h> | |||
| 32 | #include <netinet/tcp.h> | |||
| 33 | #include <sys/socket.h> | |||
| 34 | #include <unistd.h> | |||
| 35 | ||||
| 36 | /* | |||
| 37 | kqueue Socket Implementation | |||
| 38 | ============================ | |||
| 39 | ||||
| 40 | Each I/O operation follows the same pattern: | |||
| 41 | 1. Try the syscall speculatively (readv/writev) before suspending | |||
| 42 | 2. On success, return via symmetric transfer (the "pump" fast path) | |||
| 43 | 3. On budget exhaustion, post to the scheduler queue for fairness | |||
| 44 | 4. On EAGAIN, register_op() parks the op in the descriptor_state | |||
| 45 | ||||
| 46 | The speculative path avoids scheduler queue, mutex, and reactor | |||
| 47 | round-trips entirely. An inline budget limits consecutive inline | |||
| 48 | completions to prevent starvation of other connections. | |||
| 49 | ||||
| 50 | Cancellation | |||
| 51 | ------------ | |||
| 52 | See op.hpp for the completion/cancellation race handling via the | |||
| 53 | descriptor_state mutex. cancel() must complete pending operations (post | |||
| 54 | them with cancelled flag) so coroutines waiting on them can resume. | |||
| 55 | close_socket() calls cancel() first to ensure this. | |||
| 56 | ||||
| 57 | Impl Lifetime with shared_ptr | |||
| 58 | ----------------------------- | |||
| 59 | Socket impls use enable_shared_from_this. The service owns impls via | |||
| 60 | shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and | |||
| 61 | removal. When a user calls close(), we call cancel() which posts pending | |||
| 62 | ops to the scheduler. | |||
| 63 | ||||
| 64 | CRITICAL: The posted ops must keep the impl alive until they complete. | |||
| 65 | Otherwise the scheduler would process a freed op (use-after-free). The | |||
| 66 | cancel() method captures shared_from_this() into op.impl_ptr before | |||
| 67 | posting. When the op completes, impl_ptr is cleared, allowing the impl | |||
| 68 | to be destroyed if no other references exist. | |||
| 69 | ||||
| 70 | Service Ownership | |||
| 71 | ----------------- | |||
| 72 | kqueue_tcp_service owns all socket impls. destroy_impl() removes the | |||
| 73 | shared_ptr from the map, but the impl may survive if ops still hold | |||
| 74 | impl_ptr refs. shutdown() closes all sockets and clears the map; any | |||
| 75 | in-flight ops will complete and release their refs. | |||
| 76 | */ | |||
| 77 | ||||
| 78 | /* | |||
| 79 | kqueue socket implementation | |||
| 80 | ============================ | |||
| 81 | ||||
| 82 | Each kqueue_tcp_socket owns a descriptor_state that is persistently | |||
| 83 | registered with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR for | |||
| 84 | edge-triggered semantics). The descriptor_state tracks three operation | |||
| 85 | slots (read_op, write_op, connect_op) and two ready flags | |||
| 86 | (read_ready, write_ready) under a per-descriptor mutex. | |||
| 87 | ||||
| 88 | Speculative I/O and the pump | |||
| 89 | ---------------------------- | |||
| 90 | read_some() and write_some() attempt the syscall (readv/writev) | |||
| 91 | speculatively before suspending the caller. If data is available the | |||
| 92 | result is returned via symmetric transfer — no scheduler queue, no | |||
| 93 | mutex, no reactor round-trip. An inline budget limits consecutive | |||
| 94 | inline completions to prevent starvation of other connections. | |||
| 95 | ||||
| 96 | When the speculative attempt returns EAGAIN, register_op() parks the | |||
| 97 | operation in its descriptor_state slot under the per-descriptor mutex. | |||
| 98 | If a cached ready flag fires before parking, register_op() retries | |||
| 99 | the I/O once under the mutex. This eliminates the cached_initiator | |||
| 100 | coroutine frame that previously trampolined into do_read_io() / | |||
| 101 | do_write_io() after the caller suspended. | |||
| 102 | ||||
| 103 | Ready-flag protocol | |||
| 104 | ------------------- | |||
| 105 | When a kqueue event fires and no operation is pending for that | |||
| 106 | direction, the reactor sets the corresponding ready flag instead of | |||
| 107 | dropping the event. When register_op() finds the ready flag set, it | |||
| 108 | performs I/O immediately rather than parking. This prevents lost | |||
| 109 | wakeups under edge-triggered notification. | |||
| 110 | */ | |||
| 111 | ||||
| 112 | namespace boost::corosio::detail { | |||
| 113 | ||||
| 114 | /** kqueue TCP service implementation. | |||
| 115 | ||||
| 116 | Inherits from tcp_service to enable runtime polymorphism. | |||
| 117 | Uses key_type = tcp_service for service lookup. | |||
| 118 | */ | |||
| 119 | class BOOST_COROSIO_DECL kqueue_tcp_service final | |||
| 120 | : public reactor_socket_service< | |||
| 121 | kqueue_tcp_service, | |||
| 122 | tcp_service, | |||
| 123 | kqueue_scheduler, | |||
| 124 | kqueue_tcp_socket> | |||
| 125 | { | |||
| 126 | using base_service = reactor_socket_service< | |||
| 127 | kqueue_tcp_service, | |||
| 128 | tcp_service, | |||
| 129 | kqueue_scheduler, | |||
| 130 | kqueue_tcp_socket>; | |||
| 131 | friend base_service; | |||
| 132 | ||||
| 133 | // Clear SO_LINGER before close so the destructor doesn't block | |||
| 134 | // and close() sends FIN instead of RST. RST doesn't reliably | |||
| 135 | // trigger EV_EOF on macOS kqueue. | |||
| 136 | 16035x | static void reset_linger(kqueue_tcp_socket* impl) noexcept | ||
| 137 | { | |||
| 138 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 16035 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
16035x | if (impl->user_set_linger_ && impl->fd_ >= 0) | |
| 139 | { | |||
| 140 | struct ::linger lg; | |||
| 141 | ✗ | lg.l_onoff = 0; | ||
| 142 | ✗ | lg.l_linger = 0; | ||
| 143 | ✗ | ::setsockopt(impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)); | ||
| 144 | ✗ | } | ||
| 145 | 16035x | } | ||
| 146 | ||||
| 147 | ✗ | void pre_shutdown(kqueue_tcp_socket* impl) noexcept | ||
| 148 | { | |||
| 149 | ✗ | reset_linger(impl); | ||
| 150 | ✗ | } | ||
| 151 | ||||
| 152 | 16035x | void pre_destroy(kqueue_tcp_socket* impl) noexcept | ||
| 153 | { | |||
| 154 | 16035x | reset_linger(impl); | ||
| 155 | 16035x | } | ||
| 156 | ||||
| 157 | public: | |||
| 158 | 874x | explicit kqueue_tcp_service(capy::execution_context& ctx) | ||
| 159 | 437x | : reactor_socket_service(ctx) | ||
| 160 | 874x | { | ||
| 161 | 874x | } | ||
| 162 | ||||
| 163 | std::error_code open_socket( | |||
| 164 | tcp_socket::implementation& impl, | |||
| 165 | int family, | |||
| 166 | int type, | |||
| 167 | int protocol) override; | |||
| 168 | ||||
| 169 | std::error_code | |||
| 170 | bind_socket(tcp_socket::implementation& impl, endpoint ep) override; | |||
| 171 | }; | |||
| 172 | ||||
| 173 | inline void | |||
| 174 | ✗ | kqueue_connect_op::cancel() noexcept | ||
| 175 | { | |||
| 176 | ✗ | if (socket_impl_) | ||
| 177 | ✗ | socket_impl_->cancel_single_op(*this); | ||
| 178 | else | |||
| 179 | ✗ | request_cancel(); | ||
| 180 | ✗ | } | ||
| 181 | ||||
| 182 | inline void | |||
| 183 | 656x | kqueue_read_op::cancel() noexcept | ||
| 184 | { | |||
| 185 |
1/2✓ Branch 0 taken 656 times.
✗ Branch 1 not taken.
|
656x | if (socket_impl_) | |
| 186 | 656x | socket_impl_->cancel_single_op(*this); | ||
| 187 | else | |||
| 188 | ✗ | request_cancel(); | ||
| 189 | 656x | } | ||
| 190 | ||||
| 191 | inline void | |||
| 192 | 1x | kqueue_write_op::cancel() noexcept | ||
| 193 | { | |||
| 194 |
1/2✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
|
1x | if (socket_impl_) | |
| 195 | 1x | socket_impl_->cancel_single_op(*this); | ||
| 196 | else | |||
| 197 | ✗ | request_cancel(); | ||
| 198 | 1x | } | ||
| 199 | ||||
| 200 | inline void | |||
| 201 | 213791x | kqueue_op::operator()() | ||
| 202 | { | |||
| 203 | 213791x | complete_io_op(*this); | ||
| 204 | 213791x | } | ||
| 205 | ||||
| 206 | inline void | |||
| 207 | 5309x | kqueue_connect_op::operator()() | ||
| 208 | { | |||
| 209 | 5309x | complete_connect_op(*this); | ||
| 210 | 5309x | } | ||
| 211 | ||||
| 212 | 32070x | inline kqueue_tcp_socket::kqueue_tcp_socket(kqueue_tcp_service& svc) noexcept | ||
| 213 | 16035x | : reactor_stream_socket(svc) | ||
| 214 | 32070x | { | ||
| 215 | 16035x | } | ||
| 216 | ||||
| 217 | 32070x | inline kqueue_tcp_socket::~kqueue_tcp_socket() = default; | ||
| 218 | ||||
| 219 | inline std::coroutine_handle<> | |||
| 220 | 5309x | kqueue_tcp_socket::connect( | ||
| 221 | std::coroutine_handle<> h, | |||
| 222 | capy::executor_ref ex, | |||
| 223 | endpoint ep, | |||
| 224 | std::stop_token token, | |||
| 225 | std::error_code* ec) | |||
| 226 | { | |||
| 227 | 5309x | return do_connect(h, ex, ep, token, ec); | ||
| 228 | } | |||
| 229 | ||||
| 230 | inline std::coroutine_handle<> | |||
| 231 | 386255x | kqueue_tcp_socket::read_some( | ||
| 232 | std::coroutine_handle<> h, | |||
| 233 | capy::executor_ref ex, | |||
| 234 | buffer_param param, | |||
| 235 | std::stop_token token, | |||
| 236 | std::error_code* ec, | |||
| 237 | std::size_t* bytes_out) | |||
| 238 | { | |||
| 239 | 386255x | return do_read_some(h, ex, param, token, ec, bytes_out); | ||
| 240 | } | |||
| 241 | ||||
| 242 | inline std::coroutine_handle<> | |||
| 243 | 484200x | kqueue_tcp_socket::write_some( | ||
| 244 | std::coroutine_handle<> h, | |||
| 245 | capy::executor_ref ex, | |||
| 246 | buffer_param param, | |||
| 247 | std::stop_token token, | |||
| 248 | std::error_code* ec, | |||
| 249 | std::size_t* bytes_out) | |||
| 250 | { | |||
| 251 | 484200x | return do_write_some(h, ex, param, token, ec, bytes_out); | ||
| 252 | } | |||
| 253 | ||||
| 254 | inline std::error_code | |||
| 255 | 1808x | kqueue_tcp_socket::set_option( | ||
| 256 | int level, int optname, void const* data, std::size_t size) noexcept | |||
| 257 | { | |||
| 258 |
2/4✓ Branch 0 taken 1808 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1808 times.
|
1808x | if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) != | |
| 259 | 0) | |||
| 260 | ✗ | return make_err(errno); | ||
| 261 |
5/6✓ Branch 0 taken 1795 times.
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 1787 times.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1787 times.
|
1808x | if (level == SOL_SOCKET && optname == SO_LINGER && | |
| 262 | 1787x | size >= sizeof(struct ::linger)) | ||
| 263 | 1787x | user_set_linger_ = | ||
| 264 | 1787x | static_cast<struct ::linger const*>(data)->l_onoff != 0; | ||
| 265 | 1808x | return {}; | ||
| 266 | 1808x | } | ||
| 267 | ||||
| 268 | inline void | |||
| 269 | 117x | kqueue_tcp_socket::cancel() noexcept | ||
| 270 | { | |||
| 271 | 117x | do_cancel(); | ||
| 272 | 117x | } | ||
| 273 | ||||
| 274 | inline void | |||
| 275 | 48039x | kqueue_tcp_socket::close_socket() noexcept | ||
| 276 | { | |||
| 277 | 48039x | do_close_socket(); | ||
| 278 | 48039x | user_set_linger_ = false; | ||
| 279 | 48039x | } | ||
| 280 | ||||
| 281 | inline std::error_code | |||
| 282 | 5331x | kqueue_tcp_service::open_socket( | ||
| 283 | tcp_socket::implementation& impl, int family, int type, int protocol) | |||
| 284 | { | |||
| 285 | 5331x | auto* kq_impl = static_cast<kqueue_tcp_socket*>(&impl); | ||
| 286 | 5331x | kq_impl->close_socket(); | ||
| 287 | ||||
| 288 | 5331x | int fd = ::socket(family, type, protocol); | ||
| 289 |
1/2✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
|
5331x | if (fd < 0) | |
| 290 | ✗ | return make_err(errno); | ||
| 291 | ||||
| 292 |
2/2✓ Branch 0 taken 5325 times.
✓ Branch 1 taken 6 times.
|
5331x | if (family == AF_INET6) | |
| 293 | { | |||
| 294 | 6x | int v6only = 1; | ||
| 295 | 6x | ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)); | ||
| 296 | 6x | } | ||
| 297 | ||||
| 298 | // Set non-blocking | |||
| 299 | 5331x | int flags = ::fcntl(fd, F_GETFL, 0); | ||
| 300 |
1/2✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
|
5331x | if (flags == -1) | |
| 301 | { | |||
| 302 | ✗ | int errn = errno; | ||
| 303 | ✗ | ::close(fd); | ||
| 304 | ✗ | return make_err(errn); | ||
| 305 | } | |||
| 306 |
1/2✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
|
5331x | if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) | |
| 307 | { | |||
| 308 | ✗ | int errn = errno; | ||
| 309 | ✗ | ::close(fd); | ||
| 310 | ✗ | return make_err(errn); | ||
| 311 | } | |||
| 312 | ||||
| 313 | // Set close-on-exec | |||
| 314 |
1/2✓ Branch 0 taken 5331 times.
✗ Branch 1 not taken.
|
5331x | if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) | |
| 315 | { | |||
| 316 | ✗ | int errn = errno; | ||
| 317 | ✗ | ::close(fd); | ||
| 318 | ✗ | return make_err(errn); | ||
| 319 | } | |||
| 320 | ||||
| 321 | // Suppress SIGPIPE on this socket; writev() has no MSG_NOSIGNAL | |||
| 322 | // equivalent, so SO_NOSIGPIPE is required on macOS/FreeBSD. | |||
| 323 | 5331x | int one = 1; | ||
| 324 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5331 times.
|
5331x | if (::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) != 0) | |
| 325 | { | |||
| 326 | ✗ | int errn = errno; | ||
| 327 | ✗ | ::close(fd); | ||
| 328 | ✗ | return make_err(errn); | ||
| 329 | } | |||
| 330 | ||||
| 331 | 5331x | kq_impl->fd_ = fd; | ||
| 332 | ||||
| 333 | // Register fd with kqueue (edge-triggered mode via EV_CLEAR) | |||
| 334 | 5331x | kq_impl->desc_state_.fd = fd; | ||
| 335 | { | |||
| 336 | 5331x | std::lock_guard lock(kq_impl->desc_state_.mutex); | ||
| 337 | 5331x | kq_impl->desc_state_.read_op = nullptr; | ||
| 338 | 5331x | kq_impl->desc_state_.write_op = nullptr; | ||
| 339 | 5331x | kq_impl->desc_state_.connect_op = nullptr; | ||
| 340 | 5331x | } | ||
| 341 | 5331x | scheduler().register_descriptor(fd, &kq_impl->desc_state_); | ||
| 342 | ||||
| 343 | 5331x | return {}; | ||
| 344 | 5331x | } | ||
| 345 | ||||
| 346 | inline std::error_code | |||
| 347 | 6x | kqueue_tcp_service::bind_socket( | ||
| 348 | tcp_socket::implementation& impl, endpoint ep) | |||
| 349 | { | |||
| 350 | 6x | return static_cast<kqueue_tcp_socket*>(&impl)->do_bind(ep); | ||
| 351 | } | |||
| 352 | ||||
| 353 | } // namespace boost::corosio::detail | |||
| 354 | ||||
| 355 | #endif // BOOST_COROSIO_HAS_KQUEUE | |||
| 356 | ||||
| 357 | #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 358 |