include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp

79.3% Lines (457/576) 88.7% Functions (47/54) 56.0% Branches (216/386)
include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20
21 #include <boost/corosio/native/native_scheduler.hpp>
22 #include <boost/corosio/detail/scheduler_op.hpp>
23 #include <boost/corosio/native/detail/kqueue/kqueue_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 #include <boost/corosio/detail/except.hpp>
29 #include <boost/corosio/detail/thread_local_ptr.hpp>
30
31 #include <atomic>
32 #include <chrono>
33 #include <condition_variable>
34 #include <cstddef>
35 #include <cstdint>
36 #include <limits>
37 #include <mutex>
38 #include <utility>
39
40 #include <errno.h>
41 #include <fcntl.h>
42 #include <sys/event.h>
43 #include <sys/socket.h>
44 #include <sys/time.h>
45 #include <unistd.h>
46
47 /*
48 kqueue Scheduler - Single Reactor Model
49 ========================================
50
51 This scheduler uses the same thread coordination strategy as the epoll
52 backend to provide handler parallelism and avoid the thundering herd problem.
53 Instead of all threads blocking on kevent(), one thread becomes the
54 "reactor" while others wait on a condition variable for handler work.
55
56 Thread Model
57 ------------
58 - ONE thread runs kevent() at a time (the reactor thread)
59 - OTHER threads wait on cond_ (condition variable) for handlers
60 - When work is posted, exactly one waiting thread wakes via notify_one()
61 - This matches Windows IOCP semantics where N posted items wake N threads
62
63 Event Loop Structure (do_one)
64 -----------------------------
65 1. Lock mutex, try to pop handler from queue
66 2. If got handler: execute it (unlocked), return
67 3. If queue empty and no reactor running: become reactor
68 - Run kevent() (unlocked), queue I/O completions, loop back
69 4. If queue empty and reactor running: wait on condvar for work
70
71 kqueue-Specific Design
72 ----------------------
73 - Uses EVFILT_USER for reactor interruption (no extra fd needed)
74 - Uses EV_CLEAR for edge-triggered semantics (equivalent to EPOLLET)
75 - Timer expiry computed from timer_service, passed as kevent() timeout
76 - No timerfd equivalent; uses software timer queue
77
78 Signaling State (state_)
79 ------------------------
80 Same as epoll: bit 0 = signaled, upper bits = waiter count.
81 */
82
83 namespace boost::corosio::detail {
84
85 struct kqueue_op;
86 struct descriptor_state;
87 namespace kqueue {
88 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
89 } // namespace kqueue
90
91 /** macOS/BSD scheduler using kqueue for I/O multiplexing.
92
93 This scheduler implements the scheduler interface using the BSD kqueue
94 API for efficient I/O event notification. It uses a single reactor model
95 where one thread runs kevent() while other threads
96 wait on a condition variable for handler work. This design provides:
97
98 - Handler parallelism: N posted handlers can execute on N threads
99 - No thundering herd: condition_variable wakes exactly one thread
100 - IOCP parity: Behavior matches Windows I/O completion port semantics
101
102 When threads call run(), they first try to execute queued handlers.
103 If the queue is empty and no reactor is running, one thread becomes
104 the reactor and runs kevent(). Other threads wait on a condition
105 variable until handlers are available.
106
107 kqueue uses EV_CLEAR for edge-triggered semantics (equivalent to
108 epoll's EPOLLET). File descriptors are registered once with both
109 EVFILT_READ and EVFILT_WRITE and stay registered until closed.
110
111 @par Thread Safety
112 All public member functions are thread-safe.
113 */
114 class BOOST_COROSIO_DECL kqueue_scheduler final
115 : public native_scheduler
116 , public capy::execution_context::service
117 {
118 public:
119 using key_type = scheduler;
120
121 /** Construct the scheduler.
122
123 Creates a kqueue file descriptor via kqueue(), sets
124 close-on-exec, and registers EVFILT_USER for reactor
125 interruption. On failure the kqueue fd is closed before
126 throwing.
127
128 @param ctx Reference to the owning execution_context.
129 @param concurrency_hint Hint for expected thread count (unused).
130
131 @throws std::system_error if kqueue() fails, if setting
132 FD_CLOEXEC on the kqueue fd fails, or if registering
133 the EVFILT_USER event fails. The error code contains
134 the errno from the failed syscall.
135 */
136 kqueue_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
137
138 /** Destructor.
139
140 Closes the kqueue file descriptor if valid. Does not throw.
141 */
142 ~kqueue_scheduler();
143
144 kqueue_scheduler(kqueue_scheduler const&) = delete;
145 kqueue_scheduler& operator=(kqueue_scheduler const&) = delete;
146
147 void shutdown() override;
148 void post(std::coroutine_handle<> h) const override;
149 void post(scheduler_op* h) const override;
150 bool running_in_this_thread() const noexcept override;
151 void stop() override;
152 bool stopped() const noexcept override;
153 void restart() override;
154 std::size_t run() override;
155 std::size_t run_one() override;
156 std::size_t wait_one(long usec) override;
157 std::size_t poll() override;
158 std::size_t poll_one() override;
159
160 /** Return the kqueue file descriptor.
161
162 Used by socket services to register file descriptors
163 for I/O event notification.
164
165 @return The kqueue file descriptor.
166 */
167 int kq_fd() const noexcept
168 {
169 return kq_fd_;
170 }
171
172 /** Reset the thread's inline completion budget.
173
174 Called at the start of each posted completion handler to
175 grant a fresh budget for speculative inline completions.
176 Operates in two modes depending on whether another thread
177 absorbed queued work from the previous dispatch cycle:
178
179 - **Adaptive** (default): the effective cap ramps up when
180 the previous cycle fully consumed its budget (doubles up
181 to 16) and ramps down to the floor (2) when budget was
182 only partially consumed, tracking actual inline demand.
183 - **Unassisted**: entered when no other thread was available
184 to signal (unlock_and_signal_one returned false). Applies
185 a fixed conservative cap (4) to amortize scheduling
186 overhead for small buffers while avoiding bursty I/O that
187 fills socket buffers and stalls large transfers.
188 */
189 void reset_inline_budget() const noexcept;
190
191 /** Consume one unit of inline budget if available.
192
193 @return True if budget was available and consumed.
194 */
195 bool try_consume_inline_budget() const noexcept;
196
197 /** Register a descriptor for persistent monitoring.
198
199 Adds EVFILT_READ and EVFILT_WRITE (both EV_CLEAR) for @a fd
200 and stores @a desc in the kevent udata field so that the
201 reactor can dispatch events to the correct descriptor_state.
202
203 The caller retains ownership of @a desc. It must remain valid
204 until deregister_descriptor() is called and all pending
205 read/write/connect operations referencing it have completed.
206 The scheduler accesses @a desc asynchronously from the reactor
207 thread when kevent delivers events.
208
209 @param fd The file descriptor to register.
210 @param desc Pointer to the caller-owned descriptor_state.
211
212 @throws std::system_error if kevent(EV_ADD) fails.
213 */
214 void register_descriptor(int fd, descriptor_state* desc) const;
215
216 /** Deregister a persistently registered descriptor.
217
218 Issues kevent(EV_DELETE) for both EVFILT_READ and EVFILT_WRITE.
219 Errors are silently ignored because the fd may already be
220 closed and kqueue automatically removes closed descriptors.
221
222 After this call returns, the reactor will not deliver any
223 further events for @a fd, so the associated descriptor_state
224 may be safely destroyed once all previously queued completions
225 have been processed.
226
227 @param fd The file descriptor to deregister.
228 */
229 void deregister_descriptor(int fd) const;
230
231 void work_started() noexcept override;
232 void work_finished() noexcept override;
233
234 /** Offset a forthcoming work_finished from work_cleanup.
235
236 Called by descriptor_state when all I/O returned EAGAIN and no
237 handler will be executed. Must be called from a scheduler thread.
238 */
239 void compensating_work_started() const noexcept;
240
241 /** Drain work from thread context's private queue to global queue.
242
243 Called by thread_context_guard destructor when a thread exits run().
244 Transfers pending work to the global queue under mutex protection.
245
246 @param queue The private queue to drain.
247 @param count Item count for wakeup decisions (wakes other threads if positive).
248 */
249 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
250
251 /** Post completed operations for deferred invocation.
252
253 If called from a thread running this scheduler, operations go to
254 the thread's private queue (fast path). Otherwise, operations are
255 added to the global queue under mutex and a waiter is signaled.
256
257 @par Preconditions
258 work_started() must have been called for each operation.
259
260 @param ops Queue of operations to post.
261 */
262 void post_deferred_completions(op_queue& ops) const;
263
264 private:
265 struct work_cleanup
266 {
267 kqueue_scheduler* scheduler;
268 std::unique_lock<std::mutex>* lock;
269 kqueue::scheduler_context* ctx;
270 ~work_cleanup();
271 };
272
273 struct task_cleanup
274 {
275 kqueue_scheduler const* scheduler;
276 kqueue::scheduler_context* ctx;
277 ~task_cleanup();
278 };
279
280 std::size_t do_one(
281 std::unique_lock<std::mutex>& lock,
282 long timeout_us,
283 kqueue::scheduler_context* ctx);
284 void run_task(
285 std::unique_lock<std::mutex>& lock, kqueue::scheduler_context* ctx);
286 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
287 void interrupt_reactor() const;
288 long calculate_timeout(long requested_timeout_us) const;
289
290 /** Set the signaled state and wake all waiting threads.
291
292 @par Preconditions
293 Mutex must be held.
294
295 @param lock The held mutex lock.
296 */
297 void signal_all(std::unique_lock<std::mutex>& lock) const;
298
299 /** Set the signaled state and wake one waiter if any exist.
300
301 Only unlocks and signals if at least one thread is waiting.
302 Use this when the caller needs to perform a fallback action
303 (such as interrupting the reactor) when no waiters exist.
304
305 @par Preconditions
306 Mutex must be held.
307
308 @param lock The held mutex lock.
309
310 @return `true` if unlocked and signaled, `false` if lock still held.
311 */
312 bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
313
314 /** Set the signaled state, unlock, and wake one waiter if any exist.
315
316 Always unlocks the mutex. Use this when the caller will release
317 the lock regardless of whether a waiter exists.
318
319 @par Preconditions
320 Mutex must be held.
321
322 @param lock The held mutex lock.
323
324 @return `true` if at least one waiter was signaled,
325 `false` if no waiters existed.
326 */
327 bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
328
329 /** Clear the signaled state before waiting.
330
331 @par Preconditions
332 Mutex must be held.
333 */
334 void clear_signal() const;
335
336 /** Block until the signaled state is set.
337
338 Returns immediately if already signaled (fast-path). Otherwise
339 increments the waiter count, waits on the condition variable,
340 and decrements the waiter count upon waking.
341
342 @par Preconditions
343 Mutex must be held.
344
345 @param lock The held mutex lock.
346 */
347 void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
348
349 /** Block until signaled or timeout expires.
350
351 @par Preconditions
352 Mutex must be held.
353
354 @param lock The held mutex lock.
355 @param timeout_us Maximum time to wait in microseconds.
356 */
357 void wait_for_signal_for(
358 std::unique_lock<std::mutex>& lock, long timeout_us) const;
359
360 int kq_fd_;
361 mutable std::mutex mutex_;
362 mutable std::condition_variable cond_;
363 mutable op_queue completed_ops_;
364 mutable std::atomic<std::int64_t> outstanding_work_{0};
365 std::atomic<bool> stopped_{false};
366 bool shutdown_ = false;
367
368 // True while a thread is blocked in kevent(). Used by
369 // wake_one_thread_and_unlock and work_finished to know when
370 // an EVFILT_USER interrupt is needed instead of a condvar signal.
371 mutable bool task_running_ = false;
372
373 // True when the reactor has been told to do a non-blocking poll
374 // (more handlers queued or poll mode). Prevents redundant EVFILT_USER
375 // triggers and controls the kevent() timeout.
376 mutable bool task_interrupted_ = false;
377
378 // Signaling state: bit 0 = signaled, upper bits = waiter count
379 static constexpr std::size_t signaled_bit = 1;
380 static constexpr std::size_t waiter_increment = 2;
381 mutable std::size_t state_ = 0;
382
383 // EVFILT_USER idempotency: prevents redundant NOTE_TRIGGER writes
384 299 mutable std::atomic<bool> user_event_armed_{false};
385
386 // Sentinel operation for interleaving reactor runs with handler execution.
387 // Ensures the reactor runs periodically even when handlers are continuously
388 // posted, preventing starvation of I/O events, timers, and signals.
389 struct task_op final : scheduler_op
390 {
391 void operator()() override {}
392 void destroy() override {}
393 };
394 task_op task_op_;
395 };
396
397 // -- Implementation ---------------------------------------------------------
398
399 namespace kqueue {
400
401 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
402 {
403 kqueue_scheduler const* key;
404 scheduler_context* next;
405 op_queue private_queue;
406 std::int64_t private_outstanding_work;
407 int inline_budget;
408 int inline_budget_max;
409 bool unassisted;
410
411 6963 scheduler_context(kqueue_scheduler const* k, scheduler_context* n)
412 2321 : key(k)
413 2321 , next(n)
414 2321 , private_outstanding_work(0)
415 2321 , inline_budget(0)
416 2321 , inline_budget_max(2)
417 2321 , unassisted(false)
418 2321 {
419 4642 }
420 };
421
422 inline thread_local_ptr<scheduler_context> context_stack;
423
424 struct thread_context_guard
425 {
426 scheduler_context frame_;
427
428 4642 explicit thread_context_guard(kqueue_scheduler const* ctx) noexcept
429
1/2
✓ Branch 0 taken 2321 times.
✗ Branch 1 not taken.
2321 : frame_(ctx, context_stack.get())
430 2321 {
431 2321 context_stack.set(&frame_);
432 4642 }
433
434 4642 ~thread_context_guard() noexcept
435 2321 {
436
1/2
✓ Branch 0 taken 2321 times.
✗ Branch 1 not taken.
2321 if (!frame_.private_queue.empty())
437 frame_.key->drain_thread_queue(
438 frame_.private_queue, frame_.private_outstanding_work);
439 2321 context_stack.set(frame_.next);
440 4642 }
441 };
442
443 inline scheduler_context*
444 1180142 find_context(kqueue_scheduler const* self) noexcept
445 {
446
2/2
✓ Branch 0 taken 1173942 times.
✓ Branch 1 taken 6200 times.
1180142 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
447
1/2
✓ Branch 0 taken 1173942 times.
✗ Branch 1 not taken.
1173942 if (c->key == self)
448 1173942 return c;
449 6200 return nullptr;
450 1180142 }
451
452 /// Flush private work count to global counter.
453 inline void
454 flush_private_work(
455 scheduler_context* ctx,
456 std::atomic<std::int64_t>& outstanding_work) noexcept
457 {
458 if (ctx && ctx->private_outstanding_work > 0)
459 {
460 outstanding_work.fetch_add(
461 ctx->private_outstanding_work, std::memory_order_relaxed);
462 ctx->private_outstanding_work = 0;
463 }
464 }
465
466 /// Drain private queue to global queue, flushing work count first.
467 ///
468 /// @return True if any ops were drained.
469 inline bool
470 7 drain_private_queue(
471 scheduler_context* ctx,
472 std::atomic<std::int64_t>& outstanding_work,
473 op_queue& completed_ops) noexcept
474 {
475
2/4
✓ Branch 0 taken 7 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7 if (!ctx || ctx->private_queue.empty())
476 7 return false;
477
478 flush_private_work(ctx, outstanding_work);
479 completed_ops.splice(ctx->private_queue);
480 return true;
481 7 }
482
483 } // namespace kqueue
484
485 inline void
486 202722 kqueue_scheduler::reset_inline_budget() const noexcept
487 {
488
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 202722 times.
202722 if (auto* ctx = kqueue::find_context(this))
489 {
490 // Cap when no other thread absorbed queued work. A moderate
491 // cap (4) amortizes scheduling for small buffers while avoiding
492 // bursty I/O that fills socket buffers and stalls large transfers.
493
1/2
✓ Branch 0 taken 202722 times.
✗ Branch 1 not taken.
202722 if (ctx->unassisted)
494 {
495 202722 ctx->inline_budget_max = 4;
496 202722 ctx->inline_budget = 4;
497 202722 return;
498 }
499 // Ramp up when previous cycle fully consumed budget.
500 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
501 if (ctx->inline_budget == 0)
502 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
503 else if (ctx->inline_budget < ctx->inline_budget_max)
504 ctx->inline_budget_max = 2;
505 ctx->inline_budget = ctx->inline_budget_max;
506 }
507 202722 }
508
509 inline bool
510 754975 kqueue_scheduler::try_consume_inline_budget() const noexcept
511 {
512
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 754975 times.
754975 if (auto* ctx = kqueue::find_context(this))
513 {
514
2/2
✓ Branch 0 taken 612447 times.
✓ Branch 1 taken 142528 times.
754975 if (ctx->inline_budget > 0)
515 {
516 612447 --ctx->inline_budget;
517 612447 return true;
518 }
519 142528 }
520 142528 return false;
521 754975 }
522
523 inline void
524 121326 descriptor_state::operator()()
525 {
526 // Release ensures the false is visible to the reactor's CAS on other
527 // cores. With relaxed, ARM's store buffer can delay the write,
528 // causing the reactor's CAS to see a stale 'true' and skip
529 // enqueue—permanently losing the edge-triggered event and
530 // eventually deadlocking. On x86 (TSO) release compiles to the
531 // same MOV as relaxed, so there is no cost there.
532 121326 is_enqueued_.store(false, std::memory_order_release);
533
534 // Take ownership of impl ref set by close_socket() to prevent
535 // the owning impl from being freed while we're executing
536 121326 auto prevent_impl_destruction = std::move(impl_ref_);
537
538 121326 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
539
1/2
✓ Branch 0 taken 121326 times.
✗ Branch 1 not taken.
121326 if (ev == 0)
540 {
541 scheduler_->compensating_work_started();
542 return;
543 }
544
545 121326 op_queue local_ops;
546
547 121326 int err = 0;
548
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 121320 times.
121326 if (ev & kqueue_event_error)
549 {
550 6 socklen_t len = sizeof(err);
551
2/4
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6 times.
6 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
552 err = errno;
553
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (err == 0)
554 err = EIO;
555 6 }
556
557 121326 kqueue_op* rd = nullptr;
558 121326 kqueue_op* wr = nullptr;
559 121326 kqueue_op* cn = nullptr;
560 {
561
1/2
✓ Branch 0 taken 121326 times.
✗ Branch 1 not taken.
121326 std::lock_guard lock(mutex);
562
2/2
✓ Branch 0 taken 60891 times.
✓ Branch 1 taken 60435 times.
121326 if (ev & kqueue_event_read)
563 {
564 60435 rd = std::exchange(read_op, nullptr);
565
2/2
✓ Branch 0 taken 54174 times.
✓ Branch 1 taken 6261 times.
60435 if (!rd)
566 6261 read_ready = true;
567 60435 }
568
2/2
✓ Branch 0 taken 60368 times.
✓ Branch 1 taken 60958 times.
121326 if (ev & kqueue_event_write)
569 {
570 60958 cn = std::exchange(connect_op, nullptr);
571 60958 wr = std::exchange(write_op, nullptr);
572
3/4
✓ Branch 0 taken 55521 times.
✓ Branch 1 taken 5437 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 55521 times.
60958 if (!cn && !wr)
573 55521 write_ready = true;
574 60958 }
575
3/4
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 121320 times.
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
121326 if (err && !(ev & (kqueue_event_read | kqueue_event_write)))
576 {
577 rd = std::exchange(read_op, nullptr);
578 wr = std::exchange(write_op, nullptr);
579 cn = std::exchange(connect_op, nullptr);
580 }
581 121326 }
582
583 // Non-null after I/O means EAGAIN; re-register under lock below
584
2/2
✓ Branch 0 taken 67152 times.
✓ Branch 1 taken 54174 times.
121326 if (rd)
585 {
586
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 54171 times.
54174 if (err)
587 3 rd->complete(err, 0);
588 else
589 54171 rd->perform_io();
590
591
3/4
✓ Branch 0 taken 53952 times.
✓ Branch 1 taken 222 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 53952 times.
54174 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
592 {
593 222 rd->errn = 0;
594 222 }
595 else
596 {
597 53952 local_ops.push(rd);
598 53952 rd = nullptr;
599 }
600 54174 }
601
602
2/2
✓ Branch 0 taken 115889 times.
✓ Branch 1 taken 5437 times.
121326 if (cn)
603 {
604
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 5436 times.
5437 if (err)
605 1 cn->complete(err, 0);
606 else
607 5436 cn->perform_io();
608 5437 local_ops.push(cn);
609 5437 cn = nullptr;
610 5437 }
611
612
1/2
✓ Branch 0 taken 121326 times.
✗ Branch 1 not taken.
121326 if (wr)
613 {
614 if (err)
615 wr->complete(err, 0);
616 else
617 wr->perform_io();
618
619 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
620 {
621 wr->errn = 0;
622 }
623 else
624 {
625 local_ops.push(wr);
626 wr = nullptr;
627 }
628 }
629
630 // Re-register EAGAIN ops. A concurrent operator()() invocation may
631 // have set read_ready/write_ready while we held the op (no read_op
632 // was registered, so it cached the edge event). Check the flags
633 // under the same lock as re-registration so no edge is lost.
634
4/4
✓ Branch 0 taken 222 times.
✓ Branch 1 taken 121104 times.
✓ Branch 2 taken 121104 times.
✓ Branch 3 taken 222 times.
121326 while (rd || wr)
635 {
636 222 bool retry = false;
637 {
638
1/2
✓ Branch 0 taken 222 times.
✗ Branch 1 not taken.
222 std::lock_guard lock(mutex);
639
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (rd)
640 {
641
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (read_ready)
642 {
643 read_ready = false;
644 retry = true;
645 }
646 else
647 {
648 222 read_op = rd;
649 222 rd = nullptr;
650 }
651 222 }
652
1/2
✓ Branch 0 taken 222 times.
✗ Branch 1 not taken.
222 if (wr)
653 {
654 if (write_ready)
655 {
656 write_ready = false;
657 retry = true;
658 }
659 else
660 {
661 write_op = wr;
662 wr = nullptr;
663 }
664 }
665 222 }
666
667
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 222 times.
222 if (!retry)
668 222 break;
669
670 if (rd)
671 {
672 rd->perform_io();
673 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
674 rd->errn = 0;
675 else
676 {
677 local_ops.push(rd);
678 rd = nullptr;
679 }
680 }
681 if (wr)
682 {
683 wr->perform_io();
684 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
685 wr->errn = 0;
686 else
687 {
688 local_ops.push(wr);
689 wr = nullptr;
690 }
691 }
692 }
693
694 // Execute first handler inline — the scheduler's work_cleanup
695 // accounts for this as the "consumed" work item
696 121326 scheduler_op* first = local_ops.pop();
697
2/2
✓ Branch 0 taken 59389 times.
✓ Branch 1 taken 61937 times.
121326 if (first)
698 {
699
1/2
✓ Branch 0 taken 59389 times.
✗ Branch 1 not taken.
59389 scheduler_->post_deferred_completions(local_ops);
700
1/2
✓ Branch 0 taken 59389 times.
✗ Branch 1 not taken.
59389 (*first)();
701 59389 }
702 else
703 {
704 61937 scheduler_->compensating_work_started();
705 }
706 121326 }
707
708 1495 inline kqueue_scheduler::kqueue_scheduler(capy::execution_context& ctx, int)
709 299 : kq_fd_(-1)
710 299 , outstanding_work_(0)
711 299 , stopped_(false)
712 299 , shutdown_(false)
713 299 , task_running_(false)
714 299 , task_interrupted_(false)
715 299 , state_(0)
716 897 {
717 // FreeBSD 13+: kqueue1(O_CLOEXEC) available
718
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 kq_fd_ = ::kqueue();
719
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 299 times.
299 if (kq_fd_ < 0)
720 detail::throw_system_error(make_err(errno), "kqueue");
721
722
2/4
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 299 times.
299 if (::fcntl(kq_fd_, F_SETFD, FD_CLOEXEC) == -1)
723 {
724 int errn = errno;
725 ::close(kq_fd_);
726 detail::throw_system_error(make_err(errn), "fcntl (kqueue FD_CLOEXEC)");
727 }
728
729 // Register EVFILT_USER for reactor interruption (no self-pipe fallback).
730 // Requires FreeBSD 11+ or macOS 10.6+; fails with throw on older kernels.
731 struct kevent ev;
732 299 EV_SET(&ev, 0, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, nullptr);
733
2/4
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 299 times.
299 if (::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr) < 0)
734 {
735 int errn = errno;
736 ::close(kq_fd_);
737 detail::throw_system_error(make_err(errn), "kevent (EVFILT_USER)");
738 }
739
740
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 timer_svc_ = &get_timer_service(ctx, *this);
741 598 timer_svc_->set_on_earliest_changed(
742 5752 timer_service::callback(this, [](void* p) {
743 5453 static_cast<kqueue_scheduler*>(p)->interrupt_reactor();
744 5453 }));
745
746 // Initialize resolver service
747
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 get_resolver_service(ctx, *this);
748
749 // Initialize signal service
750
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 get_signal_service(ctx, *this);
751
752 // Push task sentinel to interleave reactor runs with handler execution
753 299 completed_ops_.push(&task_op_);
754 598 }
755
756 897 inline kqueue_scheduler::~kqueue_scheduler()
757 598 {
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 299 times.
299 if (kq_fd_ >= 0)
759
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 ::close(kq_fd_);
760 897 }
761
762 inline void
763 299 kqueue_scheduler::shutdown()
764 {
765 {
766 299 std::unique_lock lock(mutex_);
767 299 shutdown_ = true;
768
769
2/2
✓ Branch 0 taken 299 times.
✓ Branch 1 taken 377 times.
676 while (auto* h = completed_ops_.pop())
770 {
771
2/2
✓ Branch 0 taken 78 times.
✓ Branch 1 taken 299 times.
377 if (h == &task_op_)
772 299 continue;
773
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 lock.unlock();
774
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 h->destroy();
775
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 lock.lock();
776 }
777
778 299 signal_all(lock);
779 299 }
780
781 299 outstanding_work_.store(0, std::memory_order_release);
782
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 299 times.
299 if (kq_fd_ >= 0)
784 299 interrupt_reactor();
785 299 }
786
787 inline void
788 11669 kqueue_scheduler::post(std::coroutine_handle<> h) const
789 {
790 struct post_handler final : scheduler_op
791 {
792 std::coroutine_handle<> h_;
793
794 23334 explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
795
796 23338 ~post_handler() = default;
797
798 11669 void operator()() override
799 {
800 11669 auto h = h_;
801
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11669 times.
11669 delete this;
802 // Acquire fence on *this thread* (not the deleted object) ensures
803 // stores made by the posting thread (e.g. coroutine state written
804 // before the cross-thread post) are visible before we resume.
805 11669 std::atomic_thread_fence(std::memory_order_acquire);
806 11669 h.resume();
807 11669 }
808
809 void destroy() override
810 {
811 delete this;
812 }
813 };
814
815 11669 auto ph = std::make_unique<post_handler>(h);
816
817 // Fast path: same thread posts to private queue
818 // Only count locally; work_cleanup batches to global counter
819
2/2
✓ Branch 0 taken 5496 times.
✓ Branch 1 taken 6173 times.
11669 if (auto* ctx = kqueue::find_context(this))
820 {
821 5496 ++ctx->private_outstanding_work;
822 5496 ctx->private_queue.push(ph.release());
823 5496 return;
824 }
825
826 // Slow path: cross-thread post requires mutex
827 6173 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
828
829
1/2
✓ Branch 0 taken 6173 times.
✗ Branch 1 not taken.
6173 std::unique_lock lock(mutex_);
830 6173 completed_ops_.push(ph.release());
831
1/2
✓ Branch 0 taken 6173 times.
✗ Branch 1 not taken.
6173 wake_one_thread_and_unlock(lock);
832 11669 }
833
834 inline void
835 148839 kqueue_scheduler::post(scheduler_op* h) const
836 {
837 // Fast path: same thread posts to private queue
838 // Only count locally; work_cleanup batches to global counter
839
2/2
✓ Branch 0 taken 148812 times.
✓ Branch 1 taken 27 times.
148839 if (auto* ctx = kqueue::find_context(this))
840 {
841 148812 ++ctx->private_outstanding_work;
842 148812 ctx->private_queue.push(h);
843 148812 return;
844 }
845
846 // Slow path: cross-thread post requires mutex
847 27 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
848
849 27 std::unique_lock lock(mutex_);
850 27 completed_ops_.push(h);
851
1/2
✓ Branch 0 taken 27 times.
✗ Branch 1 not taken.
27 wake_one_thread_and_unlock(lock);
852 148839 }
853
854 inline bool
855 5198 kqueue_scheduler::running_in_this_thread() const noexcept
856 {
857
2/2
✓ Branch 0 taken 438 times.
✓ Branch 1 taken 4760 times.
5198 for (auto* c = kqueue::context_stack.get(); c != nullptr; c = c->next)
858
1/2
✓ Branch 0 taken 438 times.
✗ Branch 1 not taken.
438 if (c->key == this)
859 438 return true;
860 4760 return false;
861 5198 }
862
863 inline void
864 2287 kqueue_scheduler::stop()
865 {
866 2287 std::unique_lock lock(mutex_);
867
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 2276 times.
2287 if (!stopped_.load(std::memory_order_relaxed))
868 {
869 2276 stopped_.store(true, std::memory_order_release);
870
1/2
✓ Branch 0 taken 2276 times.
✗ Branch 1 not taken.
2276 signal_all(lock);
871
1/2
✓ Branch 0 taken 2276 times.
✗ Branch 1 not taken.
2276 interrupt_reactor();
872 2276 }
873 2287 }
874
875 inline bool
876 22 kqueue_scheduler::stopped() const noexcept
877 {
878 22 return stopped_.load(std::memory_order_acquire);
879 }
880
881 inline void
882 2099 kqueue_scheduler::restart()
883 {
884 2099 std::unique_lock lock(mutex_);
885 2099 stopped_.store(false, std::memory_order_release);
886 2099 }
887
888 inline std::size_t
889 2292 kqueue_scheduler::run()
890 {
891
2/2
✓ Branch 0 taken 2287 times.
✓ Branch 1 taken 5 times.
2292 if (outstanding_work_.load(std::memory_order_acquire) == 0)
892 {
893 5 stop();
894 5 return 0;
895 }
896
897 2287 kqueue::thread_context_guard ctx(this);
898
1/2
✓ Branch 0 taken 2287 times.
✗ Branch 1 not taken.
2287 std::unique_lock lock(mutex_);
899
900 2287 std::size_t n = 0;
901 284086 for (;;)
902 {
903
3/4
✓ Branch 0 taken 284086 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 281799 times.
✓ Branch 3 taken 2287 times.
284086 if (!do_one(lock, -1, &ctx.frame_))
904 2287 break;
905
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 281799 times.
281799 if (n != (std::numeric_limits<std::size_t>::max)())
906 281799 ++n;
907
2/2
✓ Branch 0 taken 132379 times.
✓ Branch 1 taken 149420 times.
281799 if (!lock.owns_lock())
908
1/2
✓ Branch 0 taken 132379 times.
✗ Branch 1 not taken.
132379 lock.lock();
909 }
910 2287 return n;
911 2292 }
912
913 inline std::size_t
914 2 kqueue_scheduler::run_one()
915 {
916
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (outstanding_work_.load(std::memory_order_acquire) == 0)
917 {
918 stop();
919 return 0;
920 }
921
922 2 kqueue::thread_context_guard ctx(this);
923
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 std::unique_lock lock(mutex_);
924
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 return do_one(lock, -1, &ctx.frame_);
925 2 }
926
927 inline std::size_t
928 36 kqueue_scheduler::wait_one(long usec)
929 {
930
2/2
✓ Branch 0 taken 28 times.
✓ Branch 1 taken 8 times.
36 if (outstanding_work_.load(std::memory_order_acquire) == 0)
931 {
932 8 stop();
933 8 return 0;
934 }
935
936 28 kqueue::thread_context_guard ctx(this);
937
1/2
✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
28 std::unique_lock lock(mutex_);
938
1/2
✓ Branch 0 taken 28 times.
✗ Branch 1 not taken.
28 return do_one(lock, usec, &ctx.frame_);
939 36 }
940
941 inline std::size_t
942 3 kqueue_scheduler::poll()
943 {
944
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 if (outstanding_work_.load(std::memory_order_acquire) == 0)
945 {
946 1 stop();
947 1 return 0;
948 }
949
950 2 kqueue::thread_context_guard ctx(this);
951
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 std::unique_lock lock(mutex_);
952
953 2 std::size_t n = 0;
954 5 for (;;)
955 {
956
3/4
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
✓ Branch 3 taken 2 times.
5 if (!do_one(lock, 0, &ctx.frame_))
957 2 break;
958
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (n != (std::numeric_limits<std::size_t>::max)())
959 3 ++n;
960
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (!lock.owns_lock())
961
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 lock.lock();
962 }
963 2 return n;
964 3 }
965
966 inline std::size_t
967 4 kqueue_scheduler::poll_one()
968 {
969
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
970 {
971 2 stop();
972 2 return 0;
973 }
974
975 2 kqueue::thread_context_guard ctx(this);
976
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 std::unique_lock lock(mutex_);
977
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 return do_one(lock, 0, &ctx.frame_);
978 4 }
979
980 inline void
981 11848 kqueue_scheduler::register_descriptor(int fd, descriptor_state* desc) const
982 {
983 struct kevent changes[2];
984 11848 EV_SET(
985 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_ADD | EV_CLEAR,
986 0, 0, desc);
987 11848 EV_SET(
988 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE,
989 EV_ADD | EV_CLEAR, 0, 0, desc);
990
991
1/2
✓ Branch 0 taken 11848 times.
✗ Branch 1 not taken.
11848 if (::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr) < 0)
992 detail::throw_system_error(make_err(errno), "kevent (register)");
993
994 11848 desc->registered_events = kqueue_event_read | kqueue_event_write;
995 11848 desc->fd = fd;
996 11848 desc->scheduler_ = this;
997
998 11848 std::lock_guard lock(desc->mutex);
999 11848 desc->read_ready = false;
1000 11848 desc->write_ready = false;
1001 11848 }
1002
1003 inline void
1004 kqueue_scheduler::deregister_descriptor(int fd) const
1005 {
1006 struct kevent changes[2];
1007 EV_SET(
1008 &changes[0], static_cast<uintptr_t>(fd), EVFILT_READ, EV_DELETE, 0, 0,
1009 nullptr);
1010 EV_SET(
1011 &changes[1], static_cast<uintptr_t>(fd), EVFILT_WRITE, EV_DELETE, 0, 0,
1012 nullptr);
1013 // Ignore errors - fd may already be closed (kqueue auto-removes on close)
1014 ::kevent(kq_fd_, changes, 2, nullptr, 0, nullptr);
1015 }
1016
1017 inline void
1018 70872 kqueue_scheduler::work_started() noexcept
1019 {
1020 70872 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
1021 70872 }
1022
1023 inline void
1024 81923 kqueue_scheduler::work_finished() noexcept
1025 {
1026
2/2
✓ Branch 0 taken 79654 times.
✓ Branch 1 taken 2269 times.
81923 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
1027
1/2
✓ Branch 0 taken 2269 times.
✗ Branch 1 not taken.
2269 stop();
1028 81923 }
1029
1030 inline void
1031 61937 kqueue_scheduler::compensating_work_started() const noexcept
1032 {
1033 61937 auto* ctx = kqueue::find_context(this);
1034
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 61937 times.
61937 if (ctx)
1035 61937 ++ctx->private_outstanding_work;
1036 61937 }
1037
1038 inline void
1039 kqueue_scheduler::drain_thread_queue(op_queue& queue, std::int64_t count) const
1040 {
1041 // Flush private work count to global counter — private posts
1042 // only incremented the thread-local counter, not outstanding_work_
1043 if (count > 0)
1044 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
1045
1046 std::unique_lock lock(mutex_);
1047 completed_ops_.splice(queue);
1048 if (count > 0)
1049 maybe_unlock_and_signal_one(lock);
1050 }
1051
1052 inline void
1053 59389 kqueue_scheduler::post_deferred_completions(op_queue& ops) const
1054 {
1055
1/2
✓ Branch 0 taken 59389 times.
✗ Branch 1 not taken.
59389 if (ops.empty())
1056 59389 return;
1057
1058 // Fast path: if on scheduler thread, use private queue
1059 if (auto* ctx = kqueue::find_context(this))
1060 {
1061 ctx->private_queue.splice(ops);
1062 return;
1063 }
1064
1065 // Slow path: add to global queue and wake a thread
1066 std::unique_lock lock(mutex_);
1067 completed_ops_.splice(ops);
1068 wake_one_thread_and_unlock(lock);
1069 59389 }
1070
1071 inline void
1072 8055 kqueue_scheduler::interrupt_reactor() const
1073 {
1074 // Only trigger if not already armed to avoid redundant triggers.
1075 // acq_rel: release makes the true store visible to the reactor;
1076 // acquire on failure sees the reactor's release store of false,
1077 // preventing a stale-true read that would silently drop the trigger.
1078 // On x86 (TSO) this compiles to the same LOCK CMPXCHG as before.
1079 8055 bool expected = false;
1080
2/2
✓ Branch 0 taken 211 times.
✓ Branch 1 taken 7844 times.
8055 if (user_event_armed_.compare_exchange_strong(
1081 expected, true, std::memory_order_acq_rel,
1082 std::memory_order_acquire))
1083 {
1084 struct kevent ev;
1085 7844 EV_SET(&ev, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
1086 7844 ::kevent(kq_fd_, &ev, 1, nullptr, 0, nullptr);
1087 7844 }
1088 8055 }
1089
1090 inline void
1091 2575 kqueue_scheduler::signal_all(std::unique_lock<std::mutex>&) const
1092 {
1093 2575 state_ |= signaled_bit;
1094 2575 cond_.notify_all();
1095 2575 }
1096
1097 inline bool
1098 74522 kqueue_scheduler::maybe_unlock_and_signal_one(
1099 std::unique_lock<std::mutex>& lock) const
1100 {
1101 74522 state_ |= signaled_bit;
1102
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 74522 times.
74522 if (state_ > signaled_bit)
1103 {
1104 lock.unlock();
1105 cond_.notify_one();
1106 return true;
1107 }
1108 74522 return false;
1109 74522 }
1110
1111 inline bool
1112 320360 kqueue_scheduler::unlock_and_signal_one(
1113 std::unique_lock<std::mutex>& lock) const
1114 {
1115 320360 state_ |= signaled_bit;
1116 320360 bool have_waiters = state_ > signaled_bit;
1117 320360 lock.unlock();
1118
1/2
✓ Branch 0 taken 320360 times.
✗ Branch 1 not taken.
320360 if (have_waiters)
1119 cond_.notify_one();
1120 320360 return have_waiters;
1121 }
1122
1123 inline void
1124 5 kqueue_scheduler::clear_signal() const
1125 {
1126 5 state_ &= ~signaled_bit;
1127 5 }
1128
1129 inline void
1130 5 kqueue_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1131 {
1132
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 5 times.
10 while ((state_ & signaled_bit) == 0)
1133 {
1134 5 state_ += waiter_increment;
1135 5 cond_.wait(lock);
1136 5 state_ -= waiter_increment;
1137 }
1138 5 }
1139
1140 inline void
1141 kqueue_scheduler::wait_for_signal_for(
1142 std::unique_lock<std::mutex>& lock, long timeout_us) const
1143 {
1144 if ((state_ & signaled_bit) == 0)
1145 {
1146 state_ += waiter_increment;
1147 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1148 state_ -= waiter_increment;
1149 }
1150 }
1151
1152 inline void
1153 6200 kqueue_scheduler::wake_one_thread_and_unlock(
1154 std::unique_lock<std::mutex>& lock) const
1155 {
1156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 6200 times.
6200 if (maybe_unlock_and_signal_one(lock))
1157 return;
1158
1159
3/4
✓ Branch 0 taken 27 times.
✓ Branch 1 taken 6173 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 27 times.
6200 if (task_running_ && !task_interrupted_)
1160 {
1161 27 task_interrupted_ = true;
1162 27 lock.unlock();
1163 27 interrupt_reactor();
1164 27 }
1165 else
1166 {
1167 6173 lock.unlock();
1168 }
1169 6200 }
1170
1171 inline long
1172 53755 kqueue_scheduler::calculate_timeout(long requested_timeout_us) const
1173 {
1174
1/2
✓ Branch 0 taken 53755 times.
✗ Branch 1 not taken.
53755 if (requested_timeout_us == 0)
1175 return 0;
1176
1177 53755 auto nearest = timer_svc_->nearest_expiry();
1178
2/2
✓ Branch 0 taken 1780 times.
✓ Branch 1 taken 51975 times.
53755 if (nearest == timer_service::time_point::max())
1179 1780 return requested_timeout_us;
1180
1181 51975 auto now = std::chrono::steady_clock::now();
1182
2/2
✓ Branch 0 taken 108 times.
✓ Branch 1 taken 51867 times.
51975 if (nearest <= now)
1183 108 return 0;
1184
1185 51867 auto timer_timeout_us =
1186 51867 std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
1187 51867 .count();
1188
1189 // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
1190 51867 constexpr auto long_max =
1191 static_cast<long long>((std::numeric_limits<long>::max)());
1192 51867 auto capped_timer_us = std::min(
1193 51867 std::max(timer_timeout_us, static_cast<long long>(0)), long_max);
1194
1195
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51867 times.
51867 if (requested_timeout_us < 0)
1196 51867 return static_cast<long>(capped_timer_us);
1197
1198 // requested_timeout_us is already long, so min() result fits in long
1199 return static_cast<long>(std::min(
1200 static_cast<long long>(requested_timeout_us), capped_timer_us));
1201 53755 }
1202
1203 563639 inline kqueue_scheduler::work_cleanup::~work_cleanup()
1204 281805 {
1205
1/2
✓ Branch 0 taken 281834 times.
✗ Branch 1 not taken.
281834 if (ctx)
1206 {
1207 281834 std::int64_t produced = ctx->private_outstanding_work;
1208
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 281826 times.
281834 if (produced > 1)
1209 16 scheduler->outstanding_work_.fetch_add(
1210 8 produced - 1, std::memory_order_relaxed);
1211
2/2
✓ Branch 0 taken 211386 times.
✓ Branch 1 taken 70440 times.
281826 else if (produced < 1)
1212 70440 scheduler->work_finished();
1213 281834 ctx->private_outstanding_work = 0;
1214
1215
2/2
✓ Branch 0 taken 132403 times.
✓ Branch 1 taken 149431 times.
281834 if (!ctx->private_queue.empty())
1216 {
1217
1/2
✓ Branch 0 taken 149431 times.
✗ Branch 1 not taken.
149431 lock->lock();
1218 149431 scheduler->completed_ops_.splice(ctx->private_queue);
1219 149431 }
1220 281834 }
1221 else
1222 {
1223 scheduler->work_finished();
1224 }
1225 563639 }
1226
1227 184582 inline kqueue_scheduler::task_cleanup::~task_cleanup()
1228 92291 {
1229
2/4
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 92291 times.
✗ Branch 3 not taken.
92291 if (ctx && ctx->private_outstanding_work > 0)
1230 {
1231 scheduler->outstanding_work_.fetch_add(
1232 ctx->private_outstanding_work, std::memory_order_relaxed);
1233 ctx->private_outstanding_work = 0;
1234 }
1235 184582 }
1236
1237 inline void
1238 92291 kqueue_scheduler::run_task(
1239 std::unique_lock<std::mutex>& lock, kqueue::scheduler_context* ctx)
1240 {
1241
2/2
✓ Branch 0 taken 38536 times.
✓ Branch 1 taken 53755 times.
92291 long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
1242
1243
2/2
✓ Branch 0 taken 38536 times.
✓ Branch 1 taken 53755 times.
92291 if (lock.owns_lock())
1244 53755 lock.unlock();
1245
1246 // Flush private work count when reactor completes
1247 92291 task_cleanup on_exit{this, ctx};
1248 (void)on_exit;
1249
1250 // Convert timeout to timespec for kevent()
1251 struct timespec ts;
1252 92291 struct timespec* ts_ptr = nullptr;
1253
2/2
✓ Branch 0 taken 1780 times.
✓ Branch 1 taken 90511 times.
92291 if (effective_timeout_us >= 0)
1254 {
1255 90511 ts.tv_sec = effective_timeout_us / 1000000;
1256 90511 ts.tv_nsec = (effective_timeout_us % 1000000) * 1000;
1257 90511 ts_ptr = &ts;
1258 90511 }
1259
1260 // Event loop runs without mutex held
1261 struct kevent events[128];
1262
1/2
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
92291 int nev = ::kevent(kq_fd_, nullptr, 0, events, 128, ts_ptr);
1263
1/2
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
92291 int saved_errno = errno;
1264
1265
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 92291 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
92291 if (nev < 0 && saved_errno != EINTR)
1266 detail::throw_system_error(make_err(saved_errno), "kevent");
1267
1268 92291 op_queue local_ops;
1269 92291 std::int64_t completions_queued = 0;
1270
1271 // Process events without holding the mutex
1272
2/2
✓ Branch 0 taken 92291 times.
✓ Branch 1 taken 129019 times.
221310 for (int i = 0; i < nev; ++i)
1273 {
1274
2/2
✓ Branch 0 taken 7545 times.
✓ Branch 1 taken 121474 times.
129019 if (events[i].filter == EVFILT_USER)
1275 {
1276 // Interrupt event - clear the armed flag.
1277 // Release pairs with the acquire CAS failure path in
1278 // interrupt_reactor(), ensuring the reactor sees our
1279 // store of false and can re-arm the EVFILT_USER trigger.
1280 // On x86 (TSO) this compiles identically to relaxed.
1281 7545 user_event_armed_.store(false, std::memory_order_release);
1282 7545 continue;
1283 }
1284
1285 121474 auto* desc = static_cast<descriptor_state*>(events[i].udata);
1286
1/2
✓ Branch 0 taken 121474 times.
✗ Branch 1 not taken.
121474 if (!desc)
1287 continue;
1288
1289 // Map kqueue events to ready-event flags
1290 121474 std::uint32_t ready = 0;
1291
1292
2/2
✓ Branch 0 taken 61022 times.
✓ Branch 1 taken 60452 times.
121474 if (events[i].filter == EVFILT_READ)
1293 60452 ready |= kqueue_event_read;
1294
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 61022 times.
61022 else if (events[i].filter == EVFILT_WRITE)
1295 61022 ready |= kqueue_event_write;
1296
1297
1/2
✓ Branch 0 taken 121474 times.
✗ Branch 1 not taken.
121474 if (events[i].flags & EV_ERROR)
1298 ready |= kqueue_event_error;
1299
1300 // EV_EOF: peer closed or error condition
1301
2/2
✓ Branch 0 taken 121444 times.
✓ Branch 1 taken 30 times.
121474 if (events[i].flags & EV_EOF)
1302 {
1303 // EV_EOF on a read filter means the peer closed — deliver as
1304 // a read event so the read returns 0 (EOF)
1305
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 21 times.
30 if (events[i].filter == EVFILT_READ)
1306 21 ready |= kqueue_event_read;
1307 // fflags contains the socket error (if any) when EV_EOF is set
1308
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 12 times.
30 if (events[i].fflags != 0)
1309 12 ready |= kqueue_event_error;
1310 30 }
1311
1312 121474 desc->add_ready_events(ready);
1313
1314 // Only enqueue if not already enqueued.
1315 // acq_rel on success: release makes add_ready_events visible
1316 // to the consumer's acquire exchange; acquire pairs with the
1317 // consumer's release store of false so we read the latest
1318 // value. acquire on failure: ensures the CAS load sees the
1319 // consumer's release store on ARM (prevents stale reads from
1320 // the store buffer). On x86 (TSO) these compile identically
1321 // to the weaker orderings.
1322 121474 bool expected = false;
1323
2/2
✓ Branch 0 taken 70 times.
✓ Branch 1 taken 121404 times.
121474 if (desc->is_enqueued_.compare_exchange_strong(
1324 expected, true, std::memory_order_acq_rel,
1325 std::memory_order_acquire))
1326 {
1327 121404 local_ops.push(desc);
1328 121404 ++completions_queued;
1329 121404 }
1330 121474 }
1331
1332 // Process timers after kevent returns
1333
1/2
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
92291 timer_svc_->process_expired();
1334
1335 // --- Acquire mutex only for queue operations ---
1336
1/2
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
92291 lock.lock();
1337
1338
2/2
✓ Branch 0 taken 28784 times.
✓ Branch 1 taken 63507 times.
92291 if (!local_ops.empty())
1339 63507 completed_ops_.splice(local_ops);
1340
1341 // Drain private queue to global — flush work count BEFORE splicing
1342 // so consumer threads can't decrement outstanding_work_ to zero
1343 // before the count reflects the newly visible operations.
1344
3/4
✓ Branch 0 taken 92291 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 87427 times.
✓ Branch 3 taken 4864 times.
92291 if (ctx && !ctx->private_queue.empty())
1345 {
1346
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4864 times.
4864 if (ctx->private_outstanding_work > 0)
1347 {
1348 9728 outstanding_work_.fetch_add(
1349 4864 ctx->private_outstanding_work, std::memory_order_relaxed);
1350 4864 completions_queued += ctx->private_outstanding_work;
1351 4864 ctx->private_outstanding_work = 0;
1352 4864 }
1353 4864 completed_ops_.splice(ctx->private_queue);
1354 4864 }
1355
1356 // Signal and wake one waiter if work is queued
1357
2/2
✓ Branch 0 taken 68322 times.
✓ Branch 1 taken 23969 times.
92291 if (completions_queued > 0)
1358 {
1359
2/4
✓ Branch 0 taken 68322 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 68322 times.
68322 if (maybe_unlock_and_signal_one(lock))
1360 lock.lock();
1361 68322 }
1362 92291 }
1363
1364 inline std::size_t
1365 284123 kqueue_scheduler::do_one(
1366 std::unique_lock<std::mutex>& lock,
1367 long timeout_us,
1368 kqueue::scheduler_context* ctx)
1369 {
1370 284128 for (;;)
1371 {
1372
2/2
✓ Branch 0 taken 374132 times.
✓ Branch 1 taken 2287 times.
376419 if (stopped_.load(std::memory_order_relaxed))
1373 2287 return 0;
1374
1375 374132 scheduler_op* op = completed_ops_.pop();
1376
1377 // Handle reactor sentinel - time to poll for I/O
1378
2/2
✓ Branch 0 taken 281841 times.
✓ Branch 1 taken 92291 times.
374132 if (op == &task_op_)
1379 {
1380 92291 bool more_handlers =
1381
3/4
✓ Branch 0 taken 38536 times.
✓ Branch 1 taken 53755 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 53755 times.
92291 !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
1382
1383 // Nothing to run the reactor for: no pending work to wait on,
1384 // or caller requested a non-blocking poll
1385
3/4
✓ Branch 0 taken 53755 times.
✓ Branch 1 taken 38536 times.
✓ Branch 2 taken 53755 times.
✗ Branch 3 not taken.
146046 if (!more_handlers &&
1386
1/2
✓ Branch 0 taken 53755 times.
✗ Branch 1 not taken.
53755 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1387 53755 timeout_us == 0))
1388 {
1389 completed_ops_.push(&task_op_);
1390 return 0;
1391 }
1392
1393
2/2
✓ Branch 0 taken 38536 times.
✓ Branch 1 taken 53755 times.
92291 task_interrupted_ = more_handlers || timeout_us == 0;
1394 92291 task_running_ = true;
1395
1396
2/2
✓ Branch 0 taken 53755 times.
✓ Branch 1 taken 38536 times.
92291 if (more_handlers)
1397 38536 unlock_and_signal_one(lock);
1398
1399 try
1400 {
1401
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 92291 times.
92291 run_task(lock, ctx);
1402 92291 }
1403 catch (...)
1404 {
1405 task_running_ = false;
1406 throw;
1407 }
1408
1409 92291 task_running_ = false;
1410 92291 completed_ops_.push(&task_op_);
1411 92291 continue;
1412 }
1413
1414 // Handle operation
1415
2/2
✓ Branch 0 taken 7 times.
✓ Branch 1 taken 281834 times.
281841 if (op != nullptr)
1416 {
1417 281834 bool more = !completed_ops_.empty();
1418
1419
2/2
✓ Branch 0 taken 281824 times.
✓ Branch 1 taken 10 times.
281834 if (more)
1420 281824 ctx->unassisted = !unlock_and_signal_one(lock);
1421 else
1422 {
1423 10 ctx->unassisted = false;
1424 10 lock.unlock();
1425 }
1426
1427 281834 work_cleanup on_exit{this, &lock, ctx};
1428 (void)on_exit;
1429
1430
1/2
✓ Branch 0 taken 281834 times.
✗ Branch 1 not taken.
281834 (*op)();
1431 281834 return 1;
1432 281834 }
1433
1434 // No work from global queue - try private queue before blocking
1435
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 if (kqueue::drain_private_queue(ctx, outstanding_work_, completed_ops_))
1436 continue;
1437
1438 // No pending work to wait on, or caller requested non-blocking poll
1439
3/4
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
7 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1440 5 timeout_us == 0)
1441 2 return 0;
1442
1443 5 clear_signal();
1444
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if (timeout_us < 0)
1445 5 wait_for_signal(lock);
1446 else
1447 wait_for_signal_for(lock, timeout_us);
1448 }
1449 284123 }
1450
1451 } // namespace boost::corosio::detail
1452
1453 #endif // BOOST_COROSIO_HAS_KQUEUE
1454
1455 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SCHEDULER_HPP
1456