include/boost/corosio/native/detail/kqueue/kqueue_op.hpp
83.0% Lines (122/147)
90.3% Functions (28/31)
35.7% Branches (20/56)
| Line | Branch | TLA | Hits | Source Code |
|---|---|---|---|---|
| 1 | // | |||
| 2 | // Copyright (c) 2026 Michael Vandeberg | |||
| 3 | // Copyright (c) 2026 Steve Gerbino | |||
| 4 | // | |||
| 5 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||
| 6 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||
| 7 | // | |||
| 8 | // Official repository: https://github.com/cppalliance/corosio | |||
| 9 | // | |||
| 10 | ||||
| 11 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP | |||
| 12 | #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP | |||
| 13 | ||||
| 14 | #include <boost/corosio/detail/platform.hpp> | |||
| 15 | ||||
| 16 | #if BOOST_COROSIO_HAS_KQUEUE | |||
| 17 | ||||
| 18 | #include <boost/corosio/detail/config.hpp> | |||
| 19 | #include <boost/corosio/io/io_object.hpp> | |||
| 20 | #include <boost/corosio/endpoint.hpp> | |||
| 21 | #include <boost/capy/ex/executor_ref.hpp> | |||
| 22 | #include <coroutine> | |||
| 23 | #include <boost/capy/error.hpp> | |||
| 24 | #include <system_error> | |||
| 25 | ||||
| 26 | #include <boost/corosio/detail/scheduler_op.hpp> | |||
| 27 | ||||
| 28 | #include <unistd.h> | |||
| 29 | #include <errno.h> | |||
| 30 | #include <fcntl.h> | |||
| 31 | ||||
| 32 | #include <atomic> | |||
| 33 | #include <cstddef> | |||
| 34 | #include <memory> | |||
| 35 | #include <mutex> | |||
| 36 | #include <optional> | |||
| 37 | #include <stop_token> | |||
| 38 | ||||
| 39 | #include <netinet/in.h> | |||
| 40 | #include <sys/socket.h> | |||
| 41 | #include <sys/uio.h> | |||
| 42 | ||||
| 43 | /* | |||
| 44 | kqueue Operation State | |||
| 45 | ====================== | |||
| 46 | ||||
| 47 | Each async I/O operation has a corresponding kqueue_op-derived struct that | |||
| 48 | holds the operation's state while it's in flight. The socket impl owns | |||
| 49 | fixed slots for each operation type (conn_, rd_, wr_), so only one | |||
| 50 | operation of each type can be pending per socket at a time. | |||
| 51 | ||||
| 52 | Persistent Registration | |||
| 53 | ----------------------- | |||
| 54 | File descriptors are registered with kqueue once (via descriptor_state) and | |||
| 55 | stay registered until closed. Uses EV_CLEAR for edge-triggered semantics | |||
| 56 | (equivalent to epoll's EPOLLET). The descriptor_state tracks which operations | |||
| 57 | are pending (read_op, write_op, connect_op). When an event arrives, the | |||
| 58 | reactor dispatches to the appropriate pending operation. | |||
| 59 | ||||
| 60 | Impl Lifetime Management | |||
| 61 | ------------------------ | |||
| 62 | When cancel() posts an op to the scheduler's ready queue, the socket impl | |||
| 63 | might be destroyed before the scheduler processes the op. The `impl_ptr` | |||
| 64 | member holds a shared_ptr to the impl, keeping it alive until the op | |||
| 65 | completes. This is set by cancel() and cleared in operator() after the | |||
| 66 | coroutine is resumed. | |||
| 67 | ||||
| 68 | EOF Detection | |||
| 69 | ------------- | |||
| 70 | For reads, 0 bytes with no error means EOF. But an empty user buffer also | |||
| 71 | returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases. | |||
| 72 | ||||
| 73 | SIGPIPE Prevention | |||
| 74 | ------------------ | |||
| 75 | SO_NOSIGPIPE is set on each socket at creation time (see sockets.cpp). | |||
| 76 | Writes use writev() which is safe because the socket-level option suppresses | |||
| 77 | SIGPIPE delivery. | |||
| 78 | */ | |||
| 79 | ||||
| 80 | namespace boost::corosio::detail { | |||
| 81 | ||||
| 82 | // Ready-event flag constants for descriptor_state::ready_events_. | |||
| 83 | // These match the epoll numeric values (EPOLLIN=0x1, EPOLLOUT=0x4, | |||
| 84 | // EPOLLERR=0x8) so that descriptor_state::operator()() uses the same | |||
| 85 | // flag-checking logic as the epoll backend. | |||
| 86 | static constexpr std::uint32_t kqueue_event_read = 0x001; | |||
| 87 | static constexpr std::uint32_t kqueue_event_write = 0x004; | |||
| 88 | static constexpr std::uint32_t kqueue_event_error = 0x008; | |||
| 89 | ||||
| 90 | // Forward declarations | |||
| 91 | class kqueue_socket; | |||
| 92 | class kqueue_acceptor; | |||
| 93 | struct kqueue_op; | |||
| 94 | ||||
| 95 | class kqueue_scheduler; | |||
| 96 | ||||
| 97 | /** Per-descriptor state for persistent kqueue registration. | |||
| 98 | ||||
| 99 | Tracks pending operations for a file descriptor. The fd is registered | |||
| 100 | once with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR) and stays | |||
| 101 | registered until closed. | |||
| 102 | ||||
| 103 | This struct extends scheduler_op to support deferred I/O processing. | |||
| 104 | When kqueue events arrive, the reactor sets ready_events and queues | |||
| 105 | this descriptor for processing. When popped from the scheduler queue, | |||
| 106 | operator() performs the actual I/O and queues completion handlers. | |||
| 107 | ||||
| 108 | @par Deferred I/O Model | |||
| 109 | The reactor no longer performs I/O directly. Instead: | |||
| 110 | 1. Reactor sets ready_events and queues descriptor_state | |||
| 111 | 2. Scheduler pops descriptor_state and calls operator() | |||
| 112 | 3. operator() performs I/O under mutex and queues completions | |||
| 113 | ||||
| 114 | This eliminates per-descriptor mutex locking from the reactor hot path. | |||
| 115 | ||||
| 116 | @par Thread Safety | |||
| 117 | The mutex protects operation pointers and ready flags during I/O. | |||
| 118 | ready_events_ and is_enqueued_ are atomic for lock-free reactor access. | |||
| 119 | */ | |||
| 120 | 17376 | struct descriptor_state final : scheduler_op | ||
| 121 | { | |||
| 122 | std::mutex mutex; | |||
| 123 | ||||
| 124 | // Protected by mutex | |||
| 125 | 17376 | kqueue_op* read_op = nullptr; | ||
| 126 | 17376 | kqueue_op* write_op = nullptr; | ||
| 127 | 17376 | kqueue_op* connect_op = nullptr; | ||
| 128 | ||||
| 129 | // Caches edge events that arrived before an op was registered | |||
| 130 | 17376 | bool read_ready = false; | ||
| 131 | 17376 | bool write_ready = false; | ||
| 132 | ||||
| 133 | // Deferred cancellation: set by cancel() when the target op is not | |||
| 134 | // parked (e.g. completing inline via speculative I/O). Checked when | |||
| 135 | // the next op parks; if set, the op is immediately self-cancelled. | |||
| 136 | // This matches IOCP semantics where CancelIoEx always succeeds. | |||
| 137 | 17376 | bool read_cancel_pending = false; | ||
| 138 | 17376 | bool write_cancel_pending = false; | ||
| 139 | 17376 | bool connect_cancel_pending = false; | ||
| 140 | ||||
| 141 | // Set during registration only (no mutex needed) | |||
| 142 | 17376 | std::uint32_t registered_events = 0; | ||
| 143 | 17376 | int fd = -1; | ||
| 144 | ||||
| 145 | // For deferred I/O - set by reactor, read by scheduler | |||
| 146 | 17376 | std::atomic<std::uint32_t> ready_events_{0}; | ||
| 147 | 17376 | std::atomic<bool> is_enqueued_{false}; | ||
| 148 | 17376 | kqueue_scheduler const* scheduler_ = nullptr; | ||
| 149 | ||||
| 150 | // Prevents impl destruction while this descriptor_state is queued. | |||
| 151 | // Set by close_socket() when is_enqueued_ is true, cleared by operator(). | |||
| 152 | std::shared_ptr<void> impl_ref_; | |||
| 153 | ||||
| 154 | /// Add ready events atomically. | |||
| 155 | /// Release pairs with the consumer's acquire exchange on | |||
| 156 | /// ready_events_ so the consumer sees all flags. On x86 (TSO) | |||
| 157 | /// this compiles to the same LOCK OR as relaxed. | |||
| 158 | 121474 | void add_ready_events(std::uint32_t ev) noexcept | ||
| 159 | { | |||
| 160 | 121474 | ready_events_.fetch_or(ev, std::memory_order_release); | ||
| 161 | 121474 | } | ||
| 162 | ||||
| 163 | /// Perform deferred I/O and queue completions. | |||
| 164 | void operator()() override; | |||
| 165 | ||||
| 166 | /// Destroy without invoking. | |||
| 167 | /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break | |||
| 168 | /// the self-referential cycle set by close_socket(). | |||
| 169 | 78 | void destroy() override | ||
| 170 | { | |||
| 171 | 78 | impl_ref_.reset(); | ||
| 172 | 78 | } | ||
| 173 | }; | |||
| 174 | ||||
| 175 | struct kqueue_op : scheduler_op | |||
| 176 | { | |||
| 177 | struct canceller | |||
| 178 | { | |||
| 179 | kqueue_op* op; | |||
| 180 | void operator()() const noexcept; | |||
| 181 | }; | |||
| 182 | ||||
| 183 | std::coroutine_handle<> h; | |||
| 184 | capy::executor_ref ex; | |||
| 185 | 50196 | std::error_code* ec_out = nullptr; | ||
| 186 | 50196 | std::size_t* bytes_out = nullptr; | ||
| 187 | ||||
| 188 | 50196 | int fd = -1; | ||
| 189 | 50196 | int errn = 0; | ||
| 190 | 50196 | std::size_t bytes_transferred = 0; | ||
| 191 | ||||
| 192 | 50196 | std::atomic<bool> cancelled{false}; | ||
| 193 | std::optional<std::stop_callback<canceller>> stop_cb; | |||
| 194 | ||||
| 195 | // Prevents use-after-free when socket is closed with pending ops. | |||
| 196 | // See "Impl Lifetime Management" in file header. | |||
| 197 | std::shared_ptr<void> impl_ptr; | |||
| 198 | ||||
| 199 | // For stop_token cancellation - pointer to owning socket/acceptor impl. | |||
| 200 | // When stop is requested, we call back to the impl to perform actual I/O cancellation. | |||
| 201 | 50196 | kqueue_socket* socket_impl_ = nullptr; | ||
| 202 | 50196 | kqueue_acceptor* acceptor_impl_ = nullptr; | ||
| 203 | ||||
| 204 | 150588 | kqueue_op() = default; | ||
| 205 | ||||
| 206 | 815169 | void reset() noexcept | ||
| 207 | { | |||
| 208 | 815169 | fd = -1; | ||
| 209 | 815169 | errn = 0; | ||
| 210 | 815169 | bytes_transferred = 0; | ||
| 211 | 815169 | cancelled.store(false, std::memory_order_relaxed); | ||
| 212 | 815169 | impl_ptr.reset(); | ||
| 213 | 815169 | socket_impl_ = nullptr; | ||
| 214 | 815169 | acceptor_impl_ = nullptr; | ||
| 215 | 815169 | } | ||
| 216 | ||||
| 217 | // Defined in sockets.cpp where kqueue_socket is complete | |||
| 218 | void operator()() override; | |||
| 219 | ||||
| 220 | 83053 | virtual bool is_read_operation() const noexcept | ||
| 221 | { | |||
| 222 | 83053 | return false; | ||
| 223 | } | |||
| 224 | virtual void cancel() noexcept = 0; | |||
| 225 | ||||
| 226 | ✗ | void destroy() override | ||
| 227 | { | |||
| 228 | ✗ | stop_cb.reset(); | ||
| 229 | ✗ | impl_ptr.reset(); | ||
| 230 | ✗ | } | ||
| 231 | ||||
| 232 | 152626 | void request_cancel() noexcept | ||
| 233 | { | |||
| 234 | 152626 | cancelled.store(true, std::memory_order_release); | ||
| 235 | 152626 | } | ||
| 236 | ||||
| 237 | 197277 | void start(std::stop_token token, kqueue_socket* impl) | ||
| 238 | { | |||
| 239 | 197277 | cancelled.store(false, std::memory_order_release); | ||
| 240 | 197277 | stop_cb.reset(); | ||
| 241 | 197277 | socket_impl_ = impl; | ||
| 242 | 197277 | acceptor_impl_ = nullptr; | ||
| 243 | ||||
| 244 |
2/2✓ Branch 0 taken 196055 times.
✓ Branch 1 taken 1222 times.
|
197277 | if (token.stop_possible()) | |
| 245 | 1222 | stop_cb.emplace(token, canceller{this}); | ||
| 246 | 197277 | } | ||
| 247 | ||||
| 248 | 5445 | void start(std::stop_token token, kqueue_acceptor* impl) | ||
| 249 | { | |||
| 250 | 5445 | cancelled.store(false, std::memory_order_release); | ||
| 251 | 5445 | stop_cb.reset(); | ||
| 252 | 5445 | socket_impl_ = nullptr; | ||
| 253 | 5445 | acceptor_impl_ = impl; | ||
| 254 | ||||
| 255 |
2/2✓ Branch 0 taken 5436 times.
✓ Branch 1 taken 9 times.
|
5445 | if (token.stop_possible()) | |
| 256 | 9 | stop_cb.emplace(token, canceller{this}); | ||
| 257 | 5445 | } | ||
| 258 | ||||
| 259 | 202485 | void complete(int err, std::size_t bytes) noexcept | ||
| 260 | { | |||
| 261 | 202485 | errn = err; | ||
| 262 | 202485 | bytes_transferred = bytes; | ||
| 263 | 202485 | } | ||
| 264 | ||||
| 265 | ✗ | virtual void perform_io() noexcept {} | ||
| 266 | }; | |||
| 267 | ||||
| 268 | struct kqueue_connect_op final : kqueue_op | |||
| 269 | { | |||
| 270 | endpoint target_endpoint; | |||
| 271 | ||||
| 272 | 5437 | void reset() noexcept | ||
| 273 | { | |||
| 274 | 5437 | kqueue_op::reset(); | ||
| 275 | 5437 | target_endpoint = endpoint{}; | ||
| 276 | 5437 | } | ||
| 277 | ||||
| 278 | 5436 | void perform_io() noexcept override | ||
| 279 | { | |||
| 280 | // connect() completion status is retrieved via SO_ERROR, not return value | |||
| 281 | 5436 | int err = 0; | ||
| 282 | 5436 | socklen_t len = sizeof(err); | ||
| 283 |
2/4✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
|
5436 | if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) | |
| 284 | ✗ | err = errno; | ||
| 285 | 5436 | complete(err, 0); | ||
| 286 | 5436 | } | ||
| 287 | ||||
| 288 | // Defined in sockets.cpp where kqueue_socket is complete | |||
| 289 | void operator()() override; | |||
| 290 | void cancel() noexcept override; | |||
| 291 | }; | |||
| 292 | ||||
| 293 | 16410 | struct kqueue_read_op final : kqueue_op | ||
| 294 | { | |||
| 295 | static constexpr std::size_t max_buffers = 16; | |||
| 296 | iovec iovecs[max_buffers]; | |||
| 297 | 16410 | int iovec_count = 0; | ||
| 298 | 16410 | bool empty_buffer_read = false; | ||
| 299 | ||||
| 300 | 107983 | bool is_read_operation() const noexcept override | ||
| 301 | { | |||
| 302 | 107983 | return !empty_buffer_read; | ||
| 303 | } | |||
| 304 | ||||
| 305 | 356722 | void reset() noexcept | ||
| 306 | { | |||
| 307 | 356722 | kqueue_op::reset(); | ||
| 308 | 356722 | iovec_count = 0; | ||
| 309 | 356722 | empty_buffer_read = false; | ||
| 310 | 356722 | } | ||
| 311 | ||||
| 312 | 49079 | void perform_io() noexcept override | ||
| 313 | { | |||
| 314 |
1/2✓ Branch 0 taken 49079 times.
✗ Branch 1 not taken.
|
49079 | ssize_t n = ::readv(fd, iovecs, iovec_count); | |
| 315 |
2/2✓ Branch 0 taken 557 times.
✓ Branch 1 taken 48522 times.
|
49079 | if (n >= 0) | |
| 316 | 48522 | complete(0, static_cast<std::size_t>(n)); | ||
| 317 | else | |||
| 318 |
1/2✓ Branch 0 taken 557 times.
✗ Branch 1 not taken.
|
557 | complete(errno, 0); | |
| 319 | 49079 | } | ||
| 320 | ||||
| 321 | void cancel() noexcept override; | |||
| 322 | }; | |||
| 323 | ||||
| 324 | 16410 | struct kqueue_write_op final : kqueue_op | ||
| 325 | { | |||
| 326 | static constexpr std::size_t max_buffers = 16; | |||
| 327 | iovec iovecs[max_buffers]; | |||
| 328 | 16410 | int iovec_count = 0; | ||
| 329 | ||||
| 330 | 447565 | void reset() noexcept | ||
| 331 | { | |||
| 332 | 447565 | kqueue_op::reset(); | ||
| 333 | 447565 | iovec_count = 0; | ||
| 334 | 447565 | } | ||
| 335 | ||||
| 336 | ✗ | void perform_io() noexcept override | ||
| 337 | { | |||
| 338 | // SO_NOSIGPIPE is set on the socket at creation time (see sockets.cpp), | |||
| 339 | // so writev() is safe from SIGPIPE. | |||
| 340 | // FreeBSD: Supports MSG_NOSIGNAL on sendmsg() | |||
| 341 | ✗ | ssize_t n = ::writev(fd, iovecs, iovec_count); | ||
| 342 | ✗ | if (n >= 0) | ||
| 343 | ✗ | complete(0, static_cast<std::size_t>(n)); | ||
| 344 | else | |||
| 345 | ✗ | complete(errno, 0); | ||
| 346 | ✗ | } | ||
| 347 | ||||
| 348 | void cancel() noexcept override; | |||
| 349 | }; | |||
| 350 | ||||
| 351 | 966 | struct kqueue_accept_op final : kqueue_op | ||
| 352 | { | |||
| 353 | 966 | int accepted_fd = -1; | ||
| 354 | 966 | io_object::implementation* peer_impl = nullptr; | ||
| 355 | 966 | io_object::implementation** impl_out = nullptr; | ||
| 356 | ||||
| 357 | 5445 | void reset() noexcept | ||
| 358 | { | |||
| 359 | 5445 | kqueue_op::reset(); | ||
| 360 | 5445 | accepted_fd = -1; | ||
| 361 | 5445 | peer_impl = nullptr; | ||
| 362 | 5445 | impl_out = nullptr; | ||
| 363 | 5445 | } | ||
| 364 | ||||
| 365 | 5436 | void perform_io() noexcept override | ||
| 366 | { | |||
| 367 | 5436 | sockaddr_storage addr_storage{}; | ||
| 368 | 5436 | socklen_t addrlen = sizeof(addr_storage); | ||
| 369 | ||||
| 370 | // FreeBSD: Can use accept4(fd, addr, len, SOCK_NONBLOCK | SOCK_CLOEXEC) | |||
| 371 | 5436 | int new_fd = | ||
| 372 |
1/2✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
|
5436 | ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen); | |
| 373 | ||||
| 374 |
1/2✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
|
5436 | if (new_fd >= 0) | |
| 375 | { | |||
| 376 | // Set non-blocking | |||
| 377 |
1/2✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
|
5436 | int flags = ::fcntl(new_fd, F_GETFL, 0); | |
| 378 |
2/4✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5436 times.
✗ Branch 3 not taken.
|
10872 | if (flags == -1 || | |
| 379 |
1/2✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
|
5436 | ::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1) | |
| 380 | { | |||
| 381 | ✗ | int err = errno; | ||
| 382 | ✗ | ::close(new_fd); | ||
| 383 | ✗ | complete(err, 0); | ||
| 384 | ✗ | return; | ||
| 385 | } | |||
| 386 | ||||
| 387 | // Set close-on-exec | |||
| 388 |
2/4✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
|
5436 | if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1) | |
| 389 | { | |||
| 390 | ✗ | int err = errno; | ||
| 391 | ✗ | ::close(new_fd); | ||
| 392 | ✗ | complete(err, 0); | ||
| 393 | ✗ | return; | ||
| 394 | } | |||
| 395 | ||||
| 396 | // Suppress SIGPIPE on accepted sockets; macOS lacks MSG_NOSIGNAL | |||
| 397 | 5436 | int one = 1; | ||
| 398 |
2/4✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
|
5436 | if (::setsockopt( | |
| 399 | 5436 | new_fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1) | ||
| 400 | { | |||
| 401 | ✗ | int err = errno; | ||
| 402 | ✗ | ::close(new_fd); | ||
| 403 | ✗ | complete(err, 0); | ||
| 404 | ✗ | return; | ||
| 405 | } | |||
| 406 | ||||
| 407 | 5436 | accepted_fd = new_fd; | ||
| 408 | 5436 | complete(0, 0); | ||
| 409 | 5436 | } | ||
| 410 | else | |||
| 411 | { | |||
| 412 | ✗ | complete(errno, 0); | ||
| 413 | } | |||
| 414 | 5436 | } | ||
| 415 | ||||
| 416 | // Defined in acceptors.cpp where kqueue_acceptor is complete | |||
| 417 | void operator()() override; | |||
| 418 | void cancel() noexcept override; | |||
| 419 | }; | |||
| 420 | ||||
| 421 | } // namespace boost::corosio::detail | |||
| 422 | ||||
| 423 | #endif // BOOST_COROSIO_HAS_KQUEUE | |||
| 424 | ||||
| 425 | #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP | |||
| 426 |