include/boost/corosio/native/detail/kqueue/kqueue_op.hpp

60.6% Lines (20/33) 100.0% List of functions (12/12) 31.2% Branches (15/48)
f(x) Functions (12)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/native/detail/reactor/reactor_op.hpp>
19 #include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
20
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 /*
25 kqueue Operation State
26 ======================
27
28 Each async I/O operation has a corresponding kqueue_op-derived struct that
29 holds the operation's state while it's in flight. The socket impl owns
30 fixed slots for each operation type (conn_, rd_, wr_), so only one
31 operation of each type can be pending per socket at a time.
32
33 Persistent Registration
34 -----------------------
35 File descriptors are registered with kqueue once (via descriptor_state) and
36 stay registered until closed. Uses EV_CLEAR for edge-triggered semantics
37 (equivalent to epoll's EPOLLET). The descriptor_state tracks which operations
38 are pending (read_op, write_op, connect_op). When an event arrives, the
39 reactor dispatches to the appropriate pending operation.
40
41 Impl Lifetime Management
42 ------------------------
43 When cancel() posts an op to the scheduler's ready queue, the socket impl
44 might be destroyed before the scheduler processes the op. The `impl_ptr`
45 member holds a shared_ptr to the impl, keeping it alive until the op
46 completes. This is set by cancel() and cleared in operator() after the
47 coroutine is resumed.
48
49 EOF Detection
50 -------------
51 For reads, 0 bytes with no error means EOF. But an empty user buffer also
52 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
53
54 SIGPIPE Prevention
55 ------------------
56 SO_NOSIGPIPE is set on each socket at creation time (see sockets.cpp).
57 Writes use writev() which is safe because the socket-level option suppresses
58 SIGPIPE delivery.
59 */
60
61 namespace boost::corosio::detail {
62
63 // Aliases for shared reactor event constants.
64 // Kept for backward compatibility in kqueue-specific code.
65 static constexpr std::uint32_t kqueue_event_read = reactor_event_read;
66 static constexpr std::uint32_t kqueue_event_write = reactor_event_write;
67 static constexpr std::uint32_t kqueue_event_error = reactor_event_error;
68
69 // Forward declarations
70 class kqueue_tcp_socket;
71 class kqueue_tcp_acceptor;
72 struct kqueue_op;
73
74 class kqueue_scheduler;
75
76 /// Per-descriptor state for persistent kqueue registration.
77 struct descriptor_state final : reactor_descriptor_state
78 {};
79
80 /// kqueue base operation — thin wrapper over reactor_op.
81 struct kqueue_op : reactor_op<kqueue_tcp_socket, kqueue_tcp_acceptor>
82 {
83 void operator()() override;
84 };
85
86 /// kqueue connect operation.
87 struct kqueue_connect_op final : reactor_connect_op<kqueue_op>
88 {
89 void operator()() override;
90 void cancel() noexcept override;
91 };
92
93 /// kqueue scatter-read operation.
94 struct kqueue_read_op final : reactor_read_op<kqueue_op>
95 {
96 void cancel() noexcept override;
97 };
98
99 /** Provides writev() for kqueue writes.
100
101 SO_NOSIGPIPE is set on the socket at creation time (macOS lacks
102 MSG_NOSIGNAL), so writev() is safe from SIGPIPE.
103 */
104 struct kqueue_write_policy
105 {
106 516757x static ssize_t write(int fd, iovec* iovecs, int count) noexcept
107 {
108 ssize_t n;
109 516757x do
110 {
111
1/2
✓ Branch 0 taken 516757 times.
✗ Branch 1 not taken.
516757x n = ::writev(fd, iovecs, count);
112 1033514x }
113
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 516756 times.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
516757x while (n < 0 && errno == EINTR);
114 516757x return n;
115 }
116 };
117
118 /// kqueue gather-write operation.
119 struct kqueue_write_op final : reactor_write_op<kqueue_op, kqueue_write_policy>
120 {
121 void cancel() noexcept override;
122 };
123
124 /** Provides accept() + fcntl() + SO_NOSIGPIPE for kqueue accepts.
125
126 Unlike Linux's accept4(), BSD accept() does not support atomic
127 flag setting. Non-blocking, close-on-exec, and SIGPIPE suppression
128 are applied via separate syscalls after accept().
129 */
130 struct kqueue_accept_policy
131 {
132 5616x static int do_accept(int fd, sockaddr_storage& peer) noexcept
133 {
134 int new_fd;
135 5616x do
136 {
137 5616x socklen_t addrlen = sizeof(peer);
138
1/2
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
5616x new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&peer), &addrlen);
139 11232x }
140
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5616 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5616x while (new_fd < 0 && errno == EINTR);
141
142
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5616 times.
5616x if (new_fd < 0)
143 return new_fd;
144
145
1/2
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
5616x int flags = ::fcntl(new_fd, F_GETFL, 0);
146
3/6
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5616 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 5616 times.
✗ Branch 5 not taken.
5616x if (flags == -1 || ::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
147 {
148 int err = errno;
149 ::close(new_fd);
150 errno = err;
151 return -1;
152 }
153
154
2/4
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5616 times.
5616x if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
155 {
156 int err = errno;
157 ::close(new_fd);
158 errno = err;
159 return -1;
160 }
161
162 // macOS lacks MSG_NOSIGNAL
163 5616x int one = 1;
164
2/4
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5616 times.
5616x if (::setsockopt(new_fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) ==
165 -1)
166 {
167 int err = errno;
168 ::close(new_fd);
169 errno = err;
170 return -1;
171 }
172
173 5616x return new_fd;
174 5616x }
175 };
176
177 /// kqueue accept operation.
178 struct kqueue_accept_op final
179 : reactor_accept_op<kqueue_op, kqueue_accept_policy>
180 {
181 void operator()() override;
182 void cancel() noexcept override;
183 };
184
185 } // namespace boost::corosio::detail
186
187 #endif // BOOST_COROSIO_HAS_KQUEUE
188
189 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_OP_HPP
190