include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

87.9% Lines (124/141) 91.7% List of functions (11/13) 55.0% Branches (55/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 158> :158 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :164 874x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :190 5261x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :190 437x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :190 5261x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :202 1311x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :209 437x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :218 0 0.0% 0.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::descriptor_state*) const :230 11694x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :256 11694x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :269 8204x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :283 68272x 95.0% 87.5% 90.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :313 106763x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49 struct descriptor_state;
50
51 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
52
53 This scheduler implements the scheduler interface using the BSD kqueue
54 API for efficient I/O event notification. It uses a single reactor model
55 where one thread runs kevent() while other threads
56 wait on a condition variable for handler work. This design provides:
57
58 - Handler parallelism: N posted handlers can execute on N threads
59 - No thundering herd: condition_variable wakes exactly one thread
60 - IOCP parity: Behavior matches Windows I/O completion port semantics
61
62 When threads call run(), they first try to execute queued handlers.
63 If the queue is empty and no reactor is running, one thread becomes
64 the reactor and runs kevent(). Other threads wait on a condition
65 variable until handlers are available.
66
67 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
68 epoll's EPOLLET). File descriptors are registered once with both
69 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
70
71 @par Thread Safety
72 All public member functions are thread-safe.
73 */
class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
{
public:
    /** Construct the scheduler.

        Creates a kqueue file descriptor via kqueue(), sets
        close-on-exec, and registers EVFILT_USER for reactor
        interruption. On failure the kqueue fd is closed before
        throwing.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).

        @throws std::system_error if kqueue() fails, if setting
                FD_CLOEXEC on the kqueue fd fails, or if registering
                the EVFILT_USER event fails. The error code contains
                the errno from the failed syscall.
    */
    kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /** Destructor.

        Closes the kqueue file descriptor if valid. Does not throw.
    */
    ~kqueue_scheduler();

