include/boost/corosio/native/detail/kqueue/kqueue_op.hpp

83.0% Lines (122/147) 90.3% Functions (28/31) 35.7% Branches (20/56)
include/boost/corosio/native/detail/kqueue/kqueue_op.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/corosio/io/io_object.hpp>
20 #include <boost/corosio/endpoint.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <coroutine>
23 #include <boost/capy/error.hpp>
24 #include <system_error>
25
26 #include <boost/corosio/detail/scheduler_op.hpp>
27
28 #include <unistd.h>
29 #include <errno.h>
30 #include <fcntl.h>
31
32 #include <atomic>
33 #include <cstddef>
34 #include <memory>
35 #include <mutex>
36 #include <optional>
37 #include <stop_token>
38
39 #include <netinet/in.h>
40 #include <sys/socket.h>
41 #include <sys/uio.h>
42
43 /*
44 kqueue Operation State
45 ======================
46
47 Each async I/O operation has a corresponding kqueue_op-derived struct that
48 holds the operation's state while it's in flight. The socket impl owns
49 fixed slots for each operation type (conn_, rd_, wr_), so only one
50 operation of each type can be pending per socket at a time.
51
52 Persistent Registration
53 -----------------------
54 File descriptors are registered with kqueue once (via descriptor_state) and
55 stay registered until closed. Uses EV_CLEAR for edge-triggered semantics
56 (equivalent to epoll's EPOLLET). The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 SO_NOSIGPIPE is set on each socket at creation time (see sockets.cpp).
76 Writes use writev() which is safe because the socket-level option suppresses
77 SIGPIPE delivery.
78 */
79
80 namespace boost::corosio::detail {
81
82 // Ready-event flag constants for descriptor_state::ready_events_.
83 // These match the epoll numeric values (EPOLLIN=0x1, EPOLLOUT=0x4,
84 // EPOLLERR=0x8) so that descriptor_state::operator()() uses the same
85 // flag-checking logic as the epoll backend.
86 static constexpr std::uint32_t kqueue_event_read = 0x001;
87 static constexpr std::uint32_t kqueue_event_write = 0x004;
88 static constexpr std::uint32_t kqueue_event_error = 0x008;
89
90 // Forward declarations
91 class kqueue_socket;
92 class kqueue_acceptor;
93 struct kqueue_op;
94
95 class kqueue_scheduler;
96
97 /** Per-descriptor state for persistent kqueue registration.
98
99 Tracks pending operations for a file descriptor. The fd is registered
100 once with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR) and stays
101 registered until closed.
102
103 This struct extends scheduler_op to support deferred I/O processing.
104 When kqueue events arrive, the reactor sets ready_events and queues
105 this descriptor for processing. When popped from the scheduler queue,
106 operator() performs the actual I/O and queues completion handlers.
107
108 @par Deferred I/O Model
109 The reactor no longer performs I/O directly. Instead:
110 1. Reactor sets ready_events and queues descriptor_state
111 2. Scheduler pops descriptor_state and calls operator()
112 3. operator() performs I/O under mutex and queues completions
113
114 This eliminates per-descriptor mutex locking from the reactor hot path.
115
116 @par Thread Safety
117 The mutex protects operation pointers and ready flags during I/O.
118 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
119 */
struct descriptor_state final : scheduler_op
{
    // Serializes the op pointers and ready/cancel flags below against
    // deferred I/O running in operator()().
    std::mutex mutex;

    // Protected by mutex. Fixed slots: at most one pending op per kind
    // (see "kqueue Operation State" in the file header).
    kqueue_op* read_op = nullptr;
    kqueue_op* write_op = nullptr;
    kqueue_op* connect_op = nullptr;

    // Caches edge events that arrived before an op was registered
    // (EV_CLEAR delivers each edge only once, so it must not be lost).
    bool read_ready = false;
    bool write_ready = false;

    // Deferred cancellation: set by cancel() when the target op is not
    // parked (e.g. completing inline via speculative I/O). Checked when
    // the next op parks; if set, the op is immediately self-cancelled.
    // This matches IOCP semantics where CancelIoEx always succeeds.
    bool read_cancel_pending = false;
    bool write_cancel_pending = false;
    bool connect_cancel_pending = false;

    // Set during registration only (no mutex needed)
    std::uint32_t registered_events = 0;
    int fd = -1;

    // For deferred I/O - set by reactor, read by scheduler.
    // Flag values are the kqueue_event_* constants above (epoll-compatible).
    std::atomic<std::uint32_t> ready_events_{0};
    std::atomic<bool> is_enqueued_{false};
    kqueue_scheduler const* scheduler_ = nullptr;

    // Prevents impl destruction while this descriptor_state is queued.
    // Set by close_socket() when is_enqueued_ is true, cleared by operator().
    // NOTE: this is self-referential while queued; destroy() breaks the cycle.
    std::shared_ptr<void> impl_ref_;

    /// Add ready events atomically.
    /// Release pairs with the consumer's acquire exchange on
    /// ready_events_ so the consumer sees all flags. On x86 (TSO)
    /// this compiles to the same LOCK OR as relaxed.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    /// Perform deferred I/O and queue completions.
    /// Defined out-of-line where kqueue_scheduler is complete.
    void operator()() override;

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override
    {
        impl_ref_.reset();
    }
};
174
175 struct kqueue_op : scheduler_op
176 {
177 struct canceller
178 {
179 kqueue_op* op;
180 void operator()() const noexcept;
181 };
182
183 std::coroutine_handle<> h;
184 capy::executor_ref ex;
185 50196 std::error_code* ec_out = nullptr;
186 50196 std::size_t* bytes_out = nullptr;
187
188 50196 int fd = -1;
189 50196 int errn = 0;
190 50196 std::size_t bytes_transferred = 0;
191
192 50196 std::atomic<bool> cancelled{false};
193 std::optional<std::stop_callback<canceller>> stop_cb;
194
195 // Prevents use-after-free when socket is closed with pending ops.
196 // See "Impl Lifetime Management" in file header.
197 std::shared_ptr<void> impl_ptr;
198
199 // For stop_token cancellation - pointer to owning socket/acceptor impl.
200 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
201 50196 kqueue_socket* socket_impl_ = nullptr;
202 50196 kqueue_acceptor* acceptor_impl_ = nullptr;
203
204 150588 kqueue_op() = default;
205
206 815169 void reset() noexcept
207 {
208 815169 fd = -1;
209 815169 errn = 0;
210 815169 bytes_transferred = 0;
211 815169 cancelled.store(false, std::memory_order_relaxed);
212 815169 impl_ptr.reset();
213 815169 socket_impl_ = nullptr;
214 815169 acceptor_impl_ = nullptr;
215 815169 }
216
217 // Defined in sockets.cpp where kqueue_socket is complete
218 void operator()() override;
219
220 83053 virtual bool is_read_operation() const noexcept
221 {
222 83053 return false;
223 }
224 virtual void cancel() noexcept = 0;
225
226 void destroy() override
227 {
228 stop_cb.reset();
229 impl_ptr.reset();
230 }
231
232 152626 void request_cancel() noexcept
233 {
234 152626 cancelled.store(true, std::memory_order_release);
235 152626 }
236
237 197277 void start(std::stop_token token, kqueue_socket* impl)
238 {
239 197277 cancelled.store(false, std::memory_order_release);
240 197277 stop_cb.reset();
241 197277 socket_impl_ = impl;
242 197277 acceptor_impl_ = nullptr;
243
244
2/2
✓ Branch 0 taken 196055 times.
✓ Branch 1 taken 1222 times.
197277 if (token.stop_possible())
245 1222 stop_cb.emplace(token, canceller{this});
246 197277 }
247
248 5445 void start(std::stop_token token, kqueue_acceptor* impl)
249 {
250 5445 cancelled.store(false, std::memory_order_release);
251 5445 stop_cb.reset();
252 5445 socket_impl_ = nullptr;
253 5445 acceptor_impl_ = impl;
254
255
2/2
✓ Branch 0 taken 5436 times.
✓ Branch 1 taken 9 times.
5445 if (token.stop_possible())
256 9 stop_cb.emplace(token, canceller{this});
257 5445 }
258
259 202485 void complete(int err, std::size_t bytes) noexcept
260 {
261 202485 errn = err;
262 202485 bytes_transferred = bytes;
263 202485 }
264
265 virtual void perform_io() noexcept {}
266 };
267
268 struct kqueue_connect_op final : kqueue_op
269 {
270 endpoint target_endpoint;
271
272 5437 void reset() noexcept
273 {
274 5437 kqueue_op::reset();
275 5437 target_endpoint = endpoint{};
276 5437 }
277
278 5436 void perform_io() noexcept override
279 {
280 // connect() completion status is retrieved via SO_ERROR, not return value
281 5436 int err = 0;
282 5436 socklen_t len = sizeof(err);
283
2/4
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
5436 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
284 err = errno;
285 5436 complete(err, 0);
286 5436 }
287
288 // Defined in sockets.cpp where kqueue_socket is complete
289 void operator()() override;
290 void cancel() noexcept override;
291 };
292
293 16410 struct kqueue_read_op final : kqueue_op
294 {
295 static constexpr std::size_t max_buffers = 16;
296 iovec iovecs[max_buffers];
297 16410 int iovec_count = 0;
298 16410 bool empty_buffer_read = false;
299
300 107983 bool is_read_operation() const noexcept override
301 {
302 107983 return !empty_buffer_read;
303 }
304
305 356722 void reset() noexcept
306 {
307 356722 kqueue_op::reset();
308 356722 iovec_count = 0;
309 356722 empty_buffer_read = false;
310 356722 }
311
312 49079 void perform_io() noexcept override
313 {
314
1/2
✓ Branch 0 taken 49079 times.
✗ Branch 1 not taken.
49079 ssize_t n = ::readv(fd, iovecs, iovec_count);
315
2/2
✓ Branch 0 taken 557 times.
✓ Branch 1 taken 48522 times.
49079 if (n >= 0)
316 48522 complete(0, static_cast<std::size_t>(n));
317 else
318
1/2
✓ Branch 0 taken 557 times.
✗ Branch 1 not taken.
557 complete(errno, 0);
319 49079 }
320
321 void cancel() noexcept override;
322 };
323
324 16410 struct kqueue_write_op final : kqueue_op
325 {
326 static constexpr std::size_t max_buffers = 16;
327 iovec iovecs[max_buffers];
328 16410 int iovec_count = 0;
329
330 447565 void reset() noexcept
331 {
332 447565 kqueue_op::reset();
333 447565 iovec_count = 0;
334 447565 }
335
336 void perform_io() noexcept override
337 {
338 // SO_NOSIGPIPE is set on the socket at creation time (see sockets.cpp),
339 // so writev() is safe from SIGPIPE.
340 // FreeBSD: Supports MSG_NOSIGNAL on sendmsg()
341 ssize_t n = ::writev(fd, iovecs, iovec_count);
342 if (n >= 0)
343 complete(0, static_cast<std::size_t>(n));
344 else
345 complete(errno, 0);
346 }
347
348 void cancel() noexcept override;
349 };
350
351 966 struct kqueue_accept_op final : kqueue_op
352 {
353 966 int accepted_fd = -1;
354 966 io_object::implementation* peer_impl = nullptr;
355 966 io_object::implementation** impl_out = nullptr;
356
357 5445 void reset() noexcept
358 {
359 5445 kqueue_op::reset();
360 5445 accepted_fd = -1;
361 5445 peer_impl = nullptr;
362 5445 impl_out = nullptr;
363 5445 }
364
365 5436 void perform_io() noexcept override
366 {
367 5436 sockaddr_storage addr_storage{};
368 5436 socklen_t addrlen = sizeof(addr_storage);
369
370 // FreeBSD: Can use accept4(fd, addr, len, SOCK_NONBLOCK | SOCK_CLOEXEC)
371 5436 int new_fd =
372
1/2
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
5436 ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen);
373
374
1/2
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
5436 if (new_fd >= 0)
375 {
376 // Set non-blocking
377
1/2
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
5436 int flags = ::fcntl(new_fd, F_GETFL, 0);
378
2/4
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5436 times.
✗ Branch 3 not taken.
10872 if (flags == -1 ||
379
1/2
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
5436 ::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
380 {
381 int err = errno;
382 ::close(new_fd);
383 complete(err, 0);
384 return;
385 }
386
387 // Set close-on-exec
388
2/4
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
5436 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
389 {
390 int err = errno;
391 ::close(new_fd);
392 complete(err, 0);
393 return;
394 }
395
396 // Suppress SIGPIPE on accepted sockets; macOS lacks MSG_NOSIGNAL
397 5436 int one = 1;
398
2/4
✓ Branch 0 taken 5436 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
5436 if (::setsockopt(
399 5436 new_fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1)
400 {
401 int err = errno;
402 ::close(new_fd);
403 complete(err, 0);
404 return;
405 }
406
407 5436 accepted_fd = new_fd;
408 5436 complete(0, 0);
409 5436 }
410 else
411 {
412 complete(errno, 0);
413 }
414 5436 }
415
416 // Defined in acceptors.cpp where kqueue_acceptor is complete
417 void operator()() override;
418 void cancel() noexcept override;
419 };
420
421 } // namespace boost::corosio::detail
422
423 #endif // BOOST_COROSIO_HAS_KQUEUE
424
425 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
426