include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

89.1% Lines (114/128) 100.0% List of functions (11/12) 54.3% Branches (51/94)
f(x) Functions (12)
Function Calls Lines Branches Blocks
<unknown function 147> :147 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :150 750x 58.8% 23.3% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :175 5516x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :175 375x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :175 5516x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :185 1125x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :192 375x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::descriptor_state*) const :201 12463x 92.9% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :226 12463x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :239 8714x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :253 65147x 85.0% 75.0% 80.0% boost::corosio::detail::kqueue_scheduler::run_task(std::__1::unique_lock<std::__1::mutex>&, boost::corosio::detail::reactor_scheduler_context*) :283 110543x 93.6% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30
31 #include <atomic>
32 #include <chrono>
33 #include <cstdint>
34 #include <limits>
35 #include <mutex>
36
37 #include <errno.h>
38 #include <fcntl.h>
39 #include <sys/event.h>
40 #include <sys/time.h>
41 #include <unistd.h>
42
43 namespace boost::corosio::detail {
44
45 struct kqueue_op;
46 struct descriptor_state;
47
48 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
49
50 This scheduler implements the scheduler interface using the BSD kqueue
51 API for efficient I/O event notification. It uses a single reactor model
52 where one thread runs kevent() while other threads
53 wait on a condition variable for handler work. This design provides:
54
55 - Handler parallelism: N posted handlers can execute on N threads
56 - No thundering herd: condition_variable wakes exactly one thread
57 - IOCP parity: Behavior matches Windows I/O completion port semantics
58
59 When threads call run(), they first try to execute queued handlers.
60 If the queue is empty and no reactor is running, one thread becomes
61 the reactor and runs kevent(). Other threads wait on a condition
62 variable until handlers are available.
63
64 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
65 epoll's EPOLLET). File descriptors are registered once with both
66 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
67
68 @par Thread Safety
69 All public member functions are thread-safe.
70 */
71 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base
72 {
73 public:
74 /** Construct the scheduler.
75
76 Creates a kqueue file descriptor via kqueue(), sets
77 close-on-exec, and registers EVFILT_USER for reactor
78 interruption. On failure the kqueue fd is closed before
79 throwing.
80
81 @param ctx Reference to the owning execution_context.
82 @param concurrency_hint Hint for expected thread count (unused).
83
84 @throws std::system_error if kqueue() fails, if setting
85 FD_CLOEXEC on the kqueue fd fails, or if registering
86 the EVFILT_USER event fails. The error code contains
87 the errno from the failed syscall.
88 */
89 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90
91 /** Destructor.
92
93 Closes the kqueue file descriptor if valid. Does not throw.
94 */
95 ~kqueue_scheduler();
96
97 kqueue_scheduler(kqueue_scheduler const&) = delete;
98 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
99
100 /// Shut down the scheduler, draining pending operations.
101 void shutdown() override;
102
103 /** Return the kqueue file descriptor.
104
105 Used by socket services to register file descriptors
106 for I/O event notification.
107
108 @return The kqueue file descriptor.
109 */
110 int kq_fd() const noexcept
111 {
112 return kq_fd_;
113 }
114
115 /** Register a descriptor for persistent monitoring.
116
117 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
118 and stores @a desc in the kevent udata field so that the
119 reactor can dispatch events to the correct descriptor_state.
120
121 @param fd The file descriptor to register.
122 @param desc Pointer to the caller-owned descriptor_state.
123
124 @throws std::system_error if kevent(EV_ADD) fails.
125 */
126 void register_descriptor(int fd, descriptor_state* desc) const;
127
128 /** Deregister a persistently registered descriptor.
129
130 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
131 Errors are silently ignored because the fd may already be
132 closed and kqueue automatically removes closed descriptors.
133
134 @param fd The file descriptor to deregister.
135 */
136 void deregister_descriptor(int fd) const;
137
138 private:
139 void
140 run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
141 void interrupt_reactor() const override;
142 long calculate_timeout(long requested_timeout_us) const;
143
144 int kq_fd_;
145
146 // EVFILT_USER idempotency
147 375x mutable std::atomic<bool> user_event_armed_{false};
148 };
149
150 750x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
151 375x : kq_fd_(-1)
152 750x {
153
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x kq_fd_ = ::kqueue();
154
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 375 times.
375x if (kq_fd_ < 0)
155 detail::throw_system_error(make_err(errno), "kqueue");
156
157
2/4
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 375 times.
375x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
158 {
159 int errn = errno;
160 ::close(kq_fd_);
161 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
162 }
163
164 struct kevent ev;
165 375x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
166
2/4
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 375 times.
375x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
167 {
168 int errn = errno;
169 ::close(kq_fd_);
170 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
171 }
172
173
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x timer_svc_ = &get_timer_service(ctx, *this);
174 750x timer_svc_->set_on_earliest_changed(
175 5891x timer_service::callback(this, [](void* p) {
176 5516x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
177 5516x }));
178
179
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x get_resolver_service(ctx, *this);
180
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x get_signal_service(ctx, *this);
181
182 375x completed_ops_.push(&task_op_);
183 750x }
184
185 1125x inline kqueue_scheduler::~kqueue_scheduler()
186 750x {
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 375 times.
375x if (kq_fd_ >= 0)
188
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x ::close(kq_fd_);
189 1125x }
190
191 inline void
192 375x kqueue_scheduler::shutdown()
193 {
194 375x shutdown_drain();
195
196
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 375 times.
375x if (kq_fd_ >= 0)
197 375x interrupt_reactor();
198 375x }
199
200 inline void
201 12463x kqueue_scheduler::register_descriptor(int fd, descriptor_state* desc) const
202 {
203 struct kevent changes[2];
204 12463x EV_SET(
205 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
206 0, 0, desc);
207 12463x EV_SET(
208 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
209 EV_ADD | EV_CLEAR, 0, 0, desc);
210
211
1/2
✓ Branch 0 taken 12463 times.
✗ Branch 1 not taken.
12463x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
212 detail::throw_system_error(make_err(errno), "kevent (register)");
213
214 12463x desc->registered_events = kqueue_event_read | kqueue_event_write;
215 12463x desc->fd = fd;
216 12463x desc->scheduler_ = this;
217 12463x desc->ready_events_.store(0, std::memory_order_relaxed);
218
219 12463x std::lock_guard lock(desc->mutex);
220 12463x desc->impl_ref_.reset();
221 12463x desc->read_ready = false;
222 12463x desc->write_ready = false;
223 12463x }
224
225 inline void
226 12463x kqueue_scheduler::deregister_descriptor(int fd) const
227 {
228 struct kevent changes[2];
229 12463x EV_SET(
230 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
231 nullptr);
232 12463x EV_SET(
233 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
234 nullptr);
235 12463x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
236 12463x }
237
238 inline void
239 8714x kqueue_scheduler::interrupt_reactor() const
240 {
241 8714x bool expected = false;
242
2/2
✓ Branch 0 taken 267 times.
✓ Branch 1 taken 8447 times.
8714x if (user_event_armed_.compare_exchange_strong(
243 expected, true, std::memory_order_acq_rel,
244 std::memory_order_acquire))
245 {
246 struct kevent ev;
247 8447x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
248 8447x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
249 8447x }
250 8714x }
251
252 inline long
253 65147x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
254 {
255
1/2
✓ Branch 0 taken 65147 times.
✗ Branch 1 not taken.
65147x if (requested_timeout_us == 0)
256 return 0;
257
258 65147x auto nearest = timer_svc_->nearest_expiry();
259
2/2
✓ Branch 0 taken 2111 times.
✓ Branch 1 taken 63036 times.
65147x if (nearest == timer_service::time_point::max())
260 2111x return requested_timeout_us;
261
262 63036x auto now = std::chrono::steady_clock::now();
263
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 62958 times.
63036x if (nearest <= now)
264 78x return 0;
265
266 62958x auto timer_timeout_us =
267 62958x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
268 62958x .count();
269
270 62958x constexpr auto long_max =
271 static_cast<long long>((std::numeric_limits<long>::max)());
272 62958x auto capped_timer_us = std::min(
273 62958x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
274
275
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 62958 times.
62958x if (requested_timeout_us < 0)
276 62958x return static_cast<long>(capped_timer_us);
277
278 return static_cast<long>(std::min(
279 static_cast<long long>(requested_timeout_us), capped_timer_us));
280 65147x }
281
282 inline void
283 110543x kqueue_scheduler::run_task(
284 std::unique_lock<std::mutex>& lock, context_type* ctx)
285 {
286
2/2
✓ Branch 0 taken 45396 times.
✓ Branch 1 taken 65147 times.
110543x long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
287
288
2/2
✓ Branch 0 taken 45396 times.
✓ Branch 1 taken 65147 times.
110543x if (lock.owns_lock())
289 65147x lock.unlock();
290
291 110543x task_cleanup on_exit{this, &lock, ctx};
292
293 struct timespec ts;
294 110543x struct timespec* ts_ptr = nullptr;
295
2/2
✓ Branch 0 taken 2111 times.
✓ Branch 1 taken 108432 times.
110543x if (effective_timeout_us >= 0)
296 {
297 108432x ts.tv_sec = effective_timeout_us / 1000000;
298 108432x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
299 108432x ts_ptr = &ts;
300 108432x }
301
302 struct kevent events[128];
303
1/2
✓ Branch 0 taken 110543 times.
✗ Branch 1 not taken.
110543x int nev = ::kevent(kq_fd_, nullptr, 0, events, 128, ts_ptr);
304
1/2
✓ Branch 0 taken 110543 times.
✗ Branch 1 not taken.
110543x int saved_errno = errno;
305
306
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 110543 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
110543x if (nev < 0 && saved_errno != EINTR)
307 detail::throw_system_error(make_err(saved_errno), "kevent");
308
309 110543x op_queue local_ops;
310
311
2/2
✓ Branch 0 taken 110543 times.
✓ Branch 1 taken 164938 times.
275481x for (int i = 0; i < nev; ++i)
312 {
313
2/2
✓ Branch 0 taken 8072 times.
✓ Branch 1 taken 156866 times.
164938x if (events[i].filter == EVFILT_USER)
314 {
315 8072x user_event_armed_.store(false, std::memory_order_release);
316 8072x continue;
317 }
318
319 156866x auto* desc = static_cast<descriptor_state*>(events[i].udata);
320
1/2
✓ Branch 0 taken 156866 times.
✗ Branch 1 not taken.
156866x if (!desc)
321 continue;
322
323 156866x std::uint32_t ready = 0;
324
325
2/2
✓ Branch 0 taken 79642 times.
✓ Branch 1 taken 77224 times.
156866x if (events[i].filter == EVFILT_READ)
326 77224x ready |= kqueue_event_read;
327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 79642 times.
79642x else if (events[i].filter == EVFILT_WRITE)
328 79642x ready |= kqueue_event_write;
329
330
1/2
✓ Branch 0 taken 156866 times.
✗ Branch 1 not taken.
156866x if (events[i].flags & EV_ERROR)
331 ready |= kqueue_event_error;
332
333
2/2
✓ Branch 0 taken 156833 times.
✓ Branch 1 taken 33 times.
156866x if (events[i].flags & EV_EOF)
334 {
335
2/2
✓ Branch 0 taken 10 times.
✓ Branch 1 taken 23 times.
33x if (events[i].filter == EVFILT_READ)
336 23x ready |= kqueue_event_read;
337
2/2
✓ Branch 0 taken 17 times.
✓ Branch 1 taken 16 times.
33x if (events[i].fflags != 0)
338 16x ready |= kqueue_event_error;
339 33x }
340
341 156866x desc->add_ready_events(ready);
342
343 156866x bool expected = false;
344
2/2
✓ Branch 0 taken 208 times.
✓ Branch 1 taken 156658 times.
156866x if (desc->is_enqueued_.compare_exchange_strong(
345 expected, true, std::memory_order_acq_rel,
346 std::memory_order_acquire))
347 {
348 156658x local_ops.push(desc);
349 156658x }
350 156866x }
351
352
1/2
✓ Branch 0 taken 110543 times.
✗ Branch 1 not taken.
110543x timer_svc_->process_expired();
353
354
1/2
✓ Branch 0 taken 110543 times.
✗ Branch 1 not taken.
110543x lock.lock();
355
356
2/2
✓ Branch 0 taken 29306 times.
✓ Branch 1 taken 81237 times.
110543x if (!local_ops.empty())
357 81237x completed_ops_.splice(local_ops);
358 110543x }
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_KQUEUE
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
365