    // Non-copyable: owns the kqueue fd and is referenced by
    // descriptor_state objects via raw pointer.
    kqueue_scheduler(kqueue_scheduler const&) = delete;
    kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /// Apply runtime configuration, resizing the event buffer.
    void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted) override;

    /** Return the kqueue file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The kqueue file descriptor.
    */
    int kq_fd() const noexcept
    {
        return kq_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
        and stores @a desc in the kevent udata field so that the
        reactor can dispatch events to the correct descriptor_state.

        @param fd The file descriptor to register.
        @param desc Pointer to the caller-owned descriptor_state.

        @throws std::system_error if kevent(EV_ADD) fails.
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
        Errors are silently ignored because the fd may already be
        closed and kqueue automatically removes closed descriptors.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    // Blocks in kevent() (bounded by timeout_us) and dispatches the
    // resulting I/O events; called with the scheduler lock held.
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;
    // Wakes a thread blocked in kevent() by triggering EVFILT_USER.
    void interrupt_reactor() const override;
    // Clamps the requested timeout to the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // The kqueue file descriptor; -1 until the constructor creates it.
    int kq_fd_;

    // EVFILT_USER idempotency: true while a wakeup is pending, so
    // repeated interrupts post at most one NOTE_TRIGGER.
    mutable std::atomic<bool> user_event_armed_{false};

    // Event buffer sized from max_events_per_poll_.
    std::vector<struct kevent> event_buffer_;
};
163
inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
    : kq_fd_(-1)
    , event_buffer_(max_events_per_poll_)
{
    // Create the kqueue. kq_fd_ stays -1 on failure, so there is
    // nothing to clean up before throwing.
    kq_fd_ = ::kqueue();
    if (kq_fd_ < 0)
        detail::throw_system_error(make_err(errno), "kqueue");

    // Keep the fd from leaking across exec(). On failure we must close
    // the fd here: the destructor does not run when a ctor throws.
    if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
    {
        int errn = errno;
        ::close(kq_fd_);
        detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
    }

    // Register the EVFILT_USER event (ident 0) used by
    // interrupt_reactor() to wake a thread blocked in kevent().
    // EV_CLEAR resets the event state after each delivery.
    struct kevent ev;
    EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
    {
        int errn = errno;
        ::close(kq_fd_);
        detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
    }

    // Wire the timer service: whenever the earliest expiry changes,
    // interrupt the reactor so it recomputes its kevent() timeout.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
        }));

    // Instantiate the per-context services that use this scheduler.
    get_resolver_service(ctx, *this);
    get_signal_service(ctx, *this);
    get_stream_file_service(ctx, *this);
    get_random_access_file_service(ctx, *this);

    // Seed the completion queue with the task op so the first run()
    // thread takes on the reactor role.
    completed_ops_.push(&task_op_);
}
201
202 1311x inline kqueue_scheduler::~kqueue_scheduler()
203 874x {
204
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437 times.
437x if (kq_fd_ >= 0)
205
1/2
✓ Branch 0 taken 437 times.
✗ Branch 1 not taken.
437x ::close(kq_fd_);
206 1311x }
207
208 inline void
209 437x kqueue_scheduler::shutdown()
210 {
211 437x shutdown_drain();
212
213
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437 times.
437x if (kq_fd_ >= 0)
214 437x interrupt_reactor();
215 437x }
216
// Apply runtime reactor configuration. The base class records the new
// limits (including max_events_per_poll_); afterwards the local kevent
// buffer is resized to match the updated batch size.
inline void
kqueue_scheduler::configure_reactor(
    unsigned max_events,
    unsigned budget_init,
    unsigned budget_max,
    unsigned unassisted)
{
    reactor_scheduler::configure_reactor(
        max_events, budget_init, budget_max, unassisted);

    event_buffer_.resize(max_events_per_poll_);
}
228
// Register @a fd for persistent edge-triggered (EV_CLEAR) monitoring of
// both read and write readiness, storing @a desc in udata so the
// reactor can route events back to it. Throws on kevent failure.
inline void
kqueue_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    struct kevent evs[2];
    EV_SET(&evs[0], static_cast<uintptr_t>(fd), EVFILT_READ,
        EV_ADD | EV_CLEAR, 0, 0, desc);
    EV_SET(&evs[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
        EV_ADD | EV_CLEAR, 0, 0, desc);

    if (::kevent(kq_fd_, evs, 2, nullptr, 0, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "kevent (register)");

    // Initialize the descriptor's bookkeeping for this registration.
    desc->registered_events = kqueue_event_read | kqueue_event_write;
    desc->fd = fd;
    desc->scheduler_ = this;
    desc->mutex.set_enabled(!single_threaded_);
    desc->ready_events_.store(0, std::memory_order_relaxed);

    // Reset per-registration state under the descriptor's own lock.
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
    desc->impl_ref_.reset();
    desc->read_ready = false;
    desc->write_ready = false;
}
254
// Remove the persistent EVFILT_READ/EVFILT_WRITE registrations for
// @a fd. The kevent result is deliberately ignored: the fd may already
// be closed, and kqueue drops registrations for closed fds on its own.
inline void
kqueue_scheduler::deregister_descriptor(int fd) const
{
    short const filters[2] = {EVFILT_READ, EVFILT_WRITE};
    struct kevent dels[2];
    for (int i = 0; i < 2; ++i)
        EV_SET(&dels[i], static_cast<uintptr_t>(fd), filters[i],
            EV_DELETE, 0, 0, nullptr);
    ::kevent(kq_fd_, dels, 2, nullptr, 0, nullptr);
}
267
// Wake a thread blocked in kevent(). Arm-once semantics: only the
// caller that flips user_event_armed_ false->true posts NOTE_TRIGGER;
// subsequent callers are no-ops until the reactor disarms the flag
// after consuming the EVFILT_USER event.
inline void
kqueue_scheduler::interrupt_reactor() const
{
    bool was_armed = false;
    if (!user_event_armed_.compare_exchange_strong(
            was_armed, true, std::memory_order_acq_rel,
            std::memory_order_acquire))
        return;

    struct kevent trigger;
    EV_SET(&trigger, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
    ::kevent(kq_fd_, &trigger, 1, nullptr, 0, nullptr);
}
281
282 inline long
283 68272x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
284 {
285
1/2
✓ Branch 0 taken 68272 times.
✗ Branch 1 not taken.
68272x if (requested_timeout_us == 0)
286 return 0;
287
288 68272x auto nearest = timer_svc_->nearest_expiry();
289
2/2
✓ Branch 0 taken 1951 times.
✓ Branch 1 taken 66321 times.
68272x if (nearest == timer_service::time_point::max())
290 1951x return requested_timeout_us;
291
292 66321x auto now = std::chrono::steady_clock::now();
293
2/2
✓ Branch 0 taken 87 times.
✓ Branch 1 taken 66234 times.
66321x if (nearest <= now)
294 87x return 0;
295
296 66234x auto timer_timeout_us =
297 66234x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
298 66234x .count();
299
300 66234x constexpr auto long_max =
301 static_cast<long long>((std::numeric_limits<long>::max)());
302 66234x auto capped_timer_us = std::min(
303 66234x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
304
305
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 66228 times.
66234x if (requested_timeout_us < 0)
306 66228x return static_cast<long>(capped_timer_us);
307
308 6x return static_cast<long>(std::min(
309 6x static_cast<long long>(requested_timeout_us), capped_timer_us));
310 68272x }
311
inline void
kqueue_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // If the reactor was interrupted, poll (timeout 0) instead of
    // blocking; otherwise clamp the request to the nearest timer expiry.
    long effective_timeout_us =
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);

    // Never hold the scheduler lock across the blocking kevent() call.
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard: restores reactor bookkeeping on scope exit, even if
    // kevent() fails and we throw below.
    task_cleanup on_exit{this, &lock, ctx};

    // A negative effective timeout means block indefinitely, expressed
    // to kevent() as a null timespec pointer.
    struct timespec ts;
    struct timespec* ts_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        ts.tv_sec = effective_timeout_us / 1000000;
        ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
        ts_ptr = &ts;
    }

    int nev = ::kevent(
        kq_fd_, nullptr, 0, event_buffer_.data(),
        static_cast<int>(event_buffer_.size()), ts_ptr);
    // Capture errno immediately; later library calls may clobber it.
    int saved_errno = errno;

    // EINTR (signal delivery) is benign; any other failure is fatal.
    if (nev < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "kevent");

    // Collect ready descriptors locally so the shared queue is touched
    // only once, under the lock, after the scan.
    op_queue local_ops;

    for (int i = 0; i < nev; ++i)
    {
        // Wakeup posted by interrupt_reactor(): disarm the flag so the
        // next interrupt can trigger again; carries no descriptor work.
        if (event_buffer_[i].filter == EVFILT_USER)
        {
            user_event_armed_.store(false, std::memory_order_release);
            continue;
        }

        auto* desc =
            static_cast<descriptor_state*>(event_buffer_[i].udata);
        if (!desc)
            continue;

        // Translate the kevent into the scheduler's ready-event bits.
        std::uint32_t ready = 0;

        if (event_buffer_[i].filter == EVFILT_READ)
            ready |= kqueue_event_read;
        else if (event_buffer_[i].filter == EVFILT_WRITE)
            ready |= kqueue_event_write;

        if (event_buffer_[i].flags & EV_ERROR)
            ready |= kqueue_event_error;

        if (event_buffer_[i].flags & EV_EOF)
        {
            // EOF on the read filter is still reported as readable so
            // the pending read can complete; a nonzero fflags on EOF
            // indicates an error condition on the descriptor.
            if (event_buffer_[i].filter == EVFILT_READ)
                ready |= kqueue_event_read;
            if (event_buffer_[i].fflags != 0)
                ready |= kqueue_event_error;
        }

        desc->add_ready_events(ready);

        // Enqueue the descriptor at most once even if both its read
        // and write filters fired in this batch.
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_acq_rel,
                std::memory_order_acquire))
        {
            local_ops.push(desc);
        }
    }

    // Fire any timers that expired while we were blocked.
    timer_svc_->process_expired();

    // Re-acquire the scheduler lock before touching the shared queue.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
392
393 } // namespace boost::corosio::detail
394
395 #endif // BOOST_COROSIO_HAS_KQUEUE
396
397 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
398