include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

90.1% Lines (127/141) 100.0% List of functions (12/13) 54.0% Branches (54/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 1326x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 3815x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 663x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 3815x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 1989x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 663x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 8x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 8723x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 8723x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 6616x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 47963x 85.0% 75.0% 80.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 77398x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
50 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
51
52 This scheduler implements the scheduler interface using the BSD kqueue
53 API for efficient I/O event notification. It uses a single reactor model
54 where one thread runs kevent() while other threads
55 wait on a condition variable for handler work. This design provides:
56
57 - Handler parallelism: N posted handlers can execute on N threads
58 - No thundering herd: condition_variable wakes exactly one thread
59 - IOCP parity: Behavior matches Windows I/O completion port semantics
60
61 When threads call run(), they first try to execute queued handlers.
62 If the queue is empty and no reactor is running, one thread becomes
63 the reactor and runs kevent(). Other threads wait on a condition
64 variable until handlers are available.
65
66 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
67 epoll's EPOLLET). File descriptors are registered once with both
68 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72 */
73 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
74 {
75 public:
76 /** Construct the scheduler.
77
78 Creates a kqueue file descriptor via kqueue(), sets
79 close-on-exec, and registers EVFILT_USER for reactor
80 interruption. On failure the kqueue fd is closed before
81 throwing.
82
83 @param ctx Reference to the owning execution_context.
84 @param concurrency_hint Hint for expected thread count (unused).
85
86 @throws std::system_error if kqueue() fails, if setting
87 FD_CLOEXEC on the kqueue fd fails, or if registering
88 the EVFILT_USER event fails. The error code contains
89 the errno from the failed syscall.
90 */
91 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /** Destructor.
94
95 Closes the kqueue file descriptor if valid. Does not throw.
96 */
97 ~kqueue_scheduler();
98
99 kqueue_scheduler(kqueue_scheduler const&) = delete;
100 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
101
102 /// Shut down the scheduler, draining pending operations.
103 void shutdown() override;
104
105 /// Apply runtime configuration, resizing the event buffer.
106 void configure_reactor(
107 unsigned max_events,
108 unsigned budget_init,
109 unsigned budget_max,
110 unsigned unassisted) override;
111
112 /** Return the kqueue file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The kqueue file descriptor.
118 */
119 int kq_fd() const noexcept
120 {
121 return kq_fd_;
122 }
123
124 /** Register a descriptor for persistent monitoring.
125
126 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
127 and stores @a desc in the kevent udata field so that the
128 reactor can dispatch events to the correct reactor_descriptor_state.
129
130 @param fd The file descriptor to register.
131 @param desc Pointer to the caller-owned reactor_descriptor_state.
132
133 @throws std::system_error if kevent(EV_ADD) fails.
134 */
135 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
136
137 /** Deregister a persistently registered descriptor.
138
139 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
140 Errors are silently ignored because the fd may already be
141 closed and kqueue automatically removes closed descriptors.
142
143 @param fd The file descriptor to deregister.
144 */
145 void deregister_descriptor(int fd) const;
146
147 private:
148 void
149 run_task(lock_type& lock, context_type* ctx,
150 long timeout_us) override;
151 void interrupt_reactor() const override;
152 long calculate_timeout(long requested_timeout_us) const;
153
154 int kq_fd_;
155
156 // EVFILT_USER idempotency
157 663x mutable std::atomic<bool> user_event_armed_{false};
158
159 // Event buffer sized from max_events_per_poll_.
160 std::vector<struct kevent> event_buffer_;
161 };
162
163 1326x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
164 663x : kq_fd_(-1)
165
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x , event_buffer_(max_events_per_poll_)
166 1326x {
167
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x kq_fd_ = ::kqueue();
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ < 0)
169 detail::throw_system_error(make_err(errno), "kqueue");
170
171
2/4
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 663 times.
663x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
172 {
173 int errn = errno;
174 ::close(kq_fd_);
175 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
176 }
177
178 struct kevent ev;
179 663x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
180
2/4
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 663 times.
663x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
181 {
182 int errn = errno;
183 ::close(kq_fd_);
184 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
185 }
186
187
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x timer_svc_ = &get_timer_service(ctx, *this);
188 1326x timer_svc_->set_on_earliest_changed(
189 4478x timer_service::callback(this, [](void* p) {
190 3815x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
191 3815x }));
192
193
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_resolver_service(ctx, *this);
194
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_signal_service(ctx, *this);
195
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_stream_file_service(ctx, *this);
196
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x get_random_access_file_service(ctx, *this);
197
198 663x completed_ops_.push(&task_op_);
199 1326x }
200
201 1989x inline kqueue_scheduler::~kqueue_scheduler()
202 1326x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 663 times.
✗ Branch 1 not taken.
663x ::close(kq_fd_);
205 1989x }
206
207 inline void
208 663x kqueue_scheduler::shutdown()
209 {
210 663x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 663 times.
663x if (kq_fd_ >= 0)
213 663x interrupt_reactor();
214 663x }
215
216 inline void
217 8x kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 8x reactor_scheduler::configure_reactor(
224 8x max_events, budget_init, budget_max, unassisted);
225 8x event_buffer_.resize(max_events_per_poll_);
226 8x }
227
228 inline void
229 8723x kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
230 {
231 struct kevent changes[2];
232 8723x EV_SET(
233 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
234 0, 0, desc);
235 8723x EV_SET(
236 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
237 EV_ADD | EV_CLEAR, 0, 0, desc);
238
239
1/2
✓ Branch 0 taken 8723 times.
✗ Branch 1 not taken.
8723x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
240 detail::throw_system_error(make_err(errno), "kevent (register)");
241
242 8723x desc->registered_events = reactor_event_read | reactor_event_write;
243 8723x desc->fd = fd;
244 8723x desc->scheduler_ = this;
245 8723x desc->mutex.set_enabled(!single_threaded_);
246 8723x desc->ready_events_.store(0, std::memory_order_relaxed);
247
248 8723x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
249 8723x desc->impl_ref_.reset();
250 8723x desc->read_ready = false;
251 8723x desc->write_ready = false;
252 8723x }
253
254 inline void
255 8723x kqueue_scheduler::deregister_descriptor(int fd) const
256 {
257 struct kevent changes[2];
258 8723x EV_SET(
259 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
260 nullptr);
261 8723x EV_SET(
262 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
263 nullptr);
264 8723x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
265 8723x }
266
267 inline void
268 6616x kqueue_scheduler::interrupt_reactor() const
269 {
270 6616x bool expected = false;
271
2/2
✓ Branch 0 taken 421 times.
✓ Branch 1 taken 6195 times.
6616x if (user_event_armed_.compare_exchange_strong(
272 expected, true, std::memory_order_acq_rel,
273 std::memory_order_acquire))
274 {
275 struct kevent ev;
276 6195x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
277 6195x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
278 6195x }
279 6616x }
280
281 inline long
282 47963x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 47963 times.
✗ Branch 1 not taken.
47963x if (requested_timeout_us == 0)
285 return 0;
286
287 47963x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 2131 times.
✓ Branch 1 taken 45832 times.
47963x if (nearest == timer_service::time_point::max())
289 2131x return requested_timeout_us;
290
291 45832x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 141 times.
✓ Branch 1 taken 45691 times.
45832x if (nearest <= now)
293 141x return 0;
294
295 45691x auto timer_timeout_us =
296 45691x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 45691x .count();
298
299 45691x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 45691x auto capped_timer_us = std::min(
302 45691x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 45691 times.
45691x if (requested_timeout_us < 0)
305 45691x return static_cast<long>(capped_timer_us);
306
307 return static_cast<long>(std::min(
308 static_cast<long long>(requested_timeout_us), capped_timer_us));
309 47963x }
310
311 inline void
312 77398x kqueue_scheduler::run_task(
313 lock_type& lock, context_type* ctx, long timeout_us)
314 {
315 77398x long effective_timeout_us =
316
2/2
✓ Branch 0 taken 29435 times.
✓ Branch 1 taken 47963 times.
77398x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
317
318
2/2
✓ Branch 0 taken 29435 times.
✓ Branch 1 taken 47963 times.
77398x if (lock.owns_lock())
319 47963x lock.unlock();
320
321 77398x task_cleanup on_exit{this, &lock, ctx};
322
323 struct timespec ts;
324 77398x struct timespec* ts_ptr = nullptr;
325
2/2
✓ Branch 0 taken 2127 times.
✓ Branch 1 taken 75271 times.
77398x if (effective_timeout_us >= 0)
326 {
327 75271x ts.tv_sec = effective_timeout_us / 1000000;
328 75271x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
329 75271x ts_ptr = &ts;
330 75271x }
331
332
1/2
✓ Branch 0 taken 77398 times.
✗ Branch 1 not taken.
77398x int nev = ::kevent(
333 77398x kq_fd_, nullptr, 0, event_buffer_.data(),
334 77398x static_cast<int>(event_buffer_.size()), ts_ptr);
335
1/2
✓ Branch 0 taken 77398 times.
✗ Branch 1 not taken.
77398x int saved_errno = errno;
336
337
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 77398 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
77398x if (nev < 0 && saved_errno != EINTR)
338 detail::throw_system_error(make_err(saved_errno), "kevent");
339
340 77398x op_queue local_ops;
341
342
2/2
✓ Branch 0 taken 77398 times.
✓ Branch 1 taken 108706 times.
186104x for (int i = 0; i < nev; ++i)
343 {
344
2/2
✓ Branch 0 taken 5532 times.
✓ Branch 1 taken 103174 times.
108706x if (event_buffer_[i].filter == EVFILT_USER)
345 {
346 5532x user_event_armed_.store(false, std::memory_order_release);
347 5532x continue;
348 }
349
350 103174x auto* desc =
351 103174x static_cast<reactor_descriptor_state*>(event_buffer_[i].udata);
352
1/2
✓ Branch 0 taken 103174 times.
✗ Branch 1 not taken.
103174x if (!desc)
353 continue;
354
355 103174x std::uint32_t ready = 0;
356
357
2/2
✓ Branch 0 taken 52445 times.
✓ Branch 1 taken 50729 times.
103174x if (event_buffer_[i].filter == EVFILT_READ)
358 50729x ready |= reactor_event_read;
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 52445 times.
52445x else if (event_buffer_[i].filter == EVFILT_WRITE)
360 52445x ready |= reactor_event_write;
361
362
1/2
✓ Branch 0 taken 103174 times.
✗ Branch 1 not taken.
103174x if (event_buffer_[i].flags & EV_ERROR)
363 ready |= reactor_event_error;
364
365
2/2
✓ Branch 0 taken 103129 times.
✓ Branch 1 taken 45 times.
103174x if (event_buffer_[i].flags & EV_EOF)
366 {
367
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 34 times.
45x if (event_buffer_[i].filter == EVFILT_READ)
368 34x ready |= reactor_event_read;
369
2/2
✓ Branch 0 taken 23 times.
✓ Branch 1 taken 22 times.
45x if (event_buffer_[i].fflags != 0)
370 22x ready |= reactor_event_error;
371 45x }
372
373 103174x desc->add_ready_events(ready);
374
375 103174x bool expected = false;
376
2/2
✓ Branch 0 taken 92 times.
✓ Branch 1 taken 103082 times.
103174x if (desc->is_enqueued_.compare_exchange_strong(
377 expected, true, std::memory_order_acq_rel,
378 std::memory_order_acquire))
379 {
380 103082x local_ops.push(desc);
381 103082x }
382 103174x }
383
384
1/2
✓ Branch 0 taken 77398 times.
✗ Branch 1 not taken.
77398x timer_svc_->process_expired();
385
386
1/2
✓ Branch 0 taken 77398 times.
✗ Branch 1 not taken.
77398x lock.lock();
387
388
2/2
✓ Branch 0 taken 19780 times.
✓ Branch 1 taken 57618 times.
77398x if (!local_ops.empty())
389 57618x completed_ops_.splice(local_ops);
390 77398x }
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397