include/boost/corosio/native/detail/select/select_scheduler.hpp

87.4% Lines (152/174) 100.0% List of functions (12/12) 51.1% Branches (90/176)
f(x) Functions (12)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int) :140 462x 50.0% 25.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::__invoke(void*) :175 1947x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator void (*)(void*)() const :175 231x 100.0% 100.0% boost::corosio::detail::select_scheduler::select_scheduler(boost::capy::execution_context&, int)::'lambda'(void*)::operator()(void*) const :175 1947x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::~select_scheduler() :185 693x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::shutdown() :194 231x 100.0% 50.0% 100.0% boost::corosio::detail::select_scheduler::register_descriptor(int, boost::corosio::detail::select_descriptor_state*) const :203 3699x 94.7% 66.7% 60.0% boost::corosio::detail::select_scheduler::deregister_descriptor(int) const :232 3699x 92.3% 65.0% 78.0% boost::corosio::detail::select_scheduler::notify_reactor() const :254 85295x 100.0% 100.0% boost::corosio::detail::select_scheduler::interrupt_reactor() const :260 91334x 100.0% 100.0% boost::corosio::detail::select_scheduler::calculate_timeout(long) const :267 621410x 85.7% 75.0% 80.0% boost::corosio::detail::select_scheduler::run_task(std::__1::unique_lock<std::__1::mutex>&, boost::corosio::detail::reactor_scheduler_context*) :300 646965x 94.2% 61.8% 82.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/select/select_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <sys/select.h>
31 #include <unistd.h>
32 #include <errno.h>
33 #include <fcntl.h>
34
35 #include <atomic>
36 #include <chrono>
37 #include <cstdint>
38 #include <limits>
39 #include <mutex>
40 #include <unordered_map>
41
42 namespace boost::corosio::detail {
43
44 struct select_op;
45 struct select_descriptor_state;
46
47 /** POSIX scheduler using select() for I/O multiplexing.
48
49 This scheduler implements the scheduler interface using the POSIX select()
50 call for I/O event notification. It inherits the shared reactor threading
51 model from reactor_scheduler_base: signal state machine, inline completion
52 budget, work counting, and the do_one event loop.
53
54 The design mirrors epoll_scheduler for behavioral consistency:
55 - Same single-reactor thread coordination model
56 - Same deferred I/O pattern (reactor marks ready; workers do I/O)
57 - Same timer integration pattern
58
59 Known Limitations:
60 - FD_SETSIZE (~1024) limits maximum concurrent connections
61 - O(n) scanning: rebuilds fd_sets each iteration
62 - Level-triggered only (no edge-triggered mode)
63
64 @par Thread Safety
65 All public member functions are thread-safe.
66 */
67 class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
68 {
69 public:
70 /** Construct the scheduler.
71
72 Creates a self-pipe for reactor interruption.
73
74 @param ctx Reference to the owning execution_context.
75 @param concurrency_hint Hint for expected thread count (unused).
76 */
77 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
78
79 /// Destroy the scheduler.
80 ~select_scheduler() override;
81
82 select_scheduler(select_scheduler const&) = delete;
83 select_scheduler& operator=(select_scheduler const&) = delete;
84
85 /// Shut down the scheduler, draining pending operations.
86 void shutdown() override;
87
88 /** Return the maximum file descriptor value supported.
89
90 Returns FD_SETSIZE - 1, the maximum fd value that can be
91 monitored by select(). Operations with fd >= FD_SETSIZE
92 will fail with EINVAL.
93
94 @return The maximum supported file descriptor value.
95 */
96 static constexpr int max_fd() noexcept
97 {
98 return FD_SETSIZE - 1;
99 }
100
101 /** Register a descriptor for persistent monitoring.
102
103 The fd is added to the registered_descs_ map and will be
104 included in subsequent select() calls. The reactor is
105 interrupted so a blocked select() rebuilds its fd_sets.
106
107 @param fd The file descriptor to register.
108 @param desc Pointer to descriptor state for this fd.
109 */
110 void register_descriptor(int fd, select_descriptor_state* desc) const;
111
112 /** Deregister a persistently registered descriptor.
113
114 @param fd The file descriptor to deregister.
115 */
116 void deregister_descriptor(int fd) const;
117
118 /** Interrupt the reactor so it rebuilds its fd_sets.
119
120 Called when a write or connect op is registered after
121 the reactor's snapshot was taken. Without this, select()
122 may block not watching for writability on the fd.
123 */
124 void notify_reactor() const;
125
126 private:
127 void
128 run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
129 void interrupt_reactor() const override;
130 long calculate_timeout(long requested_timeout_us) const;
131
132 // Self-pipe for interrupting select()
133 int pipe_fds_[2]; // [0]=read, [1]=write
134
135 // Per-fd tracking for fd_set building
136 mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
137 mutable int max_fd_ = -1;
138 };
139
140 693x inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
141 231x : pipe_fds_{-1, -1}
142 231x , max_fd_(-1)
143 462x {
144
2/4
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 231 times.
231x if (::pipe(pipe_fds_) < 0)
145 detail::throw_system_error(make_err(errno), "pipe");
146
147
2/2
✓ Branch 0 taken 231 times.
✓ Branch 1 taken 462 times.
693x for (int i = 0; i < 2; ++i)
148 {
149
1/2
✓ Branch 0 taken 462 times.
✗ Branch 1 not taken.
462x int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
150
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 462 times.
462x if (flags == -1)
151 {
152 int errn = errno;
153 ::close(pipe_fds_[0]);
154 ::close(pipe_fds_[1]);
155 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
156 }
157
2/4
✓ Branch 0 taken 462 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 462 times.
462x if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
158 {
159 int errn = errno;
160 ::close(pipe_fds_[0]);
161 ::close(pipe_fds_[1]);
162 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
163 }
164
2/4
✓ Branch 0 taken 462 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 462 times.
462x if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
165 {
166 int errn = errno;
167 ::close(pipe_fds_[0]);
168 ::close(pipe_fds_[1]);
169 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
170 }
171 462x }
172
173
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x timer_svc_ = &get_timer_service(ctx, *this);
174
2/4
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 231 times.
✗ Branch 3 not taken.
462x timer_svc_->set_on_earliest_changed(
175 2178x timer_service::callback(this, [](void* p) {
176 1947x static_cast<select_scheduler*>(p)->interrupt_reactor();
177 1947x }));
178
179
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x get_resolver_service(ctx, *this);
180
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x get_signal_service(ctx, *this);
181
182 231x completed_ops_.push(&task_op_);
183 462x }
184
185 693x inline select_scheduler::~select_scheduler()
186 462x {
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 231 times.
231x if (pipe_fds_[0] >= 0)
188
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x ::close(pipe_fds_[0]);
189
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x if (pipe_fds_[1] >= 0)
190
1/2
✓ Branch 0 taken 231 times.
✗ Branch 1 not taken.
231x ::close(pipe_fds_[1]);
191 693x }
192
193 inline void
194 231x select_scheduler::shutdown()
195 {
196 231x shutdown_drain();
197
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 231 times.
231x if (pipe_fds_[1] >= 0)
199 231x interrupt_reactor();
200 231x }
201
202 inline void
203 3699x select_scheduler::register_descriptor(
204 int fd, select_descriptor_state* desc) const
205 {
206
1/2
✓ Branch 0 taken 3699 times.
✗ Branch 1 not taken.
3699x if (fd < 0 || fd >= FD_SETSIZE)
207 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
208
209 3699x desc->registered_events = reactor_event_read | reactor_event_write;
210 3699x desc->fd = fd;
211 3699x desc->scheduler_ = this;
212 3699x desc->ready_events_.store(0, std::memory_order_relaxed);
213
214 {
215 3699x std::lock_guard lock(desc->mutex);
216 3699x desc->impl_ref_.reset();
217 3699x desc->read_ready = false;
218 3699x desc->write_ready = false;
219 3699x }
220
221 {
222 3699x std::lock_guard lock(mutex_);
223
1/2
✓ Branch 0 taken 3699 times.
✗ Branch 1 not taken.
3699x registered_descs_[fd] = desc;
224
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 3695 times.
3699x if (fd > max_fd_)
225 3695x max_fd_ = fd;
226 3699x }
227
228 3699x interrupt_reactor();
229 3699x }
230
231 inline void
232 3699x select_scheduler::deregister_descriptor(int fd) const
233 {
234 3699x std::lock_guard lock(mutex_);
235
236
1/2
✓ Branch 0 taken 3699 times.
✗ Branch 1 not taken.
3699x auto it = registered_descs_.find(fd);
237
2/4
✓ Branch 0 taken 3699 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3699 times.
✗ Branch 3 not taken.
3699x if (it == registered_descs_.end())
238 return;
239
240
1/2
✓ Branch 0 taken 3699 times.
✗ Branch 1 not taken.
3699x registered_descs_.erase(it);
241
242
2/2
✓ Branch 0 taken 61 times.
✓ Branch 1 taken 3638 times.
3699x if (fd == max_fd_)
243 {
244 3638x max_fd_ = pipe_fds_[0];
245
5/8
✓ Branch 0 taken 7178 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3540 times.
✓ Branch 3 taken 3638 times.
✓ Branch 4 taken 3540 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3540 times.
✗ Branch 7 not taken.
7178x for (auto& [registered_fd, state] : registered_descs_)
246 {
247
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 3531 times.
3540x if (registered_fd > max_fd_)
248 3531x max_fd_ = registered_fd;
249 }
250 3638x }
251 3699x }
252
253 inline void
254 85295x select_scheduler::notify_reactor() const
255 {
256 85295x interrupt_reactor();
257 85295x }
258
259 inline void
260 91334x select_scheduler::interrupt_reactor() const
261 {
262 91334x char byte = 1;
263 91334x [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
264 91334x }
265
266 inline long
267 621410x select_scheduler::calculate_timeout(long requested_timeout_us) const
268 {
269
1/2
✓ Branch 0 taken 621410 times.
✗ Branch 1 not taken.
621410x if (requested_timeout_us == 0)
270 return 0;
271
272 621410x auto nearest = timer_svc_->nearest_expiry();
273
2/2
✓ Branch 0 taken 129 times.
✓ Branch 1 taken 621281 times.
621410x if (nearest == timer_service::time_point::max())
274 129x return requested_timeout_us;
275
276 621281x auto now = std::chrono::steady_clock::now();
277
2/2
✓ Branch 0 taken 37 times.
✓ Branch 1 taken 621244 times.
621281x if (nearest <= now)
278 37x return 0;
279
280 621244x auto timer_timeout_us =
281 621244x std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
282 621244x .count();
283
284 621244x constexpr auto long_max =
285 static_cast<long long>((std::numeric_limits<long>::max)());
286 621244x auto capped_timer_us =
287 1242488x (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
288 621244x static_cast<long long>(0)),
289 long_max);
290
291
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 621244 times.
621244x if (requested_timeout_us < 0)
292 621244x return static_cast<long>(capped_timer_us);
293
294 return static_cast<long>(
295 (std::min)(static_cast<long long>(requested_timeout_us),
296 capped_timer_us));
297 621410x }
298
299 inline void
300 646965x select_scheduler::run_task(
301 std::unique_lock<std::mutex>& lock, context_type* ctx)
302 {
303
2/2
✓ Branch 0 taken 25555 times.
✓ Branch 1 taken 621410 times.
646965x long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
304
305 // Snapshot registered descriptors while holding lock.
306 // Record which fds need write monitoring to avoid a hot loop:
307 // select is level-triggered so writable sockets (nearly always
308 // writable) would cause select() to return immediately every
309 // iteration if unconditionally added to write_fds.
310 struct fd_entry
311 {
312 int fd;
313 select_descriptor_state* desc;
314 bool needs_write;
315 };
316 fd_entry snapshot[FD_SETSIZE];
317 646965x int snapshot_count = 0;
318
319
2/2
✓ Branch 0 taken 1316939 times.
✓ Branch 1 taken 646965 times.
1963904x for (auto& [fd, desc] : registered_descs_)
320 {
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1316939 times.
1316939x if (snapshot_count < FD_SETSIZE)
322 {
323 1316939x std::lock_guard desc_lock(desc->mutex);
324 1316939x snapshot[snapshot_count].fd = fd;
325 1316939x snapshot[snapshot_count].desc = desc;
326 1316939x snapshot[snapshot_count].needs_write =
327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1316939 times.
1316939x (desc->write_op || desc->connect_op);
328 1316939x ++snapshot_count;
329 1316939x }
330 }
331
332
2/2
✓ Branch 0 taken 25555 times.
✓ Branch 1 taken 621410 times.
646965x if (lock.owns_lock())
333 621410x lock.unlock();
334
335 646965x task_cleanup on_exit{this, &lock, ctx};
336
337 fd_set read_fds, write_fds, except_fds;
338 646965x FD_ZERO(&read_fds);
339 646965x FD_ZERO(&write_fds);
340 646965x FD_ZERO(&except_fds);
341
342
1/2
✓ Branch 0 taken 646965 times.
✗ Branch 1 not taken.
646965x FD_SET(pipe_fds_[0], &read_fds);
343 646965x int nfds = pipe_fds_[0];
344
345
2/2
✓ Branch 0 taken 646965 times.
✓ Branch 1 taken 1316939 times.
1963904x for (int i = 0; i < snapshot_count; ++i)
346 {
347 1316939x int fd = snapshot[i].fd;
348
1/2
✓ Branch 0 taken 1316939 times.
✗ Branch 1 not taken.
1316939x FD_SET(fd, &read_fds);
349
2/2
✓ Branch 0 taken 3537 times.
✓ Branch 1 taken 1313402 times.
1316939x if (snapshot[i].needs_write)
350
1/2
✓ Branch 0 taken 3537 times.
✗ Branch 1 not taken.
3537x FD_SET(fd, &write_fds);
351
1/2
✓ Branch 0 taken 1316939 times.
✗ Branch 1 not taken.
1316939x FD_SET(fd, &except_fds);
352
2/2
✓ Branch 0 taken 670266 times.
✓ Branch 1 taken 646673 times.
1316939x if (fd > nfds)
353 646673x nfds = fd;
354 1316939x }
355
356 struct timeval tv;
357 646965x struct timeval* tv_ptr = nullptr;
358
2/2
✓ Branch 0 taken 129 times.
✓ Branch 1 taken 646836 times.
646965x if (effective_timeout_us >= 0)
359 {
360 646836x tv.tv_sec = effective_timeout_us / 1000000;
361 646836x tv.tv_usec = effective_timeout_us % 1000000;
362 646836x tv_ptr = &tv;
363 646836x }
364
365
1/2
✓ Branch 0 taken 646965 times.
✗ Branch 1 not taken.
646965x int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
366
367 // EINTR: signal interrupted select(), just retry.
368 // EBADF: an fd was closed between snapshot and select(); retry
369 // with a fresh snapshot from registered_descs_.
370
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 646965 times.
646965x if (ready < 0)
371 {
372 if (errno == EINTR || errno == EBADF)
373 return;
374 detail::throw_system_error(make_err(errno), "select");
375 }
376
377 // Process timers outside the lock
378
1/2
✓ Branch 0 taken 646965 times.
✗ Branch 1 not taken.
646965x timer_svc_->process_expired();
379
380 646965x op_queue local_ops;
381
382
2/2
✓ Branch 0 taken 641937 times.
✓ Branch 1 taken 5028 times.
646965x if (ready > 0)
383 {
384
3/4
✓ Branch 0 taken 641937 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 26131 times.
✓ Branch 3 taken 615806 times.
641937x if (FD_ISSET(pipe_fds_[0], &read_fds))
385 {
386 char buf[256];
387
3/4
✓ Branch 0 taken 52262 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 26131 times.
✓ Branch 3 taken 26131 times.
52262x while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
388 {
389 }
390 26131x }
391
392
2/2
✓ Branch 0 taken 1310842 times.
✓ Branch 1 taken 641937 times.
1952779x for (int i = 0; i < snapshot_count; ++i)
393 {
394 1310842x int fd = snapshot[i].fd;
395 1310842x select_descriptor_state* desc = snapshot[i].desc;
396
397 1310842x std::uint32_t flags = 0;
398
3/4
✓ Branch 0 taken 1310842 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 612195 times.
✓ Branch 3 taken 698647 times.
1310842x if (FD_ISSET(fd, &read_fds))
399 698647x flags |= reactor_event_read;
400
3/4
✓ Branch 0 taken 1310842 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1309053 times.
✓ Branch 3 taken 1789 times.
1310842x if (FD_ISSET(fd, &write_fds))
401 1789x flags |= reactor_event_write;
402
2/4
✓ Branch 0 taken 1310842 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1310842 times.
✗ Branch 3 not taken.
1310842x if (FD_ISSET(fd, &except_fds))
403 flags |= reactor_event_error;
404
405
2/2
✓ Branch 0 taken 610409 times.
✓ Branch 1 taken 700433 times.
1310842x if (flags == 0)
406 610409x continue;
407
408 700433x desc->add_ready_events(flags);
409
410 700433x bool expected = false;
411
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 700433 times.
700433x if (desc->is_enqueued_.compare_exchange_strong(
412 expected, true, std::memory_order_release,
413 std::memory_order_relaxed))
414 {
415 700433x local_ops.push(desc);
416 700433x }
417 700433x }
418 641937x }
419
420
1/2
✓ Branch 0 taken 646965 times.
✗ Branch 1 not taken.
646965x lock.lock();
421
422
2/2
✓ Branch 0 taken 9099 times.
✓ Branch 1 taken 637866 times.
646965x if (!local_ops.empty())
423 637866x completed_ops_.splice(local_ops);
424 646965x }
425
426 } // namespace boost::corosio::detail
427
428 #endif // BOOST_COROSIO_HAS_SELECT
429
430 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
431