include/boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp
72.8% Lines (75/103)
89.5% List of functions (17/19)
47.2% Branches (17/36)
Functions (19)
Function
Calls
Lines
Branches
Blocks
boost::corosio::detail::kqueue_tcp_service::~kqueue_tcp_service()
:119
1125x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::reset_linger(boost::corosio::detail::kqueue_tcp_socket*)
:136
16956x
42.9%
16.7%
33.0%
boost::corosio::detail::kqueue_tcp_service::pre_shutdown(boost::corosio::detail::kqueue_tcp_socket*)
:147
0
0.0%
–
0.0%
boost::corosio::detail::kqueue_tcp_service::pre_destroy(boost::corosio::detail::kqueue_tcp_socket*)
:152
16956x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::kqueue_tcp_service(boost::capy::execution_context&)
:158
750x
100.0%
–
100.0%
boost::corosio::detail::kqueue_connect_op::cancel()
:171
0
0.0%
0.0%
0.0%
boost::corosio::detail::kqueue_read_op::cancel()
:180
757x
80.0%
50.0%
75.0%
boost::corosio::detail::kqueue_write_op::cancel()
:189
1x
80.0%
50.0%
75.0%
boost::corosio::detail::kqueue_op::operator()()
:198
222116x
100.0%
–
100.0%
boost::corosio::detail::kqueue_connect_op::operator()()
:204
5618x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::kqueue_tcp_socket(boost::corosio::detail::kqueue_tcp_service&)
:209
33912x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::~kqueue_tcp_socket()
:214
33912x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::connect(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::__1::stop_token, std::__1::error_code*)
:217
5618x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::read_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*)
:228
385012x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::write_some(std::__1::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::__1::stop_token, std::__1::error_code*, unsigned long*)
:240
516758x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::set_option(int, int, void const*, unsigned long)
:252
2128x
88.9%
58.3%
72.0%
boost::corosio::detail::kqueue_tcp_socket::cancel()
:266
118x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_socket::close_socket()
:272
50798x
100.0%
–
100.0%
boost::corosio::detail::kqueue_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int)
:279
5635x
65.8%
58.3%
64.0%
| Line | Branch | TLA | Hits | Source Code |
|---|---|---|---|---|
| 1 | // | |||
| 2 | // Copyright (c) 2026 Michael Vandeberg | |||
| 3 | // Copyright (c) 2026 Steve Gerbino | |||
| 4 | // | |||
| 5 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||
| 6 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||
| 7 | // | |||
| 8 | // Official repository: https://github.com/cppalliance/corosio | |||
| 9 | // | |||
| 10 | ||||
| 11 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 12 | #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 13 | ||||
| 14 | #include <boost/corosio/detail/platform.hpp> | |||
| 15 | ||||
| 16 | #if BOOST_COROSIO_HAS_KQUEUE | |||
| 17 | ||||
| 18 | #include <boost/corosio/detail/config.hpp> | |||
| 19 | #include <boost/corosio/detail/tcp_service.hpp> | |||
| 20 | ||||
| 21 | #include <boost/corosio/native/detail/kqueue/kqueue_tcp_socket.hpp> | |||
| 22 | #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp> | |||
| 23 | #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp> | |||
| 24 | ||||
| 25 | #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp> | |||
| 26 | ||||
| 27 | #include <coroutine> | |||
| 28 | ||||
| 29 | #include <errno.h> | |||
| 30 | #include <fcntl.h> | |||
| 31 | #include <netinet/in.h> | |||
| 32 | #include <netinet/tcp.h> | |||
| 33 | #include <sys/socket.h> | |||
| 34 | #include <unistd.h> | |||
| 35 | ||||
| 36 | /* | |||
| 37 | kqueue Socket Implementation | |||
| 38 | ============================ | |||
| 39 | ||||
| 40 | Each I/O operation follows the same pattern: | |||
| 41 | 1. Try the syscall speculatively (readv/writev) before suspending | |||
| 42 | 2. On success, return via symmetric transfer (the "pump" fast path) | |||
| 43 | 3. On budget exhaustion, post to the scheduler queue for fairness | |||
| 44 | 4. On EAGAIN, register_op() parks the op in the descriptor_state | |||
| 45 | ||||
| 46 | The speculative path avoids scheduler queue, mutex, and reactor | |||
| 47 | round-trips entirely. An inline budget limits consecutive inline | |||
| 48 | completions to prevent starvation of other connections. | |||
| 49 | ||||
| 50 | Cancellation | |||
| 51 | ------------ | |||
| 52 | See op.hpp for the completion/cancellation race handling via the | |||
| 53 | descriptor_state mutex. cancel() must complete pending operations (post | |||
| 54 | them with cancelled flag) so coroutines waiting on them can resume. | |||
| 55 | close_socket() calls cancel() first to ensure this. | |||
| 56 | ||||
| 57 | Impl Lifetime with shared_ptr | |||
| 58 | ----------------------------- | |||
| 59 | Socket impls use enable_shared_from_this. The service owns impls via | |||
| 60 | shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and | |||
| 61 | removal. When a user calls close(), we call cancel() which posts pending | |||
| 62 | ops to the scheduler. | |||
| 63 | ||||
| 64 | CRITICAL: The posted ops must keep the impl alive until they complete. | |||
| 65 | Otherwise the scheduler would process a freed op (use-after-free). The | |||
| 66 | cancel() method captures shared_from_this() into op.impl_ptr before | |||
| 67 | posting. When the op completes, impl_ptr is cleared, allowing the impl | |||
| 68 | to be destroyed if no other references exist. | |||
| 69 | ||||
| 70 | Service Ownership | |||
| 71 | ----------------- | |||
| 72 | kqueue_tcp_service owns all socket impls. destroy_impl() removes the | |||
| 73 | shared_ptr from the map, but the impl may survive if ops still hold | |||
| 74 | impl_ptr refs. shutdown() closes all sockets and clears the map; any | |||
| 75 | in-flight ops will complete and release their refs. | |||
| 76 | */ | |||
| 77 | ||||
| 78 | /* | |||
| 79 | kqueue socket implementation | |||
| 80 | ============================ | |||
| 81 | ||||
| 82 | Each kqueue_tcp_socket owns a descriptor_state that is persistently | |||
| 83 | registered with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR for | |||
| 84 | edge-triggered semantics). The descriptor_state tracks three operation | |||
| 85 | slots (read_op, write_op, connect_op) and two ready flags | |||
| 86 | (read_ready, write_ready) under a per-descriptor mutex. | |||
| 87 | ||||
| 88 | Speculative I/O and the pump | |||
| 89 | ---------------------------- | |||
| 90 | read_some() and write_some() attempt the syscall (readv/writev) | |||
| 91 | speculatively before suspending the caller. If data is available the | |||
| 92 | result is returned via symmetric transfer — no scheduler queue, no | |||
| 93 | mutex, no reactor round-trip. An inline budget limits consecutive | |||
| 94 | inline completions to prevent starvation of other connections. | |||
| 95 | ||||
| 96 | When the speculative attempt returns EAGAIN, register_op() parks the | |||
| 97 | operation in its descriptor_state slot under the per-descriptor mutex. | |||
| 98 | If a cached ready flag fires before parking, register_op() retries | |||
| 99 | the I/O once under the mutex. This eliminates the cached_initiator | |||
| 100 | coroutine frame that previously trampolined into do_read_io() / | |||
| 101 | do_write_io() after the caller suspended. | |||
| 102 | ||||
| 103 | Ready-flag protocol | |||
| 104 | ------------------- | |||
| 105 | When a kqueue event fires and no operation is pending for that | |||
| 106 | direction, the reactor sets the corresponding ready flag instead of | |||
| 107 | dropping the event. When register_op() finds the ready flag set, it | |||
| 108 | performs I/O immediately rather than parking. This prevents lost | |||
| 109 | wakeups under edge-triggered notification. | |||
| 110 | */ | |||
| 111 | ||||
| 112 | namespace boost::corosio::detail { | |||
| 113 | ||||
| 114 | /** kqueue TCP service implementation. | |||
| 115 | ||||
| 116 | Inherits from tcp_service to enable runtime polymorphism. | |||
| 117 | Uses key_type = tcp_service for service lookup. | |||
| 118 | */ | |||
| 119 | class BOOST_COROSIO_DECL kqueue_tcp_service final | |||
| 120 | : public reactor_socket_service< | |||
| 121 | kqueue_tcp_service, | |||
| 122 | tcp_service, | |||
| 123 | kqueue_scheduler, | |||
| 124 | kqueue_tcp_socket> | |||
| 125 | { | |||
| 126 | using base_service = reactor_socket_service< | |||
| 127 | kqueue_tcp_service, | |||
| 128 | tcp_service, | |||
| 129 | kqueue_scheduler, | |||
| 130 | kqueue_tcp_socket>; | |||
| 131 | friend base_service; | |||
| 132 | ||||
| 133 | // Clear SO_LINGER before close so the destructor doesn't block | |||
| 134 | // and close() sends FIN instead of RST. RST doesn't reliably | |||
| 135 | // trigger EV_EOF on macOS kqueue. | |||
| 136 | 16956x | static void reset_linger(kqueue_tcp_socket* impl) noexcept | ||
| 137 | { | |||
| 138 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 16956 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
16956x | if (impl->user_set_linger_ && impl->fd_ >= 0) | |
| 139 | { | |||
| 140 | struct ::linger lg; | |||
| 141 | ✗ | lg.l_onoff = 0; | ||
| 142 | ✗ | lg.l_linger = 0; | ||
| 143 | ✗ | ::setsockopt(impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)); | ||
| 144 | ✗ | } | ||
| 145 | 16956x | } | ||
| 146 | ||||
| 147 | ✗ | void pre_shutdown(kqueue_tcp_socket* impl) noexcept | ||
| 148 | { | |||
| 149 | ✗ | reset_linger(impl); | ||
| 150 | ✗ | } | ||
| 151 | ||||
| 152 | 16956x | void pre_destroy(kqueue_tcp_socket* impl) noexcept | ||
| 153 | { | |||
| 154 | 16956x | reset_linger(impl); | ||
| 155 | 16956x | } | ||
| 156 | ||||
| 157 | public: | |||
| 158 | 750x | explicit kqueue_tcp_service(capy::execution_context& ctx) | ||
| 159 | 375x | : reactor_socket_service(ctx) | ||
| 160 | 750x | { | ||
| 161 | 750x | } | ||
| 162 | ||||
| 163 | std::error_code open_socket( | |||
| 164 | tcp_socket::implementation& impl, | |||
| 165 | int family, | |||
| 166 | int type, | |||
| 167 | int protocol) override; | |||
| 168 | }; | |||
| 169 | ||||
| 170 | inline void | |||
| 171 | ✗ | kqueue_connect_op::cancel() noexcept | ||
| 172 | { | |||
| 173 | ✗ | if (socket_impl_) | ||
| 174 | ✗ | socket_impl_->cancel_single_op(*this); | ||
| 175 | else | |||
| 176 | ✗ | request_cancel(); | ||
| 177 | ✗ | } | ||
| 178 | ||||
| 179 | inline void | |||
| 180 | 757x | kqueue_read_op::cancel() noexcept | ||
| 181 | { | |||
| 182 |
1/2✓ Branch 0 taken 757 times.
✗ Branch 1 not taken.
|
757x | if (socket_impl_) | |
| 183 | 757x | socket_impl_->cancel_single_op(*this); | ||
| 184 | else | |||
| 185 | ✗ | request_cancel(); | ||
| 186 | 757x | } | ||
| 187 | ||||
| 188 | inline void | |||
| 189 | 1x | kqueue_write_op::cancel() noexcept | ||
| 190 | { | |||
| 191 |
1/2✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
|
1x | if (socket_impl_) | |
| 192 | 1x | socket_impl_->cancel_single_op(*this); | ||
| 193 | else | |||
| 194 | ✗ | request_cancel(); | ||
| 195 | 1x | } | ||
| 196 | ||||
| 197 | inline void | |||
| 198 | 222116x | kqueue_op::operator()() | ||
| 199 | { | |||
| 200 | 222116x | complete_io_op(*this); | ||
| 201 | 222116x | } | ||
| 202 | ||||
| 203 | inline void | |||
| 204 | 5618x | kqueue_connect_op::operator()() | ||
| 205 | { | |||
| 206 | 5618x | complete_connect_op(*this); | ||
| 207 | 5618x | } | ||
| 208 | ||||
| 209 | 33912x | inline kqueue_tcp_socket::kqueue_tcp_socket(kqueue_tcp_service& svc) noexcept | ||
| 210 | 16956x | : reactor_stream_socket(svc) | ||
| 211 | 33912x | { | ||
| 212 | 16956x | } | ||
| 213 | ||||
| 214 | 33912x | inline kqueue_tcp_socket::~kqueue_tcp_socket() = default; | ||
| 215 | ||||
| 216 | inline std::coroutine_handle<> | |||
| 217 | 5618x | kqueue_tcp_socket::connect( | ||
| 218 | std::coroutine_handle<> h, | |||
| 219 | capy::executor_ref ex, | |||
| 220 | endpoint ep, | |||
| 221 | std::stop_token token, | |||
| 222 | std::error_code* ec) | |||
| 223 | { | |||
| 224 | 5618x | return do_connect(h, ex, ep, token, ec); | ||
| 225 | } | |||
| 226 | ||||
| 227 | inline std::coroutine_handle<> | |||
| 228 | 385012x | kqueue_tcp_socket::read_some( | ||
| 229 | std::coroutine_handle<> h, | |||
| 230 | capy::executor_ref ex, | |||
| 231 | buffer_param param, | |||
| 232 | std::stop_token token, | |||
| 233 | std::error_code* ec, | |||
| 234 | std::size_t* bytes_out) | |||
| 235 | { | |||
| 236 | 385012x | return do_read_some(h, ex, param, token, ec, bytes_out); | ||
| 237 | } | |||
| 238 | ||||
| 239 | inline std::coroutine_handle<> | |||
| 240 | 516758x | kqueue_tcp_socket::write_some( | ||
| 241 | std::coroutine_handle<> h, | |||
| 242 | capy::executor_ref ex, | |||
| 243 | buffer_param param, | |||
| 244 | std::stop_token token, | |||
| 245 | std::error_code* ec, | |||
| 246 | std::size_t* bytes_out) | |||
| 247 | { | |||
| 248 | 516758x | return do_write_some(h, ex, param, token, ec, bytes_out); | ||
| 249 | } | |||
| 250 | ||||
| 251 | inline std::error_code | |||
| 252 | 2128x | kqueue_tcp_socket::set_option( | ||
| 253 | int level, int optname, void const* data, std::size_t size) noexcept | |||
| 254 | { | |||
| 255 |
2/4✓ Branch 0 taken 2128 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2128 times.
|
2128x | if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) != | |
| 256 | 0) | |||
| 257 | ✗ | return make_err(errno); | ||
| 258 |
5/6✓ Branch 0 taken 2115 times.
✓ Branch 1 taken 13 times.
✓ Branch 2 taken 2107 times.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2107 times.
|
2128x | if (level == SOL_SOCKET && optname == SO_LINGER && | |
| 259 | 2107x | size >= sizeof(struct ::linger)) | ||
| 260 | 2107x | user_set_linger_ = | ||
| 261 | 2107x | static_cast<struct ::linger const*>(data)->l_onoff != 0; | ||
| 262 | 2128x | return {}; | ||
| 263 | 2128x | } | ||
| 264 | ||||
| 265 | inline void | |||
| 266 | 118x | kqueue_tcp_socket::cancel() noexcept | ||
| 267 | { | |||
| 268 | 118x | do_cancel(); | ||
| 269 | 118x | } | ||
| 270 | ||||
| 271 | inline void | |||
| 272 | 50798x | kqueue_tcp_socket::close_socket() noexcept | ||
| 273 | { | |||
| 274 | 50798x | do_close_socket(); | ||
| 275 | 50798x | user_set_linger_ = false; | ||
| 276 | 50798x | } | ||
| 277 | ||||
| 278 | inline std::error_code | |||
| 279 | 5635x | kqueue_tcp_service::open_socket( | ||
| 280 | tcp_socket::implementation& impl, int family, int type, int protocol) | |||
| 281 | { | |||
| 282 | 5635x | auto* kq_impl = static_cast<kqueue_tcp_socket*>(&impl); | ||
| 283 | 5635x | kq_impl->close_socket(); | ||
| 284 | ||||
| 285 | 5635x | int fd = ::socket(family, type, protocol); | ||
| 286 |
1/2✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
|
5635x | if (fd < 0) | |
| 287 | ✗ | return make_err(errno); | ||
| 288 | ||||
| 289 |
2/2✓ Branch 0 taken 5630 times.
✓ Branch 1 taken 5 times.
|
5635x | if (family == AF_INET6) | |
| 290 | { | |||
| 291 | 5x | int v6only = 1; | ||
| 292 | 5x | ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)); | ||
| 293 | 5x | } | ||
| 294 | ||||
| 295 | // Set non-blocking | |||
| 296 | 5635x | int flags = ::fcntl(fd, F_GETFL, 0); | ||
| 297 |
1/2✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
|
5635x | if (flags == -1) | |
| 298 | { | |||
| 299 | ✗ | int errn = errno; | ||
| 300 | ✗ | ::close(fd); | ||
| 301 | ✗ | return make_err(errn); | ||
| 302 | } | |||
| 303 |
1/2✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
|
5635x | if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) | |
| 304 | { | |||
| 305 | ✗ | int errn = errno; | ||
| 306 | ✗ | ::close(fd); | ||
| 307 | ✗ | return make_err(errn); | ||
| 308 | } | |||
| 309 | ||||
| 310 | // Set close-on-exec | |||
| 311 |
1/2✓ Branch 0 taken 5635 times.
✗ Branch 1 not taken.
|
5635x | if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1) | |
| 312 | { | |||
| 313 | ✗ | int errn = errno; | ||
| 314 | ✗ | ::close(fd); | ||
| 315 | ✗ | return make_err(errn); | ||
| 316 | } | |||
| 317 | ||||
| 318 | // Suppress SIGPIPE on this socket; writev() has no MSG_NOSIGNAL | |||
| 319 | // equivalent, so SO_NOSIGPIPE is required on macOS/FreeBSD. | |||
| 320 | 5635x | int one = 1; | ||
| 321 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 5635 times.
|
5635x | if (::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) != 0) | |
| 322 | { | |||
| 323 | ✗ | int errn = errno; | ||
| 324 | ✗ | ::close(fd); | ||
| 325 | ✗ | return make_err(errn); | ||
| 326 | } | |||
| 327 | ||||
| 328 | 5635x | kq_impl->fd_ = fd; | ||
| 329 | ||||
| 330 | // Register fd with kqueue (edge-triggered mode via EV_CLEAR) | |||
| 331 | 5635x | kq_impl->desc_state_.fd = fd; | ||
| 332 | { | |||
| 333 | 5635x | std::lock_guard lock(kq_impl->desc_state_.mutex); | ||
| 334 | 5635x | kq_impl->desc_state_.read_op = nullptr; | ||
| 335 | 5635x | kq_impl->desc_state_.write_op = nullptr; | ||
| 336 | 5635x | kq_impl->desc_state_.connect_op = nullptr; | ||
| 337 | 5635x | } | ||
| 338 | 5635x | scheduler().register_descriptor(fd, &kq_impl->desc_state_); | ||
| 339 | ||||
| 340 | 5635x | return {}; | ||
| 341 | 5635x | } | ||
| 342 | ||||
| 343 | } // namespace boost::corosio::detail | |||
| 344 | ||||
| 345 | #endif // BOOST_COROSIO_HAS_KQUEUE | |||
| 346 | ||||
| 347 | #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_SERVICE_HPP | |||
| 348 |