include/boost/corosio/native/detail/select/select_scheduler.hpp

87.6% Lines (156/178) 100.0% List of functions (12/12) 51.7% Branches (93/180)
select_scheduler.hpp
f(x) Functions (12)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int) :142 1190x 50.0% 25.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :177 2000x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :177 595x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :177 2000x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::~select_scheduler() :189 1785x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::shutdown() :198 595x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::register_descriptor(int, boost::corosio::detail::reactor_descriptor_state*) const :207 4114x 95.0% 66.7% 60.0% boost::corosio::detail::select_scheduler::deregister_descriptor(int) const :237 4114x 92.3% 65.0% 78.0% boost::corosio::detail::select_scheduler::notify_reactor() const :259 2372x 100.0% 100.0% boost::corosio::detail::select_scheduler::interrupt_reactor() const :265 9518x 100.0% 100.0% boost::corosio::detail::select_scheduler::calculate_timeout(long) const :272 1075299x 85.7% 75.0% 80.0% boost::corosio::detail::select_scheduler::run_task(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, boost::corosio::detail::reactor_scheduler_context*, long) :305 1098173x 94.3% 63.2% 82.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/select/select_traits.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
32 #include <sys/select.h>
33 #include <unistd.h>
34 #include <errno.h>
35 #include <fcntl.h>
36
37 #include <atomic>
38 #include <chrono>
39 #include <cstdint>
40 #include <limits>
41 #include <mutex>
42 #include <unordered_map>
43
44 namespace boost::corosio::detail {
45
46 struct select_op;
47
48 /** POSIX scheduler using select() for I/O multiplexing.
49
50 This scheduler implements the scheduler interface using the POSIX select()
51 call for I/O event notification. It inherits the shared reactor threading
52 model from reactor_scheduler: signal state machine, inline completion
53 budget, work counting, and the do_one event loop.
54
55 The design mirrors epoll_scheduler for behavioral consistency:
56 - Same single-reactor thread coordination model
57 - Same deferred I/O pattern (reactor marks ready; workers do I/O)
58 - Same timer integration pattern
59
60 Known Limitations:
61 - FD_SETSIZE (~1024) limits maximum concurrent connections
62 - O(n) scanning: rebuilds fd_sets each iteration
63 - Level-triggered only (no edge-triggered mode)
64
65 @par Thread Safety
66 All public member functions are thread-safe.
67 */
68 class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
69 {
70 public:
71 /** Construct the scheduler.
72
73 Creates a self-pipe for reactor interruption.
74
75 @param ctx Reference to the owning execution_context.
76 @param concurrency_hint Hint for expected thread count (unused).
77 */
78 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
79
80 /// Destroy the scheduler.
81 ~select_scheduler() override;
82
83 select_scheduler(select_scheduler const&) = delete;
84 select_scheduler& operator=(select_scheduler const&) = delete;
85
86 /// Shut down the scheduler, draining pending operations.
87 void shutdown() override;
88
89 /** Return the maximum file descriptor value supported.
90
91 Returns FD_SETSIZE - 1, the maximum fd value that can be
92 monitored by select(). Operations with fd >= FD_SETSIZE
93 will fail with EINVAL.
94
95 @return The maximum supported file descriptor value.
96 */
97 static constexpr int max_fd() noexcept
98 {
99 return FD_SETSIZE - 1;
100 }
101
102 /** Register a descriptor for persistent monitoring.
103
104 The fd is added to the registered_descs_ map and will be
105 included in subsequent select() calls. The reactor is
106 interrupted so a blocked select() rebuilds its fd_sets.
107
108 @param fd The file descriptor to register.
109 @param desc Pointer to descriptor state for this fd.
110 */
111 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
112
113 /** Deregister a persistently registered descriptor.
114
115 @param fd The file descriptor to deregister.
116 */
117 void deregister_descriptor(int fd) const;
118
119 /** Interrupt the reactor so it rebuilds its fd_sets.
120
121 Called when a write or connect op is registered after
122 the reactor's snapshot was taken. Without this, select()
123 may block not watching for writability on the fd.
124 */
125 void notify_reactor() const;
126
127 private:
128 void
129 run_task(lock_type& lock, context_type* ctx,
130 long timeout_us) override;
131 void interrupt_reactor() const override;
132 long calculate_timeout(long requested_timeout_us) const;
133
134 // Self-pipe for interrupting select()
135 int pipe_fds_[2]; // [0]=read, [1]=write
136
137 // Per-fd tracking for fd_set building
138 mutable std::unordered_map<int, reactor_descriptor_state*> registered_descs_;
139 mutable int max_fd_ = -1;
140 };
141
142 1785x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
143 595x : pipe_fds_{-1, -1}
144 595x , max_fd_(-1)
145 1190x {
146
2/4
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 595 times.
595x if (::pipe(pipe_fds_) < 0)
147 detail::throw_system_error(make_err(errno), "pipe");
148
149
2/2
✓ Branch 0 taken 595 times.
✓ Branch 1 taken 1190 times.
1785x for (int i = 0; i < 2; ++i)
150 {
151
1/2
✓ Branch 0 taken 1190 times.
✗ Branch 1 not taken.
1190x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
152
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1190 times.
1190x if (flags == -1)
153 {
154 int errn = errno;
155 ::close(pipe_fds_[0]);
156 ::close(pipe_fds_[1]);
157 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158 }
159
2/4
✓ Branch 0 taken 1190 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1190 times.
1190x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160 {
161 int errn = errno;
162 ::close(pipe_fds_[0]);
163 ::close(pipe_fds_[1]);
164 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165 }
166
2/4
✓ Branch 0 taken 1190 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1190 times.
1190x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167 {
168 int errn = errno;
169 ::close(pipe_fds_[0]);
170 ::close(pipe_fds_[1]);
171 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172 }
173 1190x }
174
175
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x timer_svc_ = &get_timer_service(ctx, *this);
176
2/4
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 595 times.
✗ Branch 3 not taken.
1190x timer_svc_->set_on_earliest_changed(
177 2595x timer_service::callback(this, [](void* p) {
178 2000x static_cast<select_scheduler*>(p)->interrupt_reactor();
179 2000x }));
180
181
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x get_resolver_service(ctx, *this);
182
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x get_signal_service(ctx, *this);
183
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x get_stream_file_service(ctx, *this);
184
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x get_random_access_file_service(ctx, *this);
185
186 595x completed_ops_.push(&task_op_);
187 1190x }
188
189 1785x inline select_scheduler::~select_scheduler()
190 1190x {
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 595 times.
595x if (pipe_fds_[0] >= 0)
192
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x ::close(pipe_fds_[0]);
193
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x if (pipe_fds_[1] >= 0)
194
1/2
✓ Branch 0 taken 595 times.
✗ Branch 1 not taken.
595x ::close(pipe_fds_[1]);
195 1785x }
196
197 inline void
198 595x select_scheduler::shutdown()
199 {
200 595x shutdown_drain();
201
202
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 595 times.
595x if (pipe_fds_[1] >= 0)
203 595x interrupt_reactor();
204 595x }
205
206 inline void
207 4114x select_scheduler::register_descriptor(
208 int fd, reactor_descriptor_state* desc) const
209 {
210
1/2
✓ Branch 0 taken 4114 times.
✗ Branch 1 not taken.
4114x if (fd < 0 || fd >= FD_SETSIZE)
211 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
212
213 4114x desc->registered_events = reactor_event_read | reactor_event_write;
214 4114x desc->fd = fd;
215 4114x desc->scheduler_ = this;
216 4114x desc->mutex.set_enabled(!single_threaded_);
217 4114x desc->ready_events_.store(0, std::memory_order_relaxed);
218
219 {
220 4114x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
221 4114x desc->impl_ref_.reset();
222 4114x desc->read_ready = false;
223 4114x desc->write_ready = false;
224 4114x }
225
226 {
227 4114x mutex_type::scoped_lock lock(mutex_);
228
1/2
✓ Branch 0 taken 4114 times.
✗ Branch 1 not taken.
4114x registered_descs_[fd] = desc;
229
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 4109 times.
4114x if (fd > max_fd_)
230 4109x max_fd_ = fd;
231 4114x }
232
233 4114x interrupt_reactor();
234 4114x }
235
236 inline void
237 4114x select_scheduler::deregister_descriptor(int fd) const
238 {
239 4114x mutex_type::scoped_lock lock(mutex_);
240
241
1/2
✓ Branch 0 taken 4114 times.
✗ Branch 1 not taken.
4114x auto it = registered_descs_.find(fd);
242
2/4
✓ Branch 0 taken 4114 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4114 times.
✗ Branch 3 not taken.
4114x if (it == registered_descs_.end())
243 return;
244
245
1/2
✓ Branch 0 taken 4114 times.
✗ Branch 1 not taken.
4114x registered_descs_.erase(it);
246
247
2/2
✓ Branch 0 taken 116 times.
✓ Branch 1 taken 3998 times.
4114x if (fd == max_fd_)
248 {
249 3998x max_fd_ = pipe_fds_[0];
250
5/8
✓ Branch 0 taken 7741 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3743 times.
✓ Branch 3 taken 3998 times.
✓ Branch 4 taken 3743 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3743 times.
✗ Branch 7 not taken.
7741x for (auto& [registered_fd, state] : registered_descs_)
251 {
252
2/2
✓ Branch 0 taken 41 times.
✓ Branch 1 taken 3702 times.
3743x if (registered_fd > max_fd_)
253 3702x max_fd_ = registered_fd;
254 }
255 3998x }
256 4114x }
257
258 inline void
259 2372x select_scheduler::notify_reactor() const
260 {
261 2372x interrupt_reactor();
262 2372x }
263
264 inline void
265 9518x select_scheduler::interrupt_reactor() const
266 {
267 9518x char byte = 1;
268 9518x [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
269 9518x }
270
271 inline long
272 1075299x select_scheduler::calculate_timeout(long requested_timeout_us) const
273 {
274
1/2
✓ Branch 0 taken 1075299 times.
✗ Branch 1 not taken.
1075299x if (requested_timeout_us == 0)
275 return 0;
276
277 1075299x auto nearest = timer_svc_->nearest_expiry();
278
2/2
✓ Branch 0 taken 1491 times.
✓ Branch 1 taken 1073808 times.
1075299x if (nearest == timer_service::time_point::max())
279 1491x return requested_timeout_us;
280
281 1073808x auto now = std::chrono::steady_clock::now();
282
2/2
✓ Branch 0 taken 68 times.
✓ Branch 1 taken 1073740 times.
1073808x if (nearest <= now)
283 68x return 0;
284
285 1073740x auto timer_timeout_us =
286 1073740x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
287 1073740x .count();
288
289 1073740x constexpr auto long_max =
290 static_cast<long long>((std::numeric_limits<long>::max)());
291 1073740x auto capped_timer_us =
292 2147480x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
293 1073740x static_cast<long long>(0)),
294 long_max);
295
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1073740 times.
1073740x if (requested_timeout_us < 0)
297 1073740x return static_cast<long>(capped_timer_us);
298
299 return static_cast<long>(
300 (std::min)(static_cast<long long>(requested_timeout_us),
301 capped_timer_us));
302 1075299x }
303
304 inline void
305 1098173x select_scheduler::run_task(
306 lock_type& lock, context_type* ctx, long timeout_us)
307 {
308 1098173x long effective_timeout_us =
309
2/2
✓ Branch 0 taken 22874 times.
✓ Branch 1 taken 1075299 times.
1098173x task_interrupted_ ? 0 : calculate_timeout(timeout_us);
310
311 // Snapshot registered descriptors while holding lock.
312 // Record which fds need write monitoring to avoid a hot loop:
313 // select is level-triggered so writable sockets (nearly always
314 // writable) would cause select() to return immediately every
315 // iteration if unconditionally added to write_fds.
316 struct fd_entry
317 {
318 int fd;
319 reactor_descriptor_state* desc;
320 bool needs_write;
321 };
322 fd_entry snapshot[FD_SETSIZE];
323 1098173x int snapshot_count = 0;
324
325
2/2
✓ Branch 0 taken 1669345 times.
✓ Branch 1 taken 1098173 times.
2767518x for (auto& [fd, desc] : registered_descs_)
326 {
327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1669345 times.
1669345x if (snapshot_count < FD_SETSIZE)
328 {
329 1669345x conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
330 1669345x snapshot[snapshot_count].fd = fd;
331 1669345x snapshot[snapshot_count].desc = desc;
332 1669345x snapshot[snapshot_count].needs_write =
333
2/2
✓ Branch 0 taken 1165 times.
✓ Branch 1 taken 1668180 times.
1669345x (desc->write_op || desc->connect_op);
334 1669345x ++snapshot_count;
335 1669345x }
336 }
337
338
2/2
✓ Branch 0 taken 22874 times.
✓ Branch 1 taken 1075299 times.
1098173x if (lock.owns_lock())
339 1075299x lock.unlock();
340
341 1098173x task_cleanup on_exit{this, &lock, ctx};
342
343 fd_set read_fds, write_fds, except_fds;
344 1098173x FD_ZERO(&read_fds);
345 1098173x FD_ZERO(&write_fds);
346 1098173x FD_ZERO(&except_fds);
347
348
1/2
✓ Branch 0 taken 1098173 times.
✗ Branch 1 not taken.
1098173x FD_SET(pipe_fds_[0], &read_fds);
349 1098173x int nfds = pipe_fds_[0];
350
351
2/2
✓ Branch 0 taken 1098173 times.
✓ Branch 1 taken 1669345 times.
2767518x for (int i = 0; i < snapshot_count; ++i)
352 {
353 1669345x int fd = snapshot[i].fd;
354
1/2
✓ Branch 0 taken 1669345 times.
✗ Branch 1 not taken.
1669345x FD_SET(fd, &read_fds);
355
2/2
✓ Branch 0 taken 4704 times.
✓ Branch 1 taken 1664641 times.
1669345x if (snapshot[i].needs_write)
356
1/2
✓ Branch 0 taken 4704 times.
✗ Branch 1 not taken.
4704x FD_SET(fd, &write_fds);
357
1/2
✓ Branch 0 taken 1669345 times.
✗ Branch 1 not taken.
1669345x FD_SET(fd, &except_fds);
358
2/2
✓ Branch 0 taken 571606 times.
✓ Branch 1 taken 1097739 times.
1669345x if (fd > nfds)
359 1097739x nfds = fd;
360 1669345x }
361
362 struct timeval tv;
363 1098173x struct timeval* tv_ptr = nullptr;
364
2/2
✓ Branch 0 taken 1487 times.
✓ Branch 1 taken 1096686 times.
1098173x if (effective_timeout_us >= 0)
365 {
366 1096686x tv.tv_sec = effective_timeout_us / 1000000;
367 1096686x tv.tv_usec = effective_timeout_us % 1000000;
368 1096686x tv_ptr = &tv;
369 1096686x }
370
371
1/2
✓ Branch 0 taken 1098173 times.
✗ Branch 1 not taken.
1098173x int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
372
373 // EINTR: signal interrupted select(), just retry.
374 // EBADF: an fd was closed between snapshot and select(); retry
375 // with a fresh snapshot from registered_descs_.
376
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1098173 times.
1098173x if (ready < 0)
377 {
378 if (errno == EINTR || errno == EBADF)
379 return;
380 detail::throw_system_error(make_err(errno), "select");
381 }
382
383 // Process timers outside the lock
384
1/2
✓ Branch 0 taken 1098173 times.
✗ Branch 1 not taken.
1098173x timer_svc_->process_expired();
385
386 1098173x op_queue local_ops;
387
388
2/2
✓ Branch 0 taken 1093336 times.
✓ Branch 1 taken 4837 times.
1098173x if (ready > 0)
389 {
390
3/4
✓ Branch 0 taken 1093336 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4910 times.
✓ Branch 3 taken 1088426 times.
1093336x if (FD_ISSET(pipe_fds_[0], &read_fds))
391 {
392 char buf[256];
393
3/4
✓ Branch 0 taken 9820 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4910 times.
✓ Branch 3 taken 4910 times.
9820x while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
394 {
395 }
396 4910x }
397
398
2/2
✓ Branch 0 taken 1663755 times.
✓ Branch 1 taken 1093336 times.
2757091x for (int i = 0; i < snapshot_count; ++i)
399 {
400 1663755x int fd = snapshot[i].fd;
401 1663755x reactor_descriptor_state* desc = snapshot[i].desc;
402
403 1663755x std::uint32_t flags = 0;
404
3/4
✓ Branch 0 taken 1663755 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 518797 times.
✓ Branch 3 taken 1144958 times.
1663755x if (FD_ISSET(fd, &read_fds))
405 1144958x flags |= reactor_event_read;
406
3/4
✓ Branch 0 taken 1663755 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1661384 times.
✓ Branch 3 taken 2371 times.
1663755x if (FD_ISSET(fd, &write_fds))
407 2371x flags |= reactor_event_write;
408
2/4
✓ Branch 0 taken 1663755 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1663755 times.
✗ Branch 3 not taken.
1663755x if (FD_ISSET(fd, &except_fds))
409 flags |= reactor_event_error;
410
411
2/2
✓ Branch 0 taken 516436 times.
✓ Branch 1 taken 1147319 times.
1663755x if (flags == 0)
412 516436x continue;
413
414 1147319x desc->add_ready_events(flags);
415
416 1147319x bool expected = false;
417
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1147319 times.
1147319x if (desc->is_enqueued_.compare_exchange_strong(
418 expected, true, std::memory_order_release,
419 std::memory_order_relaxed))
420 {
421 1147319x local_ops.push(desc);
422 1147319x }
423 1147319x }
424 1093336x }
425
426
1/2
✓ Branch 0 taken 1098173 times.
✗ Branch 1 not taken.
1098173x lock.lock();
427
428
2/2
✓ Branch 0 taken 8925 times.
✓ Branch 1 taken 1089248 times.
1098173x if (!local_ops.empty())
429 1089248x completed_ops_.splice(local_ops);
430 1098173x }
431
432 } // namespace boost::corosio::detail
433
434 #endif // BOOST_COROSIO_HAS_SELECT
435
436 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
437