include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

90.1% Lines (127/141) 100.0% List of functions (12/13) 54.0% Branches (54/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 1452x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 5551x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 726x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 5551x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 2178x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 726x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 10x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 12710x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 12710x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 9200x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 67168x 85.0% 75.0% 80.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 110912x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
/** Reactor scheduler for macOS and the BSDs, built on kqueue.

    Implements the scheduler interface on top of the BSD kqueue API.
    A single-reactor model is used: at any moment at most one thread
    blocks in kevent(), while the remaining threads wait on a condition
    variable for handler work. The design goals are:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: the condition variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    A thread entering run() first tries to execute queued handlers. When
    the queue is empty and no reactor is running, that thread becomes the
    reactor and calls kevent(); the others wait on the condition variable
    until handlers become available.

    Descriptors are registered with EV_CLEAR, which gives edge-triggered
    semantics (the counterpart of epoll's EPOLLET). Each descriptor is
    registered once for both EVFILT_READ and EVFILT_WRITE and remains
    registered until it is closed.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
{
public:
    /** Construct the scheduler.

        Obtains a kqueue descriptor from kqueue(), marks it
        close-on-exec, and adds the EVFILT_USER event used to
        interrupt the reactor. If a step fails, the kqueue
        descriptor is closed before the exception is thrown.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).

        @throws std::system_error if kqueue() fails, if setting
            FD_CLOEXEC on the kqueue fd fails, or if registering
            the EVFILT_USER event fails. The error code carries
            the errno of the failed syscall.
    */
    kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /** Destructor.

        Closes the kqueue descriptor when it is valid. Does not throw.
    */
    ~kqueue_scheduler();

    // Not copyable: the object owns the kqueue descriptor.
    kqueue_scheduler(kqueue_scheduler const&)            = delete;
    kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;

    /// Drain pending operations and stop the scheduler.
    void shutdown() override;

    /// Apply runtime configuration and resize the event buffer to match.
    void configure_reactor(
        unsigned max_events,
        unsigned budget_init,
        unsigned budget_max,
        unsigned unassisted) override;

    /** Return the kqueue file descriptor.

        Socket services use it to register their own descriptors
        for I/O event notification.

        @return The kqueue file descriptor.
    */
    int kq_fd() const noexcept { return kq_fd_; }

    /** Register a descriptor for persistent monitoring.

        Adds EVFILT_READ and EVFILT_WRITE (both with EV_CLEAR) for
        @a fd. @a desc is stored in the kevent udata field, which lets
        the reactor route each event to the matching
        reactor_descriptor_state.

        @param fd The file descriptor to register.
        @param desc Pointer to the caller-owned reactor_descriptor_state.

        @throws std::system_error if kevent(EV_ADD) fails.
    */
    void register_descriptor(int fd, reactor_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        Issues kevent(EV_DELETE) for EVFILT_READ and EVFILT_WRITE.
        Failures are deliberately ignored: the fd may already be
        closed, and kqueue drops closed descriptors on its own.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    // Reactor step: polls kevent() and queues the ready descriptors.
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;

    // Wakes a thread blocked in kevent() through the EVFILT_USER event.
    void interrupt_reactor() const override;

    // Combines the caller's timeout with the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // The kqueue descriptor; -1 while no descriptor is held.
    int kq_fd_;

    // True while an EVFILT_USER trigger is outstanding, so repeated
    // interrupt requests collapse into a single kevent() call.
    mutable std::atomic<bool> user_event_armed_{false};

    // Receives events from kevent(); sized from max_events_per_poll_.
    std::vector<struct kevent> event_buffer_;
};
162
/** Construct the scheduler.

    Creates the kqueue descriptor, marks it close-on-exec, adds the
    EVFILT_USER event used to wake the reactor, acquires the services
    the scheduler depends on, and finally queues the reactor task.

    The destructor does not run when a constructor exits by exception,
    so a scope guard owns the descriptor until construction has fully
    succeeded. This covers the syscall failures as well as any
    exception thrown while the services are being acquired.

    @param ctx Reference to the owning execution_context.

    @throws std::system_error if kqueue(), fcntl(F_SETFD) or the
        EVFILT_USER registration fails; the error code carries the
        errno of the failed syscall.
*/
inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
    : kq_fd_(-1)
    , event_buffer_(max_events_per_poll_)
{
    kq_fd_ = ::kqueue();
    if (kq_fd_ < 0)
        detail::throw_system_error(make_err(errno), "kqueue");

    // Closes the descriptor on every exceptional exit below. It is
    // disarmed as the very last step, once nothing else can throw.
    struct fd_closer
    {
        int& fd;
        bool armed = true;

        ~fd_closer()
        {
            if (armed)
            {
                ::close(fd);
                fd = -1;
            }
        }
    } closer{kq_fd_};

    // errno is read while the argument is evaluated, i.e. before the
    // guard's close() can overwrite it during unwinding.
    if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
        detail::throw_system_error(
            make_err(errno), "fcntl (kqueue FD_CLOEXEC)");

    // EVFILT_USER with ident 0 is the wake-up channel that
    // interrupt_reactor() triggers with NOTE_TRIGGER.
    struct kevent ev;
    EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
    if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "kevent (EVFILT_USER)");

    // Wake the reactor whenever the earliest timer changes so that it
    // recomputes its kevent() timeout.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
        }));

    get_resolver_service(ctx, *this);
    get_signal_service(ctx, *this);
    get_stream_file_service(ctx, *this);
    get_random_access_file_service(ctx, *this);

    completed_ops_.push(&task_op_);

    // Fully constructed: from here on the destructor owns kq_fd_.
    closer.armed = false;
}
200
201 2178x inline kqueue_scheduler::~kqueue_scheduler()
202 1452x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 726 times.
726x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 726 times.
✗ Branch 1 not taken.
726x ::close(kq_fd_);
205 2178x }
206
207 inline void
208 726x kqueue_scheduler::shutdown()
209 {
210 726x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 726 times.
726x if (kq_fd_ >= 0)
213 726x interrupt_reactor();
214 726x }
215
216 inline void
217 10x kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 10x reactor_scheduler::configure_reactor(
224 10x max_events, budget_init, budget_max, unassisted);
225 10x event_buffer_.resize(max_events_per_poll_);
226 10x }
227
/** Register a descriptor for persistent monitoring.

    Adds EVFILT_READ and EVFILT_WRITE (both with EV_CLEAR) for @a fd,
    storing @a desc in the kevent udata field, and then resets the
    bookkeeping fields of @a desc for the new registration.

    With an empty eventlist the kernel applies the changelist in order
    and stops at the first failing entry. A failure can therefore leave
    the read filter installed while the write filter is missing, so the
    registration is rolled back before the error is reported. Otherwise
    the kqueue would keep a udata pointer to a descriptor state the
    caller is about to discard.

    @param fd The file descriptor to register.
    @param desc Pointer to the caller-owned reactor_descriptor_state.

    @throws std::system_error if kevent(EV_ADD) fails.
*/
inline void
kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
{
    auto const ident = static_cast<uintptr_t>(fd);

    struct kevent changes[2];
    EV_SET(&changes[0], ident, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, desc);
    EV_SET(&changes[1], ident, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, desc);

    if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
    {
        // Save errno first: the rollback below issues another syscall.
        int const errn = errno;

        // Best-effort removal of whichever filter was already added;
        // deregister_descriptor() ignores errors for missing filters.
        deregister_descriptor(fd);

        detail::throw_system_error(make_err(errn), "kevent (register)");
    }

    desc->registered_events = reactor_event_read | reactor_event_write;
    desc->fd                = fd;
    desc->scheduler_        = this;
    desc->mutex.set_enabled(!single_threaded_);
    desc->ready_events_.store(0, std::memory_order_relaxed);

    // The remaining fields are reset under the descriptor's mutex.
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
    desc->impl_ref_.reset();
    desc->read_ready  = false;
    desc->write_ready = false;
}
253
/** Deregister a persistently registered descriptor.

    Submits EV_DELETE for the EVFILT_READ and EVFILT_WRITE filters of
    @a fd in a single kevent() call. The result is intentionally
    discarded.

    @param fd The file descriptor to deregister.
*/
inline void
kqueue_scheduler::deregister_descriptor(int fd) const
{
    auto const ident = static_cast<uintptr_t>(fd);

    struct kevent removals[2];
    EV_SET(&removals[0], ident, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
    EV_SET(&removals[1], ident, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);

    // Best effort: the return value is not inspected.
    (void)::kevent(kq_fd_, removals, 2, nullptr, 0, nullptr);
}
266
/** Wake a thread blocked in kevent().

    Fires the EVFILT_USER event with NOTE_TRIGGER. The
    user_event_armed_ flag collapses concurrent or repeated requests
    into one syscall: only the caller that flips it from false to true
    issues the trigger, and run_task() clears it again when the
    EVFILT_USER event is delivered.

    If the trigger syscall fails, no EVFILT_USER event will arrive to
    clear the flag. The flag is therefore cleared here, so that a later
    call can retry instead of being suppressed from then on.
*/
inline void
kqueue_scheduler::interrupt_reactor() const
{
    bool expected = false;
    if (!user_event_armed_.compare_exchange_strong(
            expected, true, std::memory_order_acq_rel,
            std::memory_order_acquire))
    {
        // A trigger is already outstanding; nothing more to do.
        return;
    }

    struct kevent ev;
    EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
    if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
    {
        // The wake-up was not queued: disarm so the next request is
        // not silently dropped.
        user_event_armed_.store(false, std::memory_order_release);
    }
}
280
281 inline long
282 67168x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 67168 times.
✗ Branch 1 not taken.
67168x if (requested_timeout_us == 0)
285 return 0;
286
287 67168x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 3117 times.
✓ Branch 1 taken 64051 times.
67168x if (nearest == timer_service::time_point::max())
289 3117x return requested_timeout_us;
290
291 64051x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 63983 times.
64051x if (nearest <= now)
293 68x return 0;
294
295 63983x auto timer_timeout_us =
296 63983x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 63983x .count();
298
299 63983x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 63983x auto capped_timer_us = std::min(
302 63983x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 63983 times.
63983x if (requested_timeout_us < 0)
305 63983x return static_cast<long>(capped_timer_us);
306
307 return static_cast<long>(std::min(
308 static_cast<long long>(requested_timeout_us), capped_timer_us));
309 67168x }
310
/** Run one reactor pass.

    Polls kevent() once, translates each returned event into ready
    flags on its reactor_descriptor_state, processes expired timers,
    and finally splices the descriptors that became ready onto
    completed_ops_ with @a lock held.

    @param lock Scheduler lock. It is released before kevent() if it
        is held on entry, and locked again before the completed queue
        is updated.
    @param ctx Passed through to the task_cleanup guard.
    @param timeout_us Requested wait in microseconds; see
        calculate_timeout() for how it is combined with timers.

    @throws std::system_error if kevent() fails with an error other
        than EINTR.
*/
inline void
kqueue_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // An interrupted task polls without blocking.
    long wait_us = 0;
    if (!task_interrupted_)
        wait_us = calculate_timeout(timeout_us);

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // A null timespec makes kevent() wait indefinitely; that is used
    // for a negative timeout.
    struct timespec wait_spec{};
    struct timespec* wait_ptr = nullptr;
    if (wait_us >= 0)
    {
        wait_spec.tv_sec  = wait_us / 1000000;
        wait_spec.tv_nsec = (wait_us % 1000000) * 1000;
        wait_ptr          = &wait_spec;
    }

    int const nev = ::kevent(
        kq_fd_, nullptr, 0, event_buffer_.data(),
        static_cast<int>(event_buffer_.size()), wait_ptr);
    // Capture errno immediately, before any other call can change it.
    int const saved_errno = errno;

    if (nev < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "kevent");

    // Ready descriptors are gathered locally and spliced at the end.
    op_queue local_ops;

    for (int i = 0; i < nev; ++i)
    {
        struct kevent const& ev = event_buffer_[i];

        if (ev.filter == EVFILT_USER)
        {
            // Wake-up consumed: let interrupt_reactor() trigger again.
            user_event_armed_.store(false, std::memory_order_release);
            continue;
        }

        auto* desc = static_cast<reactor_descriptor_state*>(ev.udata);
        if (!desc)
            continue;

        std::uint32_t ready = 0;
        switch (ev.filter)
        {
        case EVFILT_READ:
            ready |= reactor_event_read;
            break;
        case EVFILT_WRITE:
            ready |= reactor_event_write;
            break;
        default:
            break;
        }

        // An error is flagged for EV_ERROR, and for EV_EOF when it
        // comes with non-zero fflags.
        bool const eof_with_fflags =
            (ev.flags & EV_EOF) != 0 && ev.fflags != 0;
        if ((ev.flags & EV_ERROR) != 0 || eof_with_fflags)
            ready |= reactor_event_error;

        desc->add_ready_events(ready);

        // Queue the descriptor only if it is not queued already.
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_acq_rel,
                std::memory_order_acquire))
        {
            local_ops.push(desc);
        }
    }

    timer_svc_->process_expired();

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397