include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

90.1% Lines (127/141) 100.0% List of functions (12/13) 54.0% Branches (54/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 1326x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 4220x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 663x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 4220x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 1989x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 663x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 8x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 9821x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 9821x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 7407x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 45210x 85.0% 75.0% 80.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 85779x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
50 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
51
52 This scheduler implements the scheduler interface using the BSD kqueue
53 API for efficient I/O event notification. It uses a single reactor model
54 where one thread runs kevent() while other threads
55 wait on a condition variable for handler work. This design provides:
56
57 - Handler parallelism: N posted handlers can execute on N threads
58 - No thundering herd: condition_variable wakes exactly one thread
59 - IOCP parity: Behavior matches Windows I/O completion port semantics
60
61 When threads call run(), they first try to execute queued handlers.
62 If the queue is empty and no reactor is running, one thread becomes
63 the reactor and runs kevent(). Other threads wait on a condition
64 variable until handlers are available.
65
66 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
67 epoll's EPOLLET). File descriptors are registered once with both
68 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72 */
73 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
74 {
75 public:
76 /** Construct the scheduler.
77
78 Creates a kqueue file descriptor via kqueue(), sets
79 close-on-exec, and registers EVFILT_USER for reactor
80 interruption. On failure the kqueue fd is closed before
81 throwing.
82
83 @param ctx Reference to the owning execution_context.
84 @param concurrency_hint Hint for expected thread count (unused).
85
86 @throws std::system_error if kqueue() fails, if setting
87 FD_CLOEXEC on the kqueue fd fails, or if registering
88 the EVFILT_USER event fails. The error code contains
89 the errno from the failed syscall.
90 */
91 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /** Destructor.
94
95 Closes the kqueue file descriptor if valid. Does not throw.
96 */
97 ~kqueue_scheduler();
98
99 kqueue_scheduler(kqueue_scheduler const&) = delete;
100 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
101
102 /// Shut down the scheduler, draining pending operations.
103 void shutdown() override;
104
105 /// Apply runtime configuration, resizing the event buffer.
106 void configure_reactor(
107 unsigned max_events,
108 unsigned budget_init,
109 unsigned budget_max,
110 unsigned unassisted) override;
111
112 /** Return the kqueue file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The kqueue file descriptor.
118 */
119 int kq_fd() const noexcept
120 {
121 return kq_fd_;
122 }
123
124 /** Register a descriptor for persistent monitoring.
125
126 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
127 and stores @a desc in the kevent udata field so that the
128 reactor can dispatch events to the correct reactor_descriptor_state.
129
130 @param fd The file descriptor to register.
131 @param desc Pointer to the caller-owned reactor_descriptor_state.
132
133 @throws std::system_error if kevent(EV_ADD) fails.
134 */
135 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
136
137 /** Deregister a persistently registered descriptor.
138
139 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
140 Errors are silently ignored because the fd may already be
141 closed and kqueue automatically removes closed descriptors.
142
143 @param fd The file descriptor to deregister.
144 */
145 void deregister_descriptor(int fd) const;
146
147 private:
148 void
149 run_task(lock_type& lock, context_type* ctx,
150 long timeout_us) override;
151 void interrupt_reactor() const override;
152 long calculate_timeout(long requested_timeout_us) const;
153
154 int kq_fd_;
155
156 // EVFILT_USER idempotency
157 663x mutable std::atomic<bool> user_event_armed_{false};
158
159 // Event buffer sized from max_events_per_poll_.
160 std::vector<struct kevent> event_buffer_;
161 };
162
163 1326x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
164 663x : kq_fd_(-1)
165
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x , event_buffer_(max_events_per_poll_)
166 1326x {
167
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x kq_fd_ = ::kqueue();
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ < 0)
169 detail::throw_system_error(make_err(errno), "kqueue");
170
171
2/4
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 663 times.
663x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
172 {
173 int errn = errno;
174 ::close(kq_fd_);
175 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
176 }
177
178 struct kevent ev;
179 663x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
180
2/4
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 663 times.
663x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
181 {
182 int errn = errno;
183 ::close(kq_fd_);
184 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
185 }
186
187
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x timer_svc_ = &get_timer_service(ctx, *this);
188 1326x timer_svc_->set_on_earliest_changed(
189 4883x timer_service::callback(this, [](void* p) {
190 4220x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
191 4220x }));
192
193
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_resolver_service(ctx, *this);
194
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_signal_service(ctx, *this);
195
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_stream_file_service(ctx, *this);
196
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_random_access_file_service(ctx, *this);
197
198 663x completed_ops_.push(&task_op_);
199 1326x }
200
201 1989x inline kqueue_scheduler::~kqueue_scheduler()
202 1326x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x ::close(kq_fd_);
205 1989x }
206
207 inline void
208 663x kqueue_scheduler::shutdown()
209 {
210 663x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ >= 0)
213 663x interrupt_reactor();
214 663x }
215
216 inline void
217 8x kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 8x reactor_scheduler::configure_reactor(
224 8x max_events, budget_init, budget_max, unassisted);
225 8x event_buffer_.resize(max_events_per_poll_);
226 8x }
227
228 inline void
229 9821x kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
230 {
231 struct kevent changes[2];
232 9821x EV_SET(
233 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
234 0, 0, desc);
235 9821x EV_SET(
236 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
237 EV_ADD | EV_CLEAR, 0, 0, desc);
238
239
1/2
✓ Branch 0 taken 9821 times.
✗ Branch 1 not taken.
9821x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
240 detail::throw_system_error(make_err(errno), "kevent (register)");
241
242 9821x desc->registered_events = reactor_event_read | reactor_event_write;
243 9821x desc->fd = fd;
244 9821x desc->scheduler_ = this;
245 9821x desc->mutex.set_enabled(!single_threaded_);
246 9821x desc->ready_events_.store(0, std::memory_order_relaxed);
247
248 9821x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
249 9821x desc->impl_ref_.reset();
250 9821x desc->read_ready = false;
251 9821x desc->write_ready = false;
252 9821x }
253
254 inline void
255 9821x kqueue_scheduler::deregister_descriptor(int fd) const
256 {
257 struct kevent changes[2];
258 9821x EV_SET(
259 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
260 nullptr);
261 9821x EV_SET(
262 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
263 nullptr);
264 9821x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
265 9821x }
266
267 inline void
268 7407x kqueue_scheduler::interrupt_reactor() const
269 {
270 7407x bool expected = false;
271
2/2
✓ Branch 0 taken 421 times.
✓ Branch 1 taken 6986 times.
7407x if (user_event_armed_.compare_exchange_strong(
272 expected, true, std::memory_order_acq_rel,
273 std::memory_order_acquire))
274 {
275 struct kevent ev;
276 6986x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
277 6986x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
278 6986x }
279 7407x }
280
281 inline long
282 45210x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 45210 times.
✗ Branch 1 not taken.
45210x if (requested_timeout_us == 0)
285 return 0;
286
287 45210x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 2447 times.
✓ Branch 1 taken 42763 times.
45210x if (nearest == timer_service::time_point::max())
289 2447x return requested_timeout_us;
290
291 42763x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 80 times.
✓ Branch 1 taken 42683 times.
42763x if (nearest <= now)
293 80x return 0;
294
295 42683x auto timer_timeout_us =
296 42683x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 42683x .count();
298
299 42683x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 42683x auto capped_timer_us = std::min(
302 42683x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42683 times.
42683x if (requested_timeout_us < 0)
305 42683x return static_cast<long>(capped_timer_us);
306
307 return static_cast<long>(std::min(
308 static_cast<long long>(requested_timeout_us), capped_timer_us));
309 45210x }
310
311 inline void
312 85779x kqueue_scheduler::run_task(
313 lock_type& lock, context_type* ctx, long timeout_us)
314 {
315 85779x long effective_timeout_us =
316
2/2
✓ Branch 0 taken 40569 times.
✓ Branch 1 taken 45210 times.
85779x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
317
318
2/2
✓ Branch 0 taken 40569 times.
✓ Branch 1 taken 45210 times.
85779x if (lock.owns_lock())
319 45210x lock.unlock();
320
321 85779x task_cleanup on_exit{this, &lock, ctx};
322
323 struct timespec ts;
324 85779x struct timespec* ts_ptr = nullptr;
325
2/2
✓ Branch 0 taken 2443 times.
✓ Branch 1 taken 83336 times.
85779x if (effective_timeout_us >= 0)
326 {
327 83336x ts.tv_sec = effective_timeout_us / 1000000;
328 83336x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
329 83336x ts_ptr = &ts;
330 83336x }
331
332
1/2
✓ Branch 0 taken 85779 times.
✗ Branch 1 not taken.
85779x int nev = ::kevent(
333 85779x kq_fd_, nullptr, 0, event_buffer_.data(),
334 85779x static_cast<int>(event_buffer_.size()), ts_ptr);
335
1/2
✓ Branch 0 taken 85779 times.
✗ Branch 1 not taken.
85779x int saved_errno = errno;
336
337
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 85779 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
85779x if (nev < 0 && saved_errno != EINTR)
338 detail::throw_system_error(make_err(saved_errno), "kevent");
339
340 85779x op_queue local_ops;
341
342
2/2
✓ Branch 0 taken 85779 times.
✓ Branch 1 taken 116618 times.
202397x for (int i = 0; i < nev; ++i)
343 {
344
2/2
✓ Branch 0 taken 6323 times.
✓ Branch 1 taken 110295 times.
116618x if (event_buffer_[i].filter == EVFILT_USER)
345 {
346 6323x user_event_armed_.store(false, std::memory_order_release);
347 6323x continue;
348 }
349
350 110295x auto* desc =
351 110295x static_cast<reactor_descriptor_state*>(event_buffer_[i].udata);
352
1/2
✓ Branch 0 taken 110295 times.
✗ Branch 1 not taken.
110295x if (!desc)
353 continue;
354
355 110295x std::uint32_t ready = 0;
356
357
2/2
✓ Branch 0 taken 56899 times.
✓ Branch 1 taken 53396 times.
110295x if (event_buffer_[i].filter == EVFILT_READ)
358 53396x ready |= reactor_event_read;
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 56899 times.
56899x else if (event_buffer_[i].filter == EVFILT_WRITE)
360 56899x ready |= reactor_event_write;
361
362
1/2
✓ Branch 0 taken 110295 times.
✗ Branch 1 not taken.
110295x if (event_buffer_[i].flags & EV_ERROR)
363 ready |= reactor_event_error;
364
365
2/2
✓ Branch 0 taken 110245 times.
✓ Branch 1 taken 50 times.
110295x if (event_buffer_[i].flags & EV_EOF)
366 {
367
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 37 times.
50x if (event_buffer_[i].filter == EVFILT_READ)
368 37x ready |= reactor_event_read;
369
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 22 times.
50x if (event_buffer_[i].fflags != 0)
370 22x ready |= reactor_event_error;
371 50x }
372
373 110295x desc->add_ready_events(ready);
374
375 110295x bool expected = false;
376
2/2
✓ Branch 0 taken 125 times.
✓ Branch 1 taken 110170 times.
110295x if (desc->is_enqueued_.compare_exchange_strong(
377 expected, true, std::memory_order_acq_rel,
378 std::memory_order_acquire))
379 {
380 110170x local_ops.push(desc);
381 110170x }
382 110295x }
383
384
1/2
✓ Branch 0 taken 85779 times.
✗ Branch 1 not taken.
85779x timer_svc_->process_expired();
385
386
1/2
✓ Branch 0 taken 85779 times.
✗ Branch 1 not taken.
85779x lock.lock();
387
388
2/2
✓ Branch 0 taken 25379 times.
✓ Branch 1 taken 60400 times.
85779x if (!local_ops.empty())
389 60400x completed_ops_.splice(local_ops);
390 85779x }
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397