include/boost/corosio/native/detail/select/select_scheduler.hpp

76.4% Lines (295/386) 86.8% Functions (33/38) 49.7% Branches (162/326)
include/boost/corosio/native/detail/select/select_scheduler.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/native_scheduler.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22
23 #include <boost/corosio/native/detail/select/select_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30 #include <boost/corosio/detail/thread_local_ptr.hpp>
31
32 #include <sys/select.h>
33 #include <sys/socket.h>
34 #include <unistd.h>
35 #include <errno.h>
36 #include <fcntl.h>
37
38 #include <algorithm>
39 #include <atomic>
40 #include <chrono>
41 #include <condition_variable>
42 #include <cstddef>
43 #include <limits>
44 #include <mutex>
45 #include <unordered_map>
46
47 namespace boost::corosio::detail {
48
49 struct select_op;
50
51 /** POSIX scheduler using select() for I/O multiplexing.
52
53 This scheduler implements the scheduler interface using the POSIX select()
54 call for I/O event notification. It uses a single reactor model
55 where one thread runs select() while other threads wait on a condition
56 variable for handler work. This design provides:
57
58 - Handler parallelism: N posted handlers can execute on N threads
59 - No thundering herd: condition_variable wakes exactly one thread
60 - Portability: Works on all POSIX systems
61
62 The design mirrors epoll_scheduler for behavioral consistency:
63 - Same single-reactor thread coordination model
64 - Same work counting semantics
65 - Same timer integration pattern
66
67 Known Limitations:
68 - FD_SETSIZE (~1024) limits maximum concurrent connections
69 - O(n) scanning: rebuilds fd_sets each iteration
70 - Level-triggered only (no edge-triggered mode)
71
72 @par Thread Safety
73 All public member functions are thread-safe.
74 */
75 class BOOST_COROSIO_DECL select_scheduler final
76 : public native_scheduler
77 , public capy::execution_context::service
78 {
79 public:
80 using key_type = scheduler;
81
82 /** Construct the scheduler.
83
84 Creates a self-pipe for reactor interruption.
85
86 @param ctx Reference to the owning execution_context.
87 @param concurrency_hint Hint for expected thread count (unused).
88 */
89 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90
91 ~select_scheduler() override;
92
93 select_scheduler(select_scheduler const&) = delete;
94 select_scheduler& operator=(select_scheduler const&) = delete;
95
96 void shutdown() override;
97 void post(std::coroutine_handle<> h) const override;
98 void post(scheduler_op* h) const override;
99 bool running_in_this_thread() const noexcept override;
100 void stop() override;
101 bool stopped() const noexcept override;
102 void restart() override;
103 std::size_t run() override;
104 std::size_t run_one() override;
105 std::size_t wait_one(long usec) override;
106 std::size_t poll() override;
107 std::size_t poll_one() override;
108
109 /** Return the maximum file descriptor value supported.
110
111 Returns FD_SETSIZE - 1, the maximum fd value that can be
112 monitored by select(). Operations with fd >= FD_SETSIZE
113 will fail with EINVAL.
114
115 @return The maximum supported file descriptor value.
116 */
117 static constexpr int max_fd() noexcept
118 {
119 return FD_SETSIZE - 1;
120 }
121
122 /** Register a file descriptor for monitoring.
123
124 @param fd The file descriptor to register.
125 @param op The operation associated with this fd.
126 @param events Event mask: 1 = read, 2 = write, 3 = both.
127 */
128 void register_fd(int fd, select_op* op, int events) const;
129
130 /** Unregister a file descriptor from monitoring.
131
132 @param fd The file descriptor to unregister.
133 @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
134 */
135 void deregister_fd(int fd, int events) const;
136
137 void work_started() noexcept override;
138 void work_finished() noexcept override;
139
140 // Event flags for register_fd/deregister_fd
141 static constexpr int event_read = 1;
142 static constexpr int event_write = 2;
143
144 private:
145 std::size_t do_one(long timeout_us);
146 void run_reactor(std::unique_lock<std::mutex>& lock);
147 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148 void interrupt_reactor() const;
149 long calculate_timeout(long requested_timeout_us) const;
150
151 // Self-pipe for interrupting select()
152 int pipe_fds_[2]; // [0]=read, [1]=write
153
154 mutable std::mutex mutex_;
155 mutable std::condition_variable wakeup_event_;
156 mutable op_queue completed_ops_;
157 mutable std::atomic<long> outstanding_work_;
158 std::atomic<bool> stopped_;
159 bool shutdown_;
160
161 // Per-fd state for tracking registered operations
162 35693 struct fd_state
163 {
164 35693 select_op* read_op = nullptr;
165 35693 select_op* write_op = nullptr;
166 };
167 mutable std::unordered_map<int, fd_state> registered_fds_;
168 mutable int max_fd_ = -1;
169
170 // Single reactor thread coordination
171 mutable bool reactor_running_ = false;
172 mutable bool reactor_interrupted_ = false;
173 mutable int idle_thread_count_ = 0;
174
175 // Sentinel operation for interleaving reactor runs with handler execution.
176 // Ensures the reactor runs periodically even when handlers are continuously
177 // posted, preventing timer starvation.
178 struct task_op final : scheduler_op
179 {
180 void operator()() override {}
181 void destroy() override {}
182 };
183 task_op task_op_;
184 };
185
186 /*
187 select Scheduler - Single Reactor Model
188 =======================================
189
190 This scheduler mirrors the epoll_scheduler design but uses select() instead
191 of epoll for I/O multiplexing. The thread coordination strategy is identical:
192 one thread becomes the "reactor" while others wait on a condition variable.
193
194 Thread Model
195 ------------
196 - ONE thread runs select() at a time (the reactor thread)
197 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
198 - When work is posted, exactly one waiting thread wakes via notify_one()
199
200 Key Differences from epoll
201 --------------------------
202 - Uses self-pipe instead of eventfd for interruption (more portable)
203 - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
204 - FD_SETSIZE limit (~1024 fds on most systems)
205 - Level-triggered only (no edge-triggered mode)
206
207 Self-Pipe Pattern
208 -----------------
209 To interrupt a blocking select() call (e.g., when work is posted or a timer
210 expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
211 always in the read_fds set, so select() returns immediately. We drain the
212 pipe to clear the readable state.
213
214 fd-to-op Mapping
215 ----------------
216 We use an unordered_map<int, fd_state> to track which operations are
217 registered for each fd. This allows O(1) lookup when select() returns
218 ready fds. Each fd can have at most one read op and one write op registered.
219 */
220
221 namespace select {
222
223 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
224 {
225 select_scheduler const* key;
226 scheduler_context* next;
227 };
228
229 inline thread_local_ptr<scheduler_context> context_stack;
230
231 struct thread_context_guard
232 {
233 scheduler_context frame_;
234
235 268 explicit thread_context_guard(select_scheduler const* ctx) noexcept
236 134 : frame_{ctx, context_stack.get()}
237 134 {
238 134 context_stack.set(&frame_);
239 268 }
240
241 268 ~thread_context_guard() noexcept
242 134 {
243 134 context_stack.set(frame_.next);
244 268 }
245 };
246
247 struct work_guard
248 {
249 select_scheduler* self;
250 1479316 ~work_guard()
251 739658 {
252 739658 self->work_finished();
253 1479316 }
254 };
255
256 } // namespace select
257
258 966 inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
259 161 : pipe_fds_{-1, -1}
260 161 , outstanding_work_(0)
261 161 , stopped_(false)
262 161 , shutdown_(false)
263 161 , max_fd_(-1)
264 161 , reactor_running_(false)
265 161 , reactor_interrupted_(false)
266 161 , idle_thread_count_(0)
267 483 {
268 // Create self-pipe for interrupting select()
269
2/4
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 161 times.
161 if (::pipe(pipe_fds_) < 0)
270 detail::throw_system_error(make_err(errno), "pipe");
271
272 // Set both ends to non-blocking and close-on-exec
273
2/2
✓ Branch 0 taken 161 times.
✓ Branch 1 taken 322 times.
483 for (int i = 0; i < 2; ++i)
274 {
275
1/2
✓ Branch 0 taken 322 times.
✗ Branch 1 not taken.
322 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 322 times.
322 if (flags == -1)
277 {
278 int errn = errno;
279 ::close(pipe_fds_[0]);
280 ::close(pipe_fds_[1]);
281 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
282 }
283
2/4
✓ Branch 0 taken 322 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 322 times.
322 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
284 {
285 int errn = errno;
286 ::close(pipe_fds_[0]);
287 ::close(pipe_fds_[1]);
288 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
289 }
290
2/4
✓ Branch 0 taken 322 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 322 times.
322 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
291 {
292 int errn = errno;
293 ::close(pipe_fds_[0]);
294 ::close(pipe_fds_[1]);
295 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
296 }
297 322 }
298
299
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 timer_svc_ = &get_timer_service(ctx, *this);
300
2/4
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 161 times.
✗ Branch 3 not taken.
322 timer_svc_->set_on_earliest_changed(
301 1945 timer_service::callback(this, [](void* p) {
302 1784 static_cast<select_scheduler*>(p)->interrupt_reactor();
303 1784 }));
304
305 // Initialize resolver service
306
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 get_resolver_service(ctx, *this);
307
308 // Initialize signal service
309
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 get_signal_service(ctx, *this);
310
311 // Push task sentinel to interleave reactor runs with handler execution
312 161 completed_ops_.push(&task_op_);
313 322 }
314
315 483 inline select_scheduler::~select_scheduler()
316 322 {
317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 161 times.
161 if (pipe_fds_[0] >= 0)
318
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 ::close(pipe_fds_[0]);
319
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 if (pipe_fds_[1] >= 0)
320
1/2
✓ Branch 0 taken 161 times.
✗ Branch 1 not taken.
161 ::close(pipe_fds_[1]);
321 483 }
322
323 inline void
324 161 select_scheduler::shutdown()
325 {
326 {
327 161 std::unique_lock lock(mutex_);
328 161 shutdown_ = true;
329
330
2/2
✓ Branch 0 taken 161 times.
✓ Branch 1 taken 161 times.
322 while (auto* h = completed_ops_.pop())
331 {
332
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 161 times.
161 if (h == &task_op_)
333 161 continue;
334 lock.unlock();
335 h->destroy();
336 lock.lock();
337 }
338 161 }
339
340 161 outstanding_work_.store(0, std::memory_order_release);
341
342
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 161 times.
161 if (pipe_fds_[1] >= 0)
343 161 interrupt_reactor();
344
345 161 wakeup_event_.notify_all();
346 161 }
347
348 inline void
349 2028 select_scheduler::post(std::coroutine_handle<> h) const
350 {
351 struct post_handler final : scheduler_op
352 {
353 std::coroutine_handle<> h_;
354
355 4056 explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
356
357 4056 ~post_handler() override = default;
358
359 2028 void operator()() override
360 {
361 2028 auto h = h_;
362
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2028 times.
2028 delete this;
363 2028 h.resume();
364 2028 }
365
366 void destroy() override
367 {
368 delete this;
369 }
370 };
371
372 2028 auto ph = std::make_unique<post_handler>(h);
373 2028 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
374
375
1/2
✓ Branch 0 taken 2028 times.
✗ Branch 1 not taken.
2028 std::unique_lock lock(mutex_);
376 2028 completed_ops_.push(ph.release());
377
1/2
✓ Branch 0 taken 2028 times.
✗ Branch 1 not taken.
2028 wake_one_thread_and_unlock(lock);
378 2028 }
379
380 inline void
381 701978 select_scheduler::post(scheduler_op* h) const
382 {
383 701978 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
384
385 701978 std::unique_lock lock(mutex_);
386 701978 completed_ops_.push(h);
387
1/2
✓ Branch 0 taken 701978 times.
✗ Branch 1 not taken.
701978 wake_one_thread_and_unlock(lock);
388 701978 }
389
390 inline bool
391 289 select_scheduler::running_in_this_thread() const noexcept
392 {
393
2/2
✓ Branch 0 taken 85 times.
✓ Branch 1 taken 204 times.
289 for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
394
1/2
✓ Branch 0 taken 85 times.
✗ Branch 1 not taken.
85 if (c->key == this)
395 85 return true;
396 204 return false;
397 289 }
398
399 inline void
400 114 select_scheduler::stop()
401 {
402 114 bool expected = false;
403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 114 times.
114 if (stopped_.compare_exchange_strong(
404 expected, true, std::memory_order_release,
405 std::memory_order_relaxed))
406 {
407 // Wake all threads so they notice stopped_ and exit
408 {
409 114 std::lock_guard lock(mutex_);
410 114 wakeup_event_.notify_all();
411 114 }
412 114 interrupt_reactor();
413 114 }
414 114 }
415
416 inline bool
417 7 select_scheduler::stopped() const noexcept
418 {
419 7 return stopped_.load(std::memory_order_acquire);
420 }
421
422 inline void
423 41 select_scheduler::restart()
424 {
425 41 stopped_.store(false, std::memory_order_release);
426 41 }
427
428 inline std::size_t
429 108 select_scheduler::run()
430 {
431
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 108 times.
108 if (stopped_.load(std::memory_order_acquire))
432 return 0;
433
434
1/2
✓ Branch 0 taken 108 times.
✗ Branch 1 not taken.
108 if (outstanding_work_.load(std::memory_order_acquire) == 0)
435 {
436 stop();
437 return 0;
438 }
439
440 108 select::thread_context_guard ctx(this);
441
442 108 std::size_t n = 0;
443
3/4
✓ Branch 0 taken 739740 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 108 times.
✓ Branch 3 taken 739632 times.
739740 while (do_one(-1))
444
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 739632 times.
739632 if (n != (std::numeric_limits<std::size_t>::max)())
445 739632 ++n;
446 108 return n;
447 108 }
448
449 inline std::size_t
450 select_scheduler::run_one()
451 {
452 if (stopped_.load(std::memory_order_acquire))
453 return 0;
454
455 if (outstanding_work_.load(std::memory_order_acquire) == 0)
456 {
457 stop();
458 return 0;
459 }
460
461 select::thread_context_guard ctx(this);
462 return do_one(-1);
463 }
464
465 inline std::size_t
466 29 select_scheduler::wait_one(long usec)
467 {
468
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 25 times.
29 if (stopped_.load(std::memory_order_acquire))
469 4 return 0;
470
471
1/2
✓ Branch 0 taken 25 times.
✗ Branch 1 not taken.
25 if (outstanding_work_.load(std::memory_order_acquire) == 0)
472 {
473 stop();
474 return 0;
475 }
476
477 25 select::thread_context_guard ctx(this);
478
1/2
✓ Branch 0 taken 25 times.
✗ Branch 1 not taken.
25 return do_one(usec);
479 29 }
480
481 inline std::size_t
482 1 select_scheduler::poll()
483 {
484
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (stopped_.load(std::memory_order_acquire))
485 return 0;
486
487
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (outstanding_work_.load(std::memory_order_acquire) == 0)
488 {
489 stop();
490 return 0;
491 }
492
493 1 select::thread_context_guard ctx(this);
494
495 1 std::size_t n = 0;
496
3/4
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✓ Branch 3 taken 1 time.
2 while (do_one(0))
497
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (n != (std::numeric_limits<std::size_t>::max)())
498 1 ++n;
499 1 return n;
500 1 }
501
502 inline std::size_t
503 select_scheduler::poll_one()
504 {
505 if (stopped_.load(std::memory_order_acquire))
506 return 0;
507
508 if (outstanding_work_.load(std::memory_order_acquire) == 0)
509 {
510 stop();
511 return 0;
512 }
513
514 select::thread_context_guard ctx(this);
515 return do_one(0);
516 }
517
518 inline void
519 35693 select_scheduler::register_fd(int fd, select_op* op, int events) const
520 {
521 // Validate fd is within select() limits
522
1/2
✓ Branch 0 taken 35693 times.
✗ Branch 1 not taken.
35693 if (fd < 0 || fd >= FD_SETSIZE)
523 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
524
525 {
526 35693 std::lock_guard lock(mutex_);
527
528
1/2
✓ Branch 0 taken 35693 times.
✗ Branch 1 not taken.
35693 auto& state = registered_fds_[fd];
529
2/2
✓ Branch 0 taken 1701 times.
✓ Branch 1 taken 33992 times.
35693 if (events & event_read)
530 33992 state.read_op = op;
531
2/2
✓ Branch 0 taken 33992 times.
✓ Branch 1 taken 1701 times.
35693 if (events & event_write)
532 1701 state.write_op = op;
533
534
2/2
✓ Branch 0 taken 35573 times.
✓ Branch 1 taken 120 times.
35693 if (fd > max_fd_)
535 120 max_fd_ = fd;
536 35693 }
537
538 // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
539 // with the newly registered fd.
540 35693 interrupt_reactor();
541 35693 }
542
543 inline void
544 3503 select_scheduler::deregister_fd(int fd, int events) const
545 {
546 3503 std::lock_guard lock(mutex_);
547
548
1/2
✓ Branch 0 taken 3503 times.
✗ Branch 1 not taken.
3503 auto it = registered_fds_.find(fd);
549
3/4
✓ Branch 0 taken 3503 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
✓ Branch 3 taken 3462 times.
3503 if (it == registered_fds_.end())
550 3462 return;
551
552
1/2
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
41 if (events & event_read)
553
1/2
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
41 it->second.read_op = nullptr;
554
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 41 times.
41 if (events & event_write)
555 it->second.write_op = nullptr;
556
557 // Remove entry if both are null
558
4/8
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 41 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 41 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 41 times.
✗ Branch 7 not taken.
41 if (!it->second.read_op && !it->second.write_op)
559 {
560
1/2
✓ Branch 0 taken 41 times.
✗ Branch 1 not taken.
41 registered_fds_.erase(it);
561
562 // Recalculate max_fd_ if needed
563
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 39 times.
41 if (fd == max_fd_)
564 {
565 39 max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
566
2/8
✓ Branch 0 taken 39 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 39 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
39 for (auto& [registered_fd, state] : registered_fds_)
567 {
568 if (registered_fd > max_fd_)
569 max_fd_ = registered_fd;
570 }
571 39 }
572 41 }
573 3503 }
574
575 inline void
576 37789 select_scheduler::work_started() noexcept
577 {
578 37789 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
579 37789 }
580
581 inline void
582 741795 select_scheduler::work_finished() noexcept
583 {
584
2/2
✓ Branch 0 taken 741682 times.
✓ Branch 1 taken 113 times.
741795 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
585
1/2
✓ Branch 0 taken 113 times.
✗ Branch 1 not taken.
113 stop();
586 741795 }
587
588 inline void
589 39518 select_scheduler::interrupt_reactor() const
590 {
591 39518 char byte = 1;
592 39518 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
593 39518 }
594
595 inline void
596 704006 select_scheduler::wake_one_thread_and_unlock(
597 std::unique_lock<std::mutex>& lock) const
598 {
599
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 704006 times.
704006 if (idle_thread_count_ > 0)
600 {
601 // Idle worker exists - wake it via condvar
602 wakeup_event_.notify_one();
603 lock.unlock();
604 }
605
4/4
✓ Branch 0 taken 1781 times.
✓ Branch 1 taken 702225 times.
✓ Branch 2 taken 15 times.
✓ Branch 3 taken 1766 times.
704006 else if (reactor_running_ && !reactor_interrupted_)
606 {
607 // No idle workers but reactor is running - interrupt it
608 1766 reactor_interrupted_ = true;
609 1766 lock.unlock();
610 1766 interrupt_reactor();
611 1766 }
612 else
613 {
614 // No one to wake
615 702240 lock.unlock();
616 }
617 704006 }
618
619 inline long
620 66260 select_scheduler::calculate_timeout(long requested_timeout_us) const
621 {
622
1/2
✓ Branch 0 taken 66260 times.
✗ Branch 1 not taken.
66260 if (requested_timeout_us == 0)
623 return 0;
624
625 66260 auto nearest = timer_svc_->nearest_expiry();
626
2/2
✓ Branch 0 taken 123 times.
✓ Branch 1 taken 66137 times.
66260 if (nearest == timer_service::time_point::max())
627 123 return requested_timeout_us;
628
629 66137 auto now = std::chrono::steady_clock::now();
630
2/2
✓ Branch 0 taken 16 times.
✓ Branch 1 taken 66121 times.
66137 if (nearest <= now)
631 16 return 0;
632
633 66121 auto timer_timeout_us =
634 66121 std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
635 66121 .count();
636
637 // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
638 66121 constexpr auto long_max =
639 static_cast<long long>((std::numeric_limits<long>::max)());
640 66121 auto capped_timer_us =
641 132242 (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
642 66121 static_cast<long long>(0)),
643 long_max);
644
645
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 66121 times.
66121 if (requested_timeout_us < 0)
646 66121 return static_cast<long>(capped_timer_us);
647
648 // requested_timeout_us is already long, so min() result fits in long
649 return static_cast<long>(
650 (std::min)(static_cast<long long>(requested_timeout_us),
651 capped_timer_us));
652 66260 }
653
654 inline void
655 198766 select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
656 {
657 // Calculate timeout considering timers, use 0 if interrupted
658 198766 long effective_timeout_us =
659
2/2
✓ Branch 0 taken 132506 times.
✓ Branch 1 taken 66260 times.
198766 reactor_interrupted_ ? 0 : calculate_timeout(-1);
660
661 // Build fd_sets from registered_fds_
662 fd_set read_fds, write_fds, except_fds;
663 198766 FD_ZERO(&read_fds);
664 198766 FD_ZERO(&write_fds);
665 198766 FD_ZERO(&except_fds);
666
667 // Always include the interrupt pipe
668 198766 FD_SET(pipe_fds_[0], &read_fds);
669 198766 int nfds = pipe_fds_[0];
670
671 // Add registered fds
672
2/2
✓ Branch 0 taken 73319 times.
✓ Branch 1 taken 198766 times.
272085 for (auto& [fd, state] : registered_fds_)
673 {
674
2/2
✓ Branch 0 taken 3257 times.
✓ Branch 1 taken 70062 times.
73319 if (state.read_op)
675 70062 FD_SET(fd, &read_fds);
676
2/2
✓ Branch 0 taken 70062 times.
✓ Branch 1 taken 3257 times.
73319 if (state.write_op)
677 {
678 3257 FD_SET(fd, &write_fds);
679 // Also monitor for errors on connect operations
680 3257 FD_SET(fd, &except_fds);
681 3257 }
682
2/2
✓ Branch 0 taken 4157 times.
✓ Branch 1 taken 69162 times.
73319 if (fd > nfds)
683 69162 nfds = fd;
684 }
685
686 // Convert timeout to timeval
687 struct timeval tv;
688 198766 struct timeval* tv_ptr = nullptr;
689
2/2
✓ Branch 0 taken 123 times.
✓ Branch 1 taken 198643 times.
198766 if (effective_timeout_us >= 0)
690 {
691 198643 tv.tv_sec = effective_timeout_us / 1000000;
692 198643 tv.tv_usec = effective_timeout_us % 1000000;
693 198643 tv_ptr = &tv;
694 198643 }
695
696 198766 lock.unlock();
697
698 198766 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
699 198766 int saved_errno = errno;
700
701 // Process timers outside the lock
702 198766 timer_svc_->process_expired();
703
704
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 198766 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
198766 if (ready < 0 && saved_errno != EINTR)
705 detail::throw_system_error(make_err(saved_errno), "select");
706
707 // Re-acquire lock before modifying completed_ops_
708 198766 lock.lock();
709
710 // Drain the interrupt pipe if readable
711
4/4
✓ Branch 0 taken 66748 times.
✓ Branch 1 taken 132018 times.
✓ Branch 2 taken 29071 times.
✓ Branch 3 taken 37677 times.
198766 if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
712 {
713 char buf[256];
714
2/2
✓ Branch 0 taken 37677 times.
✓ Branch 1 taken 37677 times.
75354 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
715 {
716 }
717 37677 }
718
719 // Process I/O completions
720 198766 int completions_queued = 0;
721
2/2
✓ Branch 0 taken 132018 times.
✓ Branch 1 taken 66748 times.
198766 if (ready > 0)
722 {
723 // Iterate over registered fds (copy keys to avoid iterator invalidation)
724 66748 std::vector<int> fds_to_check;
725
1/2
✓ Branch 0 taken 66748 times.
✗ Branch 1 not taken.
66748 fds_to_check.reserve(registered_fds_.size());
726
5/8
✓ Branch 0 taken 136622 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 69874 times.
✓ Branch 3 taken 66748 times.
✓ Branch 4 taken 69874 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 69874 times.
✗ Branch 7 not taken.
136622 for (auto& [fd, state] : registered_fds_)
727
1/2
✓ Branch 0 taken 69874 times.
✗ Branch 1 not taken.
69874 fds_to_check.push_back(fd);
728
729
2/2
✓ Branch 0 taken 66748 times.
✓ Branch 1 taken 69874 times.
136622 for (int fd : fds_to_check)
730 {
731
1/2
✓ Branch 0 taken 69874 times.
✗ Branch 1 not taken.
69874 auto it = registered_fds_.find(fd);
732
2/4
✓ Branch 0 taken 69874 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 69874 times.
✗ Branch 3 not taken.
69874 if (it == registered_fds_.end())
733 continue;
734
735
1/2
✓ Branch 0 taken 69874 times.
✗ Branch 1 not taken.
69874 auto& state = it->second;
736
737 // Check for errors (especially for connect operations)
738
1/2
✓ Branch 0 taken 69874 times.
✗ Branch 1 not taken.
69874 bool has_error = FD_ISSET(fd, &except_fds);
739
740 // Process read readiness
741
6/8
✓ Branch 0 taken 66617 times.
✓ Branch 1 taken 3257 times.
✓ Branch 2 taken 66617 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 32666 times.
✓ Branch 5 taken 33951 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 32666 times.
69874 if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
742 {
743 33951 auto* op = state.read_op;
744 // Claim the op by exchanging to unregistered. Both registering and
745 // registered states mean the op is ours to complete.
746 33951 auto prev = op->registered.exchange(
747 select_registration_state::unregistered,
748 std::memory_order_acq_rel);
749
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 33951 times.
33951 if (prev != select_registration_state::unregistered)
750 {
751 33951 state.read_op = nullptr;
752
753
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 33951 times.
33951 if (has_error)
754 {
755 int errn = 0;
756 socklen_t len = sizeof(errn);
757 if (::getsockopt(
758 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
759 errn = errno;
760 if (errn == 0)
761 errn = EIO;
762 op->complete(errn, 0);
763 }
764 else
765 {
766 33951 op->perform_io();
767 }
768
769 33951 completed_ops_.push(op);
770 33951 ++completions_queued;
771 33951 }
772 33951 }
773
774 // Process write readiness
775
6/8
✓ Branch 0 taken 3257 times.
✓ Branch 1 taken 66617 times.
✓ Branch 2 taken 3257 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1556 times.
✓ Branch 5 taken 1701 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 1556 times.
69874 if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
776 {
777 1701 auto* op = state.write_op;
778 // Claim the op by exchanging to unregistered. Both registering and
779 // registered states mean the op is ours to complete.
780 1701 auto prev = op->registered.exchange(
781 select_registration_state::unregistered,
782 std::memory_order_acq_rel);
783
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1701 times.
1701 if (prev != select_registration_state::unregistered)
784 {
785 1701 state.write_op = nullptr;
786
787
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1701 times.
1701 if (has_error)
788 {
789 int errn = 0;
790 socklen_t len = sizeof(errn);
791 if (::getsockopt(
792 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
793 errn = errno;
794 if (errn == 0)
795 errn = EIO;
796 op->complete(errn, 0);
797 }
798 else
799 {
800 1701 op->perform_io();
801 }
802
803 1701 completed_ops_.push(op);
804 1701 ++completions_queued;
805 1701 }
806 1701 }
807
808 // Clean up empty entries
809
4/4
✓ Branch 0 taken 37208 times.
✓ Branch 1 taken 32666 times.
✓ Branch 2 taken 35652 times.
✓ Branch 3 taken 1556 times.
69874 if (!state.read_op && !state.write_op)
810
1/2
✓ Branch 0 taken 35652 times.
✗ Branch 1 not taken.
35652 registered_fds_.erase(it);
811 }
812 66748 }
813
814
2/2
✓ Branch 0 taken 164662 times.
✓ Branch 1 taken 34104 times.
198766 if (completions_queued > 0)
815 {
816
2/2
✓ Branch 0 taken 32567 times.
✓ Branch 1 taken 1537 times.
34104 if (completions_queued == 1)
817 32567 wakeup_event_.notify_one();
818 else
819 1537 wakeup_event_.notify_all();
820 34104 }
821 198766 }
822
823 inline std::size_t
824 739767 select_scheduler::do_one(long timeout_us)
825 {
826 739767 std::unique_lock lock(mutex_);
827
828 739767 for (;;)
829 {
830
2/2
✓ Branch 0 taken 938424 times.
✓ Branch 1 taken 109 times.
938533 if (stopped_.load(std::memory_order_acquire))
831 109 return 0;
832
833 938424 scheduler_op* op = completed_ops_.pop();
834
835
2/2
✓ Branch 0 taken 739658 times.
✓ Branch 1 taken 198766 times.
938424 if (op == &task_op_)
836 {
837 198766 bool more_handlers = !completed_ops_.empty();
838
839
2/2
✓ Branch 0 taken 132506 times.
✓ Branch 1 taken 66260 times.
198766 if (!more_handlers)
840 {
841
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 66260 times.
66260 if (outstanding_work_.load(std::memory_order_acquire) == 0)
842 {
843 completed_ops_.push(&task_op_);
844 return 0;
845 }
846
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 66260 times.
66260 if (timeout_us == 0)
847 {
848 completed_ops_.push(&task_op_);
849 return 0;
850 }
851 66260 }
852
853
2/2
✓ Branch 0 taken 132506 times.
✓ Branch 1 taken 66260 times.
198766 reactor_interrupted_ = more_handlers || timeout_us == 0;
854 198766 reactor_running_ = true;
855
856
3/4
✓ Branch 0 taken 132506 times.
✓ Branch 1 taken 66260 times.
✓ Branch 2 taken 132506 times.
✗ Branch 3 not taken.
198766 if (more_handlers && idle_thread_count_ > 0)
857 wakeup_event_.notify_one();
858
859
1/2
✓ Branch 0 taken 198766 times.
✗ Branch 1 not taken.
198766 run_reactor(lock);
860
861 198766 reactor_running_ = false;
862 198766 completed_ops_.push(&task_op_);
863 198766 continue;
864 }
865
866
1/2
✓ Branch 0 taken 739658 times.
✗ Branch 1 not taken.
739658 if (op != nullptr)
867 {
868
1/2
✓ Branch 0 taken 739658 times.
✗ Branch 1 not taken.
739658 lock.unlock();
869 739658 select::work_guard g{this};
870
1/2
✓ Branch 0 taken 739658 times.
✗ Branch 1 not taken.
739658 (*op)();
871 739658 return 1;
872 739658 }
873
874 if (outstanding_work_.load(std::memory_order_acquire) == 0)
875 return 0;
876
877 if (timeout_us == 0)
878 return 0;
879
880 ++idle_thread_count_;
881 if (timeout_us < 0)
882 wakeup_event_.wait(lock);
883 else
884 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
885 --idle_thread_count_;
886 }
887 739767 }
888
889 } // namespace boost::corosio::detail
890
891 #endif // BOOST_COROSIO_HAS_SELECT
892
893 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
894