include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

87.9% Lines (124/141) 91.7% List of functions (11/13) 55.0% Branches (55/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 938x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 3401x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 469x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 3401x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 1407x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 469x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 0 0.0% 0.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 7625x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 7625x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 5641x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 31138x 95.0% 87.5% 90.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 54874x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
50 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
51
52 This scheduler implements the scheduler interface using the BSD kqueue
53 API for efficient I/O event notification. It uses a single reactor model
54 where one thread runs kevent() while other threads
55 wait on a condition variable for handler work. This design provides:
56
57 - Handler parallelism: N posted handlers can execute on N threads
58 - No thundering herd: condition_variable wakes exactly one thread
59 - IOCP parity: Behavior matches Windows I/O completion port semantics
60
61 When threads call run(), they first try to execute queued handlers.
62 If the queue is empty and no reactor is running, one thread becomes
63 the reactor and runs kevent(). Other threads wait on a condition
64 variable until handlers are available.
65
66 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
67 epoll's EPOLLET). File descriptors are registered once with both
68 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72 */
73 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
74 {
75 public:
76 /** Construct the scheduler.
77
78 Creates a kqueue file descriptor via kqueue(), sets
79 close-on-exec, and registers EVFILT_USER for reactor
80 interruption. On failure the kqueue fd is closed before
81 throwing.
82
83 @param ctx Reference to the owning execution_context.
84 @param concurrency_hint Hint for expected thread count (unused).
85
86 @throws std::system_error if kqueue() fails, if setting
87 FD_CLOEXEC on the kqueue fd fails, or if registering
88 the EVFILT_USER event fails. The error code contains
89 the errno from the failed syscall.
90 */
91 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /** Destructor.
94
95 Closes the kqueue file descriptor if valid. Does not throw.
96 */
97 ~kqueue_scheduler();
98
99 kqueue_scheduler(kqueue_scheduler const&) = delete;
100 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
101
102 /// Shut down the scheduler, draining pending operations.
103 void shutdown() override;
104
105 /// Apply runtime configuration, resizing the event buffer.
106 void configure_reactor(
107 unsigned max_events,
108 unsigned budget_init,
109 unsigned budget_max,
110 unsigned unassisted) override;
111
112 /** Return the kqueue file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The kqueue file descriptor.
118 */
119 int kq_fd() const noexcept
120 {
121 return kq_fd_;
122 }
123
124 /** Register a descriptor for persistent monitoring.
125
126 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
127 and stores @a desc in the kevent udata field so that the
128 reactor can dispatch events to the correct reactor_descriptor_state.
129
130 @param fd The file descriptor to register.
131 @param desc Pointer to the caller-owned reactor_descriptor_state.
132
133 @throws std::system_error if kevent(EV_ADD) fails.
134 */
135 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
136
137 /** Deregister a persistently registered descriptor.
138
139 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
140 Errors are silently ignored because the fd may already be
141 closed and kqueue automatically removes closed descriptors.
142
143 @param fd The file descriptor to deregister.
144 */
145 void deregister_descriptor(int fd) const;
146
147 private:
148 void
149 run_task(lock_type& lock, context_type* ctx,
150 long timeout_us) override;
151 void interrupt_reactor() const override;
152 long calculate_timeout(long requested_timeout_us) const;
153
154 int kq_fd_;
155
156 // EVFILT_USER idempotency
157 469x mutable std::atomic<bool> user_event_armed_{false};
158
159 // Event buffer sized from max_events_per_poll_.
160 std::vector<struct kevent> event_buffer_;
161 };
162
163 938x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
164 469x : kq_fd_(-1)
165
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x , event_buffer_(max_events_per_poll_)
166 938x {
167
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x kq_fd_ = ::kqueue();
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ < 0)
169 detail::throw_system_error(make_err(errno), "kqueue");
170
171
2/4
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 469 times.
469x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
172 {
173 int errn = errno;
174 ::close(kq_fd_);
175 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
176 }
177
178 struct kevent ev;
179 469x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
180
2/4
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 469 times.
469x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
181 {
182 int errn = errno;
183 ::close(kq_fd_);
184 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
185 }
186
187
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x timer_svc_ = &get_timer_service(ctx, *this);
188 938x timer_svc_->set_on_earliest_changed(
189 3870x timer_service::callback(this, [](void* p) {
190 3401x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
191 3401x }));
192
193
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_resolver_service(ctx, *this);
194
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_signal_service(ctx, *this);
195
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_stream_file_service(ctx, *this);
196
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_random_access_file_service(ctx, *this);
197
198 469x completed_ops_.push(&task_op_);
199 938x }
200
201 1407x inline kqueue_scheduler::~kqueue_scheduler()
202 938x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x ::close(kq_fd_);
205 1407x }
206
207 inline void
208 469x kqueue_scheduler::shutdown()
209 {
210 469x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ >= 0)
213 469x interrupt_reactor();
214 469x }
215
216 inline void
217 kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 reactor_scheduler::configure_reactor(
224 max_events, budget_init, budget_max, unassisted);
225 event_buffer_.resize(max_events_per_poll_);
226 }
227
228 inline void
229 7625x kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
230 {
231 struct kevent changes[2];
232 7625x EV_SET(
233 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
234 0, 0, desc);
235 7625x EV_SET(
236 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
237 EV_ADD | EV_CLEAR, 0, 0, desc);
238
239
1/2
✓ Branch 0 taken 7625 times.
✗ Branch 1 not taken.
7625x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
240 detail::throw_system_error(make_err(errno), "kevent (register)");
241
242 7625x desc->registered_events = reactor_event_read | reactor_event_write;
243 7625x desc->fd = fd;
244 7625x desc->scheduler_ = this;
245 7625x desc->mutex.set_enabled(!single_threaded_);
246 7625x desc->ready_events_.store(0, std::memory_order_relaxed);
247
248 7625x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
249 7625x desc->impl_ref_.reset();
250 7625x desc->read_ready = false;
251 7625x desc->write_ready = false;
252 7625x }
253
254 inline void
255 7625x kqueue_scheduler::deregister_descriptor(int fd) const
256 {
257 struct kevent changes[2];
258 7625x EV_SET(
259 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
260 nullptr);
261 7625x EV_SET(
262 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
263 nullptr);
264 7625x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
265 7625x }
266
267 inline void
268 5641x kqueue_scheduler::interrupt_reactor() const
269 {
270 5641x bool expected = false;
271
2/2
✓ Branch 0 taken 315 times.
✓ Branch 1 taken 5326 times.
5641x if (user_event_armed_.compare_exchange_strong(
272 expected, true, std::memory_order_acq_rel,
273 std::memory_order_acquire))
274 {
275 struct kevent ev;
276 5326x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
277 5326x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
278 5326x }
279 5641x }
280
281 inline long
282 31138x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 31138 times.
✗ Branch 1 not taken.
31138x if (requested_timeout_us == 0)
285 return 0;
286
287 31138x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 1507 times.
✓ Branch 1 taken 29631 times.
31138x if (nearest == timer_service::time_point::max())
289 1507x return requested_timeout_us;
290
291 29631x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 254 times.
✓ Branch 1 taken 29377 times.
29631x if (nearest <= now)
293 254x return 0;
294
295 29377x auto timer_timeout_us =
296 29377x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 29377x .count();
298
299 29377x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 29377x auto capped_timer_us = std::min(
302 29377x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 29371 times.
29377x if (requested_timeout_us < 0)
305 29371x return static_cast<long>(capped_timer_us);
306
307 6x return static_cast<long>(std::min(
308 6x static_cast<long long>(requested_timeout_us), capped_timer_us));
309 31138x }
310
311 inline void
312 54874x kqueue_scheduler::run_task(
313 lock_type& lock, context_type* ctx, long timeout_us)
314 {
315 54874x long effective_timeout_us =
316
2/2
✓ Branch 0 taken 23736 times.
✓ Branch 1 taken 31138 times.
54874x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
317
318
2/2
✓ Branch 0 taken 23736 times.
✓ Branch 1 taken 31138 times.
54874x if (lock.owns_lock())
319 31138x lock.unlock();
320
321 54874x task_cleanup on_exit{this, &lock, ctx};
322
323 struct timespec ts;
324 54874x struct timespec* ts_ptr = nullptr;
325
2/2
✓ Branch 0 taken 1505 times.
✓ Branch 1 taken 53369 times.
54874x if (effective_timeout_us >= 0)
326 {
327 53369x ts.tv_sec = effective_timeout_us / 1000000;
328 53369x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
329 53369x ts_ptr = &ts;
330 53369x }
331
332
1/2
✓ Branch 0 taken 54874 times.
✗ Branch 1 not taken.
54874x int nev = ::kevent(
333 54874x kq_fd_, nullptr, 0, event_buffer_.data(),
334 54874x static_cast<int>(event_buffer_.size()), ts_ptr);
335
1/2
✓ Branch 0 taken 54874 times.
✗ Branch 1 not taken.
54874x int saved_errno = errno;
336
337
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 54874 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
54874x if (nev < 0 && saved_errno != EINTR)
338 detail::throw_system_error(make_err(saved_errno), "kevent");
339
340 54874x op_queue local_ops;
341
342
2/2
✓ Branch 0 taken 54874 times.
✓ Branch 1 taken 74642 times.
129516x for (int i = 0; i < nev; ++i)
343 {
344
2/2
✓ Branch 0 taken 4857 times.
✓ Branch 1 taken 69785 times.
74642x if (event_buffer_[i].filter == EVFILT_USER)
345 {
346 4857x user_event_armed_.store(false, std::memory_order_release);
347 4857x continue;
348 }
349
350 69785x auto* desc =
351 69785x static_cast<reactor_descriptor_state*>(event_buffer_[i].udata);
352
1/2
✓ Branch 0 taken 69785 times.
✗ Branch 1 not taken.
69785x if (!desc)
353 continue;
354
355 69785x std::uint32_t ready = 0;
356
357
2/2
✓ Branch 0 taken 35826 times.
✓ Branch 1 taken 33959 times.
69785x if (event_buffer_[i].filter == EVFILT_READ)
358 33959x ready |= reactor_event_read;
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 35826 times.
35826x else if (event_buffer_[i].filter == EVFILT_WRITE)
360 35826x ready |= reactor_event_write;
361
362
1/2
✓ Branch 0 taken 69785 times.
✗ Branch 1 not taken.
69785x if (event_buffer_[i].flags & EV_ERROR)
363 ready |= reactor_event_error;
364
365
2/2
✓ Branch 0 taken 69750 times.
✓ Branch 1 taken 35 times.
69785x if (event_buffer_[i].flags & EV_EOF)
366 {
367
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 26 times.
35x if (event_buffer_[i].filter == EVFILT_READ)
368 26x ready |= reactor_event_read;
369
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 16 times.
35x if (event_buffer_[i].fflags != 0)
370 16x ready |= reactor_event_error;
371 35x }
372
373 69785x desc->add_ready_events(ready);
374
375 69785x bool expected = false;
376
2/2
✓ Branch 0 taken 132 times.
✓ Branch 1 taken 69653 times.
69785x if (desc->is_enqueued_.compare_exchange_strong(
377 expected, true, std::memory_order_acq_rel,
378 std::memory_order_acquire))
379 {
380 69653x local_ops.push(desc);
381 69653x }
382 69785x }
383
384
1/2
✓ Branch 0 taken 54874 times.
✗ Branch 1 not taken.
54874x timer_svc_->process_expired();
385
386
1/2
✓ Branch 0 taken 54874 times.
✗ Branch 1 not taken.
54874x lock.lock();
387
388
2/2
✓ Branch 0 taken 15599 times.
✓ Branch 1 taken 39275 times.
54874x if (!local_ops.empty())
389 39275x completed_ops_.splice(local_ops);
390 54874x }
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397