include/boost/corosio/native/detail/select/select_scheduler.hpp

88.8% Lines (158/178) 100.0% List of functions (12/12) 52.2% Branches (94/180)
select_scheduler.hpp
f(x) Functions (12)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int) :143 484x 50.0% 25.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :178 1889x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :178 242x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :178 1889x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::~select_scheduler() :190 726x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::shutdown() :199 242x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::register_descriptor(int, boost::corosio::detail::select_descriptor_state*) const :208 3609x 95.0% 66.7% 60.0% boost::corosio::detail::select_scheduler::deregister_descriptor(int) const :238 3609x 92.3% 65.0% 78.0% boost::corosio::detail::select_scheduler::notify_reactor() const :260 81911x 100.0% 100.0% boost::corosio::detail::select_scheduler::interrupt_reactor() const :266 87814x 100.0% 100.0% boost::corosio::detail::select_scheduler::calculate_timeout(long) const :273 561307x 95.2% 87.5% 90.0% boost::corosio::detail::select_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :306 584361x 94.3% 63.2% 82.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/select/select_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <sys/select.h>
33 #include <unistd.h>
34 #include <errno.h>
35 #include <fcntl.h>
36
37 #include <atomic>
38 #include <chrono>
39 #include <cstdint>
40 #include <limits>
41 #include <mutex>
42 #include <unordered_map>
43
44 namespace boost::corosio::detail {
45
46 struct select_op;
47 struct select_descriptor_state;
48
49 /** POSIX scheduler using select() for I/O multiplexing.
50
51 This scheduler implements the scheduler interface using the POSIX select()
52 call for I/O event notification. It inherits the shared reactor threading
53 model from reactor_scheduler: signal state machine, inline completion
54 budget, work counting, and the do_one event loop.
55
56 The design mirrors epoll_scheduler for behavioral consistency:
57 - Same single-reactor thread coordination model
58 - Same deferred I/O pattern (reactor marks ready; workers do I/O)
59 - Same timer integration pattern
60
61 Known Limitations:
62 - FD_SETSIZE (~1024) limits maximum concurrent connections
63 - O(n) scanning: rebuilds fd_sets each iteration
64 - Level-triggered only (no edge-triggered mode)
65
66 @par Thread Safety
67 All public member functions are thread-safe.
68 */
69 class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
70 {
71 public:
72 /** Construct the scheduler.
73
74 Creates a self-pipe for reactor interruption.
75
76 @param ctx Reference to the owning execution_context.
77 @param concurrency_hint Hint for expected thread count (unused).
78 */
79 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80
81 /// Destroy the scheduler.
82 ~select_scheduler() override;
83
84 select_scheduler(select_scheduler const&) = delete;
85 select_scheduler& operator=(select_scheduler const&) = delete;
86
87 /// Shut down the scheduler, draining pending operations.
88 void shutdown() override;
89
90 /** Return the maximum file descriptor value supported.
91
92 Returns FD_SETSIZE - 1, the maximum fd value that can be
93 monitored by select(). Operations with fd >= FD_SETSIZE
94 will fail with EINVAL.
95
96 @return The maximum supported file descriptor value.
97 */
98 static constexpr int max_fd() noexcept
99 {
100 return FD_SETSIZE - 1;
101 }
102
103 /** Register a descriptor for persistent monitoring.
104
105 The fd is added to the registered_descs_ map and will be
106 included in subsequent select() calls. The reactor is
107 interrupted so a blocked select() rebuilds its fd_sets.
108
109 @param fd The file descriptor to register.
110 @param desc Pointer to descriptor state for this fd.
111 */
112 void register_descriptor(int fd, select_descriptor_state* desc) const;
113
114 /** Deregister a persistently registered descriptor.
115
116 @param fd The file descriptor to deregister.
117 */
118 void deregister_descriptor(int fd) const;
119
120 /** Interrupt the reactor so it rebuilds its fd_sets.
121
122 Called when a write or connect op is registered after
123 the reactor's snapshot was taken. Without this, select()
124 may block not watching for writability on the fd.
125 */
126 void notify_reactor() const;
127
128 private:
129 void
130 run_task(lock_type& lock, context_type* ctx,
131 long timeout_us) override;
132 void interrupt_reactor() const override;
133 long calculate_timeout(long requested_timeout_us) const;
134
135 // Self-pipe for interrupting select()
136 int pipe_fds_[2]; // [0]=read, [1]=write
137
138 // Per-fd tracking for fd_set building
139 mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
140 mutable int max_fd_ = -1;
141 };
142
143 726x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
144 242x : pipe_fds_{-1, -1}
145 242x , max_fd_(-1)
146 484x {
147
2/4
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 242 times.
242x if (::pipe(pipe_fds_) < 0)
148 detail::throw_system_error(make_err(errno), "pipe");
149
150
2/2
✓ Branch 0 taken 242 times.
✓ Branch 1 taken 484 times.
726x for (int i = 0; i < 2; ++i)
151 {
152
1/2
✓ Branch 0 taken 484 times.
✗ Branch 1 not taken.
484x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
153
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 484 times.
484x if (flags == -1)
154 {
155 int errn = errno;
156 ::close(pipe_fds_[0]);
157 ::close(pipe_fds_[1]);
158 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
159 }
160
2/4
✓ Branch 0 taken 484 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 484 times.
484x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
161 {
162 int errn = errno;
163 ::close(pipe_fds_[0]);
164 ::close(pipe_fds_[1]);
165 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
166 }
167
2/4
✓ Branch 0 taken 484 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 484 times.
484x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
168 {
169 int errn = errno;
170 ::close(pipe_fds_[0]);
171 ::close(pipe_fds_[1]);
172 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
173 }
174 484x }
175
176
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x timer_svc_ = &get_timer_service(ctx, *this);
177
2/4
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 242 times.
✗ Branch 3 not taken.
484x timer_svc_->set_on_earliest_changed(
178 2131x timer_service::callback(this, [](void* p) {
179 1889x static_cast<select_scheduler*>(p)->interrupt_reactor();
180 1889x }));
181
182
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x get_resolver_service(ctx, *this);
183
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x get_signal_service(ctx, *this);
184
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x get_stream_file_service(ctx, *this);
185
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x get_random_access_file_service(ctx, *this);
186
187 242x completed_ops_.push(&task_op_);
188 484x }
189
190 726x inline select_scheduler::~select_scheduler()
191 484x {
192
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 242 times.
242x if (pipe_fds_[0] >= 0)
193
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x ::close(pipe_fds_[0]);
194
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x if (pipe_fds_[1] >= 0)
195
1/2
✓ Branch 0 taken 242 times.
✗ Branch 1 not taken.
242x ::close(pipe_fds_[1]);
196 726x }
197
198 inline void
199 242x select_scheduler::shutdown()
200 {
201 242x shutdown_drain();
202
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 242 times.
242x if (pipe_fds_[1] >= 0)
204 242x interrupt_reactor();
205 242x }
206
207 inline void
208 3609x select_scheduler::register_descriptor(
209 int fd, select_descriptor_state* desc) const
210 {
211
1/2
✓ Branch 0 taken 3609 times.
✗ Branch 1 not taken.
3609x if (fd < 0 || fd >= FD_SETSIZE)
212 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
213
214 3609x desc->registered_events = reactor_event_read | reactor_event_write;
215 3609x desc->fd = fd;
216 3609x desc->scheduler_ = this;
217 3609x desc->mutex.set_enabled(!single_threaded_);
218 3609x desc->ready_events_.store(0, std::memory_order_relaxed);
219
220 {
221 3609x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
222 3609x desc->impl_ref_.reset();
223 3609x desc->read_ready = false;
224 3609x desc->write_ready = false;
225 3609x }
226
227 {
228 3609x mutex_type::scoped_lock lock(mutex_);
229
1/2
✓ Branch 0 taken 3609 times.
✗ Branch 1 not taken.
3609x registered_descs_[fd] = desc;
230
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 3605 times.
3609x if (fd > max_fd_)
231 3605x max_fd_ = fd;
232 3609x }
233
234 3609x interrupt_reactor();
235 3609x }
236
237 inline void
238 3609x select_scheduler::deregister_descriptor(int fd) const
239 {
240 3609x mutex_type::scoped_lock lock(mutex_);
241
242
1/2
✓ Branch 0 taken 3609 times.
✗ Branch 1 not taken.
3609x auto it = registered_descs_.find(fd);
243
2/4
✓ Branch 0 taken 3609 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3609 times.
✗ Branch 3 not taken.
3609x if (it == registered_descs_.end())
244 return;
245
246
1/2
✓ Branch 0 taken 3609 times.
✗ Branch 1 not taken.
3609x registered_descs_.erase(it);
247
248
2/2
✓ Branch 0 taken 63 times.
✓ Branch 1 taken 3546 times.
3609x if (fd == max_fd_)
249 {
250 3546x max_fd_ = pipe_fds_[0];
251
5/8
✓ Branch 0 taken 6987 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3441 times.
✓ Branch 3 taken 3546 times.
✓ Branch 4 taken 3441 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3441 times.
✗ Branch 7 not taken.
6987x for (auto& [registered_fd, state] : registered_descs_)
252 {
253
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 3432 times.
3441x if (registered_fd > max_fd_)
254 3432x max_fd_ = registered_fd;
255 }
256 3546x }
257 3609x }
258
259 inline void
260 81911x select_scheduler::notify_reactor() const
261 {
262 81911x interrupt_reactor();
263 81911x }
264
265 inline void
266 87814x select_scheduler::interrupt_reactor() const
267 {
268 87814x char byte = 1;
269 87814x [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
270 87814x }
271
272 inline long
273 561307x select_scheduler::calculate_timeout(long requested_timeout_us) const
274 {
275
1/2
✓ Branch 0 taken 561307 times.
✗ Branch 1 not taken.
561307x if (requested_timeout_us == 0)
276 return 0;
277
278 561307x auto nearest = timer_svc_->nearest_expiry();
279
2/2
✓ Branch 0 taken 828 times.
✓ Branch 1 taken 560479 times.
561307x if (nearest == timer_service::time_point::max())
280 828x return requested_timeout_us;
281
282 560479x auto now = std::chrono::steady_clock::now();
283
2/2
✓ Branch 0 taken 53 times.
✓ Branch 1 taken 560426 times.
560479x if (nearest <= now)
284 53x return 0;
285
286 560426x auto timer_timeout_us =
287 560426x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
288 560426x .count();
289
290 560426x constexpr auto long_max =
291 static_cast<long long>((std::numeric_limits<long>::max)());
292 560426x auto capped_timer_us =
293 1120852x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
294 560426x static_cast<long long>(0)),
295 long_max);
296
297
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 560420 times.
560426x if (requested_timeout_us < 0)
298 560420x return static_cast<long>(capped_timer_us);
299
300 6x return static_cast<long>(
301 6x (std::min)(static_cast<long long>(requested_timeout_us),
302 capped_timer_us));
303 561307x }
304
305 inline void
306 584361x select_scheduler::run_task(
307 lock_type& lock, context_type* ctx, long timeout_us)
308 {
309 584361x long effective_timeout_us =
310
2/2
✓ Branch 0 taken 23054 times.
✓ Branch 1 taken 561307 times.
584361x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
311
312 // Snapshot registered descriptors while holding lock.
313 // Record which fds need write monitoring to avoid a hot loop:
314 // select is level-triggered so writable sockets (nearly always
315 // writable) would cause select() to return immediately every
316 // iteration if unconditionally added to write_fds.
317 struct fd_entry
318 {
319 int fd;
320 select_descriptor_state* desc;
321 bool needs_write;
322 };
323 fd_entry snapshot[FD_SETSIZE];
324 584361x int snapshot_count = 0;
325
326
2/2
✓ Branch 0 taken 1194531 times.
✓ Branch 1 taken 584361 times.
1778892x for (auto& [fd, desc] : registered_descs_)
327 {
328
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1194531 times.
1194531x if (snapshot_count < FD_SETSIZE)
329 {
330 1194531x conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
331 1194531x snapshot[snapshot_count].fd = fd;
332 1194531x snapshot[snapshot_count].desc = desc;
333 1194531x snapshot[snapshot_count].needs_write =
334
2/2
✓ Branch 0 taken 682 times.
✓ Branch 1 taken 1193849 times.
1194531x (desc->write_op || desc->connect_op);
335 1194531x ++snapshot_count;
336 1194531x }
337 }
338
339
2/2
✓ Branch 0 taken 23054 times.
✓ Branch 1 taken 561307 times.
584361x if (lock.owns_lock())
340 561307x lock.unlock();
341
342 584361x task_cleanup on_exit{this, &lock, ctx};
343
344 fd_set read_fds, write_fds, except_fds;
345 584361x FD_ZERO(&read_fds);
346 584361x FD_ZERO(&write_fds);
347 584361x FD_ZERO(&except_fds);
348
349
1/2
✓ Branch 0 taken 584361 times.
✗ Branch 1 not taken.
584361x FD_SET(pipe_fds_[0], &read_fds);
350 584361x int nfds = pipe_fds_[0];
351
352
2/2
✓ Branch 0 taken 584361 times.
✓ Branch 1 taken 1194531 times.
1778892x for (int i = 0; i < snapshot_count; ++i)
353 {
354 1194531x int fd = snapshot[i].fd;
355
1/2
✓ Branch 0 taken 1194531 times.
✗ Branch 1 not taken.
1194531x FD_SET(fd, &read_fds);
356
2/2
✓ Branch 0 taken 4104 times.
✓ Branch 1 taken 1190427 times.
1194531x if (snapshot[i].needs_write)
357
1/2
✓ Branch 0 taken 4104 times.
✗ Branch 1 not taken.
4104x FD_SET(fd, &write_fds);
358
1/2
✓ Branch 0 taken 1194531 times.
✗ Branch 1 not taken.
1194531x FD_SET(fd, &except_fds);
359
2/2
✓ Branch 0 taken 610464 times.
✓ Branch 1 taken 584067 times.
1194531x if (fd > nfds)
360 584067x nfds = fd;
361 1194531x }
362
363 struct timeval tv;
364 584361x struct timeval* tv_ptr = nullptr;
365
2/2
✓ Branch 0 taken 828 times.
✓ Branch 1 taken 583533 times.
584361x if (effective_timeout_us >= 0)
366 {
367 583533x tv.tv_sec = effective_timeout_us / 1000000;
368 583533x tv.tv_usec = effective_timeout_us % 1000000;
369 583533x tv_ptr = &tv;
370 583533x }
371
372
1/2
✓ Branch 0 taken 584361 times.
✗ Branch 1 not taken.
584361x int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
373
374 // EINTR: signal interrupted select(), just retry.
375 // EBADF: an fd was closed between snapshot and select(); retry
376 // with a fresh snapshot from registered_descs_.
377
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 584361 times.
584361x if (ready < 0)
378 {
379 if (errno == EINTR || errno == EBADF)
380 return;
381 detail::throw_system_error(make_err(errno), "select");
382 }
383
384 // Process timers outside the lock
385
1/2
✓ Branch 0 taken 584361 times.
✗ Branch 1 not taken.
584361x timer_svc_->process_expired();
386
387 584361x op_queue local_ops;
388
389
2/2
✓ Branch 0 taken 580350 times.
✓ Branch 1 taken 4011 times.
584361x if (ready > 0)
390 {
391
3/4
✓ Branch 0 taken 580350 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 24532 times.
✓ Branch 3 taken 555818 times.
580350x if (FD_ISSET(pipe_fds_[0], &read_fds))
392 {
393 char buf[256];
394
3/4
✓ Branch 0 taken 49064 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 24532 times.
✓ Branch 3 taken 24532 times.
49064x while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
395 {
396 }
397 24532x }
398
399
2/2
✓ Branch 0 taken 1190363 times.
✓ Branch 1 taken 580350 times.
1770713x for (int i = 0; i < snapshot_count; ++i)
400 {
401 1190363x int fd = snapshot[i].fd;
402 1190363x select_descriptor_state* desc = snapshot[i].desc;
403
404 1190363x std::uint32_t flags = 0;
405
3/4
✓ Branch 0 taken 1190363 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 556379 times.
✓ Branch 3 taken 633984 times.
1190363x if (FD_ISSET(fd, &read_fds))
406 633984x flags |= reactor_event_read;
407
3/4
✓ Branch 0 taken 1190363 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1188623 times.
✓ Branch 3 taken 1740 times.
1190363x if (FD_ISSET(fd, &write_fds))
408 1740x flags |= reactor_event_write;
409
2/4
✓ Branch 0 taken 1190363 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1190363 times.
✗ Branch 3 not taken.
1190363x if (FD_ISSET(fd, &except_fds))
410 flags |= reactor_event_error;
411
412
2/2
✓ Branch 0 taken 554642 times.
✓ Branch 1 taken 635721 times.
1190363x if (flags == 0)
413 554642x continue;
414
415 635721x desc->add_ready_events(flags);
416
417 635721x bool expected = false;
418
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 635721 times.
635721x if (desc->is_enqueued_.compare_exchange_strong(
419 expected, true, std::memory_order_release,
420 std::memory_order_relaxed))
421 {
422 635721x local_ops.push(desc);
423 635721x }
424 635721x }
425 580350x }
426
427
1/2
✓ Branch 0 taken 584361 times.
✗ Branch 1 not taken.
584361x lock.lock();
428
429
2/2
✓ Branch 0 taken 7987 times.
✓ Branch 1 taken 576374 times.
584361x if (!local_ops.empty())
430 576374x completed_ops_.splice(local_ops);
431 584361x }
432
433 } // namespace boost::corosio::detail
434
435 #endif // BOOST_COROSIO_HAS_SELECT
436
437 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
438