include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

87.9% Lines (124/141) 91.7% List of functions (11/13) 55.0% Branches (55/100)
kqueue_scheduler.hpp
f(x) Functions (13)
Function Calls Lines Branches Blocks
<unknown function 157> :157 boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int) :163 938x 61.1% 25.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :189 5315x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :189 469x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::kqueue_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :189 5315x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::~kqueue_scheduler() :201 1407x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::shutdown() :208 469x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :217 0 0.0% 0.0% boost::corosio::detail::kqueue_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :229 11999x 93.3% 50.0% 85.0% boost::corosio::detail::kqueue_scheduler::deregister_descriptor(int) const :255 11999x 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::interrupt_reactor() const :268 8475x 100.0% 100.0% 100.0% boost::corosio::detail::kqueue_scheduler::calculate_timeout(long) const :282 64739x 95.0% 87.5% 90.0% boost::corosio::detail::kqueue_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :312 106118x 94.1% 71.4% 83.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
22
23 #include <boost/corosio/native/detail/kqueue/kqueue_traits.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/native/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
29 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
30
31 #include <boost/corosio/detail/except.hpp>
32
33 #include <atomic>
34 #include <chrono>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <vector>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/time.h>
44 #include <unistd.h>
45
46 namespace boost::corosio::detail {
47
48 struct kqueue_op;
49
50 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
51
52 This scheduler implements the scheduler interface using the BSD kqueue
53 API for efficient I/O event notification. It uses a single reactor model
54 where one thread runs kevent() while other threads
55 wait on a condition variable for handler work. This design provides:
56
57 - Handler parallelism: N posted handlers can execute on N threads
58 - No thundering herd: condition_variable wakes exactly one thread
59 - IOCP parity: Behavior matches Windows I/O completion port semantics
60
61 When threads call run(), they first try to execute queued handlers.
62 If the queue is empty and no reactor is running, one thread becomes
63 the reactor and runs kevent(). Other threads wait on a condition
64 variable until handlers are available.
65
66 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
67 epoll's EPOLLET). File descriptors are registered once with both
68 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72 */
73 class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler
74 {
75 public:
76 /** Construct the scheduler.
77
78 Creates a kqueue file descriptor via kqueue(), sets
79 close-on-exec, and registers EVFILT_USER for reactor
80 interruption. On failure the kqueue fd is closed before
81 throwing.
82
83 @param ctx Reference to the owning execution_context.
84 @param concurrency_hint Hint for expected thread count (unused).
85
86 @throws std::system_error if kqueue() fails, if setting
87 FD_CLOEXEC on the kqueue fd fails, or if registering
88 the EVFILT_USER event fails. The error code contains
89 the errno from the failed syscall.
90 */
91 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /** Destructor.
94
95 Closes the kqueue file descriptor if valid. Does not throw.
96 */
97 ~kqueue_scheduler();
98
99 kqueue_scheduler(kqueue_scheduler const&) = delete;
100 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
101
102 /// Shut down the scheduler, draining pending operations.
103 void shutdown() override;
104
105 /// Apply runtime configuration, resizing the event buffer.
106 void configure_reactor(
107 unsigned max_events,
108 unsigned budget_init,
109 unsigned budget_max,
110 unsigned unassisted) override;
111
112 /** Return the kqueue file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The kqueue file descriptor.
118 */
119 int kq_fd() const noexcept
120 {
121 return kq_fd_;
122 }
123
124 /** Register a descriptor for persistent monitoring.
125
126 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
127 and stores @a desc in the kevent udata field so that the
128 reactor can dispatch events to the correct reactor_descriptor_state.
129
130 @param fd The file descriptor to register.
131 @param desc Pointer to the caller-owned reactor_descriptor_state.
132
133 @throws std::system_error if kevent(EV_ADD) fails.
134 */
135 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
136
137 /** Deregister a persistently registered descriptor.
138
139 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
140 Errors are silently ignored because the fd may already be
141 closed and kqueue automatically removes closed descriptors.
142
143 @param fd The file descriptor to deregister.
144 */
145 void deregister_descriptor(int fd) const;
146
147 private:
148 void
149 run_task(lock_type& lock, context_type* ctx,
150 long timeout_us) override;
151 void interrupt_reactor() const override;
152 long calculate_timeout(long requested_timeout_us) const;
153
154 int kq_fd_;
155
156 // EVFILT_USER idempotency
157 469x mutable std::atomic<bool> user_event_armed_{false};
158
159 // Event buffer sized from max_events_per_poll_.
160 std::vector<struct kevent> event_buffer_;
161 };
162
163 938x inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
164 469x : kq_fd_(-1)
165
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x , event_buffer_(max_events_per_poll_)
166 938x {
167
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x kq_fd_ = ::kqueue();
168
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ < 0)
169 detail::throw_system_error(make_err(errno), "kqueue");
170
171
2/4
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 469 times.
469x if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
172 {
173 int errn = errno;
174 ::close(kq_fd_);
175 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
176 }
177
178 struct kevent ev;
179 469x EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
180
2/4
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 469 times.
469x if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
181 {
182 int errn = errno;
183 ::close(kq_fd_);
184 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
185 }
186
187
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x timer_svc_ = &get_timer_service(ctx, *this);
188 938x timer_svc_->set_on_earliest_changed(
189 5784x timer_service::callback(this, [](void* p) {
190 5315x static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
191 5315x }));
192
193
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_resolver_service(ctx, *this);
194
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_signal_service(ctx, *this);
195
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_stream_file_service(ctx, *this);
196
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x get_random_access_file_service(ctx, *this);
197
198 469x completed_ops_.push(&task_op_);
199 938x }
200
201 1407x inline kqueue_scheduler::~kqueue_scheduler()
202 938x {
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ >= 0)
204
1/2
✓ Branch 0 taken 469 times.
✗ Branch 1 not taken.
469x ::close(kq_fd_);
205 1407x }
206
207 inline void
208 469x kqueue_scheduler::shutdown()
209 {
210 469x shutdown_drain();
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 469 times.
469x if (kq_fd_ >= 0)
213 469x interrupt_reactor();
214 469x }
215
216 inline void
217 kqueue_scheduler::configure_reactor(
218 unsigned max_events,
219 unsigned budget_init,
220 unsigned budget_max,
221 unsigned unassisted)
222 {
223 reactor_scheduler::configure_reactor(
224 max_events, budget_init, budget_max, unassisted);
225 event_buffer_.resize(max_events_per_poll_);
226 }
227
228 inline void
229 11999x kqueue_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
230 {
231 struct kevent changes[2];
232 11999x EV_SET(
233 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
234 0, 0, desc);
235 11999x EV_SET(
236 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
237 EV_ADD | EV_CLEAR, 0, 0, desc);
238
239
1/2
✓ Branch 0 taken 11999 times.
✗ Branch 1 not taken.
11999x if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
240 detail::throw_system_error(make_err(errno), "kevent (register)");
241
242 11999x desc->registered_events = reactor_event_read | reactor_event_write;
243 11999x desc->fd = fd;
244 11999x desc->scheduler_ = this;
245 11999x desc->mutex.set_enabled(!single_threaded_);
246 11999x desc->ready_events_.store(0, std::memory_order_relaxed);
247
248 11999x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
249 11999x desc->impl_ref_.reset();
250 11999x desc->read_ready = false;
251 11999x desc->write_ready = false;
252 11999x }
253
254 inline void
255 11999x kqueue_scheduler::deregister_descriptor(int fd) const
256 {
257 struct kevent changes[2];
258 11999x EV_SET(
259 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
260 nullptr);
261 11999x EV_SET(
262 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
263 nullptr);
264 11999x ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
265 11999x }
266
267 inline void
268 8475x kqueue_scheduler::interrupt_reactor() const
269 {
270 8475x bool expected = false;
271
2/2
✓ Branch 0 taken 311 times.
✓ Branch 1 taken 8164 times.
8475x if (user_event_armed_.compare_exchange_strong(
272 expected, true, std::memory_order_acq_rel,
273 std::memory_order_acquire))
274 {
275 struct kevent ev;
276 8164x EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
277 8164x ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
278 8164x }
279 8475x }
280
281 inline long
282 64739x kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
283 {
284
1/2
✓ Branch 0 taken 64739 times.
✗ Branch 1 not taken.
64739x if (requested_timeout_us == 0)
285 return 0;
286
287 64739x auto nearest = timer_svc_->nearest_expiry();
288
2/2
✓ Branch 0 taken 2025 times.
✓ Branch 1 taken 62714 times.
64739x if (nearest == timer_service::time_point::max())
289 2025x return requested_timeout_us;
290
291 62714x auto now = std::chrono::steady_clock::now();
292
2/2
✓ Branch 0 taken 69 times.
✓ Branch 1 taken 62645 times.
62714x if (nearest <= now)
293 69x return 0;
294
295 62645x auto timer_timeout_us =
296 62645x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
297 62645x .count();
298
299 62645x constexpr auto long_max =
300 static_cast<long long>((std::numeric_limits<long>::max)());
301 62645x auto capped_timer_us = std::min(
302 62645x std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
303
304
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 62639 times.
62645x if (requested_timeout_us < 0)
305 62639x return static_cast<long>(capped_timer_us);
306
307 6x return static_cast<long>(std::min(
308 6x static_cast<long long>(requested_timeout_us), capped_timer_us));
309 64739x }
310
311 inline void
312 106118x kqueue_scheduler::run_task(
313 lock_type& lock, context_type* ctx, long timeout_us)
314 {
315 106118x long effective_timeout_us =
316
2/2
✓ Branch 0 taken 41379 times.
✓ Branch 1 taken 64739 times.
106118x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
317
318
2/2
✓ Branch 0 taken 41379 times.
✓ Branch 1 taken 64739 times.
106118x if (lock.owns_lock())
319 64739x lock.unlock();
320
321 106118x task_cleanup on_exit{this, &lock, ctx};
322
323 struct timespec ts;
324 106118x struct timespec* ts_ptr = nullptr;
325
2/2
✓ Branch 0 taken 2023 times.
✓ Branch 1 taken 104095 times.
106118x if (effective_timeout_us >= 0)
326 {
327 104095x ts.tv_sec = effective_timeout_us / 1000000;
328 104095x ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
329 104095x ts_ptr = &ts;
330 104095x }
331
332
1/2
✓ Branch 0 taken 106118 times.
✗ Branch 1 not taken.
106118x int nev = ::kevent(
333 106118x kq_fd_, nullptr, 0, event_buffer_.data(),
334 106118x static_cast<int>(event_buffer_.size()), ts_ptr);
335
1/2
✓ Branch 0 taken 106118 times.
✗ Branch 1 not taken.
106118x int saved_errno = errno;
336
337
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 106118 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
106118x if (nev < 0 && saved_errno != EINTR)
338 detail::throw_system_error(make_err(saved_errno), "kevent");
339
340 106118x op_queue local_ops;
341
342
2/2
✓ Branch 0 taken 106118 times.
✓ Branch 1 taken 159010 times.
265128x for (int i = 0; i < nev; ++i)
343 {
344
2/2
✓ Branch 0 taken 7695 times.
✓ Branch 1 taken 151315 times.
159010x if (event_buffer_[i].filter == EVFILT_USER)
345 {
346 7695x user_event_armed_.store(false, std::memory_order_release);
347 7695x continue;
348 }
349
350 151315x auto* desc =
351 151315x static_cast<reactor_descriptor_state*>(event_buffer_[i].udata);
352
1/2
✓ Branch 0 taken 151315 times.
✗ Branch 1 not taken.
151315x if (!desc)
353 continue;
354
355 151315x std::uint32_t ready = 0;
356
357
2/2
✓ Branch 0 taken 76322 times.
✓ Branch 1 taken 74993 times.
151315x if (event_buffer_[i].filter == EVFILT_READ)
358 74993x ready |= reactor_event_read;
359
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 76322 times.
76322x else if (event_buffer_[i].filter == EVFILT_WRITE)
360 76322x ready |= reactor_event_write;
361
362
1/2
✓ Branch 0 taken 151315 times.
✗ Branch 1 not taken.
151315x if (event_buffer_[i].flags & EV_ERROR)
363 ready |= reactor_event_error;
364
365
2/2
✓ Branch 0 taken 151286 times.
✓ Branch 1 taken 29 times.
151315x if (event_buffer_[i].flags & EV_EOF)
366 {
367
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 20 times.
29x if (event_buffer_[i].filter == EVFILT_READ)
368 20x ready |= reactor_event_read;
369
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 16 times.
29x if (event_buffer_[i].fflags != 0)
370 16x ready |= reactor_event_error;
371 29x }
372
373 151315x desc->add_ready_events(ready);
374
375 151315x bool expected = false;
376
2/2
✓ Branch 0 taken 122 times.
✓ Branch 1 taken 151193 times.
151315x if (desc->is_enqueued_.compare_exchange_strong(
377 expected, true, std::memory_order_acq_rel,
378 std::memory_order_acquire))
379 {
380 151193x local_ops.push(desc);
381 151193x }
382 151315x }
383
384
1/2
✓ Branch 0 taken 106118 times.
✗ Branch 1 not taken.
106118x timer_svc_->process_expired();
385
386
1/2
✓ Branch 0 taken 106118 times.
✗ Branch 1 not taken.
106118x lock.lock();
387
388
2/2
✓ Branch 0 taken 27843 times.
✓ Branch 1 taken 78275 times.
106118x if (!local_ops.empty())
389 78275x completed_ops_.splice(local_ops);
390 106118x }
391
392 } // namespace boost::corosio::detail
393
394 #endif // BOOST_COROSIO_HAS_KQUEUE
395
396 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
397