include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

90.1% Lines (127/141) 100.0% List of functions (12/13) 54.0% Branches (54/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 1452x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 4720x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 726x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 4720x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 2178x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 726x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 10x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 10701x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 10701x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 7881x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 52650x 85.0% 75.0% 80.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 89611x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
50 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
51
52 This scheduler implements the scheduler interface using the BSD kqueue
53 API for efficient I/O event notification. It uses a single reactor model
54 where one thread runs kevent() while other threads
55 wait on a condition variable for handler work. This design provides:
56
57 - Handler parallelism: N posted handlers can execute on N threads
58 - No thundering herd: condition_variable wakes exactly one thread
59 - IOCP parity: Behavior matches Windows I/O completion port semantics
60
61 When threads call run(), they first try to execute queued handlers.
62 If the queue is empty and no reactor is running, one thread becomes
63 the reactor and runs kevent(). Other threads wait on a condition
64 variable until handlers are available.
65
66 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
67 epoll's EPOLLET). File descriptors are registered once with both
68 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72 */
73 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
74 {
75 public:
76 /** Construct the scheduler.
77
78 Creates a kqueue file descriptor via kqueue(), sets
79 close-on-exec, and registers EVFILT_USER for reactor
80 interruption. On failure the kqueue fd is closed before
81 throwing.
82
83 @param ctx Reference to the owning execution_context.
84 @param concurrency_hint Hint for expected thread count (unused).
85
86 @throws std::system_error if kqueue() fails, if setting
87 FD_CLOEXEC on the kqueue fd fails, or if registering
88 the EVFILT_USER event fails. The error code contains
89 the errno from the failed syscall.
90 */
91 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /** Destructor.
94
95 Closes the kqueue file descriptor if valid. Does not throw.
96 */
97 ~kqueue_scheduler();
98
99 kqueue_scheduler(kqueue_scheduler const&) = delete;
100 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
101
102 /// Shut down the scheduler, draining pending operations.
103 void shutdown() override;
104
105 /// Apply runtime configuration, resizing the event buffer.
106 void configure_reactor(
107 unsigned max_events,
108 unsigned budget_init,
109 unsigned budget_max,
110 unsigned unassisted) override;
111
112 /** Return the kqueue file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The kqueue file descriptor.
118 */
119 int kq_fd() const noexcept
120 {
121 return kq_fd_;
122 }
123
124 /** Register a descriptor for persistent monitoring.
125
126 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
127 and stores @a desc in the kevent udata field so that the
128 reactor can dispatch events to the correct reactor_descriptor_state.
129
130 @param fd The file descriptor to register.
131 @param desc Pointer to the caller-owned reactor_descriptor_state.
132
133 @throws std::system_error if kevent(EV_ADD) fails.
134 */
135 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
136
137 /** Deregister a persistently registered descriptor.
138
139 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
140 Errors are silently ignored because the fd may already be
141 closed and kqueue automatically removes closed descriptors.
142
143 @param fd The file descriptor to deregister.
144 */
145 void deregister_descriptor(int fd) const;
146
147 private:
148 void
149 run_task(lock_type& lock, context_type* ctx,
150 long timeout_us) override;
151 void interrupt_reactor() const override;
152 long calculate_timeout(long requested_timeout_us) const;
153
154 int kq_fd_;
155
156 // EVFILT_USER idempotency
157 726x mutable std::atomic<bool> user_event_armed_{false};
158
159 // Event buffer sized from max_events_per_poll_.
160 std::vector<struct kevent> event_buffer_;
161 };
162
163 1452x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
164 726x : kq_fd_(-1)
165
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x , event_buffer_(max_events_per_poll_)
166 1452x {
167
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x kq_fd_ = ::kqueue();
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 726 times.
726x if (kq_fd_ < 0)
169 detail::throw_system_error(make_err(errno), "kqueue");
170
171
2/4
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 726 times.
726x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
172 {
173 int errn = errno;
174 ::close(kq_fd_);
175 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
176 }
177
178 struct kevent ev;
179 726x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
180
2/4
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 726 times.
726x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
181 {
182 int errn = errno;
183 ::close(kq_fd_);
184 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
185 }
186
187
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x timer_svc_ = &get_timer_service(ctx, *this);
188 1452x timer_svc_->set_on_earliest_changed(
189 5446x timer_service::callback(this, [](void* p) {
190 4720x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
191 4720x }));
192
193
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x get_resolver_service(ctx, *this);
194
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x get_signal_service(ctx, *this);
195
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x get_stream_file_service(ctx, *this);
196
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x get_random_access_file_service(ctx, *this);
197
198 726x completed_ops_.push(&task_op_);
199 1452x }
200
201 2178x inline kqueue_scheduler::~kqueue_scheduler()
202 1452x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 726 times.
726x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x ::close(kq_fd_);
205 2178x }
206
207 inline void
208 726x kqueue_scheduler::shutdown()
209 {
210 726x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 726 times.
726x if (kq_fd_ >= 0)
213 726x interrupt_reactor();
214 726x }
215
216 inline void
217 10x kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 10x reactor_scheduler::configure_reactor(
224 10x max_events, budget_init, budget_max, unassisted);
225 10x event_buffer_.resize(max_events_per_poll_);
226 10x }
227
228 inline void
229 10701x kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
230 {
231 struct kevent changes[2];
232 10701x EV_SET(
233 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
234 0, 0, desc);
235 10701x EV_SET(
236 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
237 EV_ADD | EV_CLEAR, 0, 0, desc);
238
239
1/2
✓ Branch 0 taken 10701 times.
✗ Branch 1 not taken.
10701x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
240 detail::throw_system_error(make_err(errno), "kevent (register)");
241
242 10701x desc->registered_events = reactor_event_read | reactor_event_write;
243 10701x desc->fd = fd;
244 10701x desc->scheduler_ = this;
245 10701x desc->mutex.set_enabled(!single_threaded_);
246 10701x desc->ready_events_.store(0, std::memory_order_relaxed);
247
248 10701x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
249 10701x desc->impl_ref_.reset();
250 10701x desc->read_ready = false;
251 10701x desc->write_ready = false;
252 10701x }
253
254 inline void
255 10701x kqueue_scheduler::deregister_descriptor(int fd) const
256 {
257 struct kevent changes[2];
258 10701x EV_SET(
259 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
260 nullptr);
261 10701x EV_SET(
262 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
263 nullptr);
264 10701x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
265 10701x }
266
267 inline void
268 7881x kqueue_scheduler::interrupt_reactor() const
269 {
270 7881x bool expected = false;
271
2/2
✓ Branch 0 taken 455 times.
✓ Branch 1 taken 7426 times.
7881x if (user_event_armed_.compare_exchange_strong(
272 expected, true, std::memory_order_acq_rel,
273 std::memory_order_acquire))
274 {
275 struct kevent ev;
276 7426x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
277 7426x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
278 7426x }
279 7881x }
280
281 inline long
282 52650x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 52650 times.
✗ Branch 1 not taken.
52650x if (requested_timeout_us == 0)
285 return 0;
286
287 52650x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 2802 times.
✓ Branch 1 taken 49848 times.
52650x if (nearest == timer_service::time_point::max())
289 2802x return requested_timeout_us;
290
291 49848x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 90 times.
✓ Branch 1 taken 49758 times.
49848x if (nearest <= now)
293 90x return 0;
294
295 49758x auto timer_timeout_us =
296 49758x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 49758x .count();
298
299 49758x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 49758x auto capped_timer_us = std::min(
302 49758x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49758 times.
49758x if (requested_timeout_us < 0)
305 49758x return static_cast<long>(capped_timer_us);
306
307 return static_cast<long>(std::min(
308 static_cast<long long>(requested_timeout_us), capped_timer_us));
309 52650x }
310
311 inline void
312 89611x kqueue_scheduler::run_task(
313 lock_type& lock, context_type* ctx, long timeout_us)
314 {
315 89611x long effective_timeout_us =
316
2/2
✓ Branch 0 taken 36961 times.
✓ Branch 1 taken 52650 times.
89611x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
317
318
2/2
✓ Branch 0 taken 36961 times.
✓ Branch 1 taken 52650 times.
89611x if (lock.owns_lock())
319 52650x lock.unlock();
320
321 89611x task_cleanup on_exit{this, &lock, ctx};
322
323 struct timespec ts;
324 89611x struct timespec* ts_ptr = nullptr;
325
2/2
✓ Branch 0 taken 2798 times.
✓ Branch 1 taken 86813 times.
89611x if (effective_timeout_us >= 0)
326 {
327 86813x ts.tv_sec = effective_timeout_us / 1000000;
328 86813x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
329 86813x ts_ptr = &ts;
330 86813x }
331
332
1/2
✓ Branch 0 taken 89611 times.
✗ Branch 1 not taken.
89611x int nev = ::kevent(
333 89611x kq_fd_, nullptr, 0, event_buffer_.data(),
334 89611x static_cast<int>(event_buffer_.size()), ts_ptr);
335
1/2
✓ Branch 0 taken 89611 times.
✗ Branch 1 not taken.
89611x int saved_errno = errno;
336
337
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 89611 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
89611x if (nev < 0 && saved_errno != EINTR)
338 detail::throw_system_error(make_err(saved_errno), "kevent");
339
340 89611x op_queue local_ops;
341
342
2/2
✓ Branch 0 taken 89611 times.
✓ Branch 1 taken 121066 times.
210677x for (int i = 0; i < nev; ++i)
343 {
344
2/2
✓ Branch 0 taken 6700 times.
✓ Branch 1 taken 114366 times.
121066x if (event_buffer_[i].filter == EVFILT_USER)
345 {
346 6700x user_event_armed_.store(false, std::memory_order_release);
347 6700x continue;
348 }
349
350 114366x auto* desc =
351 114366x static_cast<reactor_descriptor_state*>(event_buffer_[i].udata);
352
1/2
✓ Branch 0 taken 114366 times.
✗ Branch 1 not taken.
114366x if (!desc)
353 continue;
354
355 114366x std::uint32_t ready = 0;
356
357
2/2
✓ Branch 0 taken 57658 times.
✓ Branch 1 taken 56708 times.
114366x if (event_buffer_[i].filter == EVFILT_READ)
358 56708x ready |= reactor_event_read;
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 57658 times.
57658x else if (event_buffer_[i].filter == EVFILT_WRITE)
360 57658x ready |= reactor_event_write;
361
362
1/2
✓ Branch 0 taken 114366 times.
✗ Branch 1 not taken.
114366x if (event_buffer_[i].flags & EV_ERROR)
363 ready |= reactor_event_error;
364
365
2/2
✓ Branch 0 taken 114310 times.
✓ Branch 1 taken 56 times.
114366x if (event_buffer_[i].flags & EV_EOF)
366 {
367
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 41 times.
56x if (event_buffer_[i].filter == EVFILT_READ)
368 41x ready |= reactor_event_read;
369
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 24 times.
56x if (event_buffer_[i].fflags != 0)
370 24x ready |= reactor_event_error;
371 56x }
372
373 114366x desc->add_ready_events(ready);
374
375 114366x bool expected = false;
376
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 114275 times.
114366x if (desc->is_enqueued_.compare_exchange_strong(
377 expected, true, std::memory_order_acq_rel,
378 std::memory_order_acquire))
379 {
380 114275x local_ops.push(desc);
381 114275x }
382 114366x }
383
384
1/2
✓ Branch 0 taken 89611 times.
✗ Branch 1 not taken.
89611x timer_svc_->process_expired();
385
386
1/2
✓ Branch 0 taken 89611 times.
✗ Branch 1 not taken.
89611x lock.lock();
387
388
2/2
✓ Branch 0 taken 27255 times.
✓ Branch 1 taken 62356 times.
89611x if (!local_ops.empty())
389 62356x completed_ops_.splice(local_ops);
390 89611x }
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397