include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

83.9% Lines (115/137) 100.0% List of functions (9/9)
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <atomic>
31 #include <chrono>
32 #include <cstdint>
33 #include <mutex>
34
35 #include <errno.h>
36 #include <sys/epoll.h>
37 #include <sys/eventfd.h>
38 #include <sys/timerfd.h>
39 #include <unistd.h>
40
41 namespace boost::corosio::detail {
42
43 struct epoll_op;
44 struct descriptor_state;
45
/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    void interrupt_reactor() const override;
    void update_timerfd() const;

    int epoll_fd_;   // epoll instance; -1 until the constructor creates it
    int event_fd_;   // eventfd used to wake a blocked epoll_wait
    int timer_fd_;   // timerfd carrying the nearest timer expiry

    // Edge-triggered eventfd state: true while a wakeup write is pending
    // and not yet consumed by the reactor (coalesces interrupts).
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};
};
132
133 339x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134 339x : epoll_fd_(-1)
135 339x , event_fd_(-1)
136 339x , timer_fd_(-1)
137 {
138 339x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139 339x if (epoll_fd_ < 0)
140 detail::throw_system_error(make_err(errno), "epoll_create1");
141
142 339x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143 339x if (event_fd_ < 0)
144 {
145 int errn = errno;
146 ::close(epoll_fd_);
147 detail::throw_system_error(make_err(errn), "eventfd");
148 }
149
150 339x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151 339x if (timer_fd_ < 0)
152 {
153 int errn = errno;
154 ::close(event_fd_);
155 ::close(epoll_fd_);
156 detail::throw_system_error(make_err(errn), "timerfd_create");
157 }
158
159 339x epoll_event ev{};
160 339x ev.events = EPOLLIN | EPOLLET;
161 339x ev.data.ptr = nullptr;
162 339x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163 {
164 int errn = errno;
165 ::close(timer_fd_);
166 ::close(event_fd_);
167 ::close(epoll_fd_);
168 detail::throw_system_error(make_err(errn), "epoll_ctl");
169 }
170
171 339x epoll_event timer_ev{};
172 339x timer_ev.events = EPOLLIN | EPOLLERR;
173 339x timer_ev.data.ptr = &timer_fd_;
174 339x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175 {
176 int errn = errno;
177 ::close(timer_fd_);
178 ::close(event_fd_);
179 ::close(epoll_fd_);
180 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181 }
182
183 339x timer_svc_ = &get_timer_service(ctx, *this);
184 339x timer_svc_->set_on_earliest_changed(
185 6559x timer_service::callback(this, [](void* p) {
186 6220x auto* self = static_cast<epoll_scheduler*>(p);
187 6220x self->timerfd_stale_.store(true, std::memory_order_release);
188 6220x self->interrupt_reactor();
189 6220x }));
190
191 339x get_resolver_service(ctx, *this);
192 339x get_signal_service(ctx, *this);
193
194 339x completed_ops_.push(&task_op_);
195 339x }
196
197 678x inline epoll_scheduler::~epoll_scheduler()
198 {
199 339x if (timer_fd_ >= 0)
200 339x ::close(timer_fd_);
201 339x if (event_fd_ >= 0)
202 339x ::close(event_fd_);
203 339x if (epoll_fd_ >= 0)
204 339x ::close(epoll_fd_);
205 678x }
206
207 inline void
208 339x epoll_scheduler::shutdown()
209 {
210 339x shutdown_drain();
211
212 339x if (event_fd_ >= 0)
213 339x interrupt_reactor();
214 339x }
215
216 inline void
217 13863x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218 {
219 13863x epoll_event ev{};
220 13863x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221 13863x ev.data.ptr = desc;
222
223 13863x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225
226 13863x desc->registered_events = ev.events;
227 13863x desc->fd = fd;
228 13863x desc->scheduler_ = this;
229 13863x desc->ready_events_.store(0, std::memory_order_relaxed);
230
231 13863x std::lock_guard lock(desc->mutex);
232 13863x desc->impl_ref_.reset();
233 13863x desc->read_ready = false;
234 13863x desc->write_ready = false;
235 13863x }
236
237 inline void
238 13863x epoll_scheduler::deregister_descriptor(int fd) const
239 {
240 13863x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
241 13863x }
242
243 inline void
244 9269x epoll_scheduler::interrupt_reactor() const
245 {
246 9269x bool expected = false;
247 9269x if (eventfd_armed_.compare_exchange_strong(
248 expected, true, std::memory_order_release,
249 std::memory_order_relaxed))
250 {
251 9015x std::uint64_t val = 1;
252 9015x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253 }
254 9269x }
255
256 inline void
257 11683x epoll_scheduler::update_timerfd() const
258 {
259 11683x auto nearest = timer_svc_->nearest_expiry();
260
261 11683x itimerspec ts{};
262 11683x int flags = 0;
263
264 11683x if (nearest == timer_service::time_point::max())
265 {
266 // No timers — disarm by setting to 0 (relative)
267 }
268 else
269 {
270 11628x auto now = std::chrono::steady_clock::now();
271 11628x if (nearest <= now)
272 {
273 // Use 1ns instead of 0 — zero disarms the timerfd
274 49x ts.it_value.tv_nsec = 1;
275 }
276 else
277 {
278 11579x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279 11579x nearest - now)
280 11579x .count();
281 11579x ts.it_value.tv_sec = nsec / 1000000000;
282 11579x ts.it_value.tv_nsec = nsec % 1000000000;
283 11579x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284 ts.it_value.tv_nsec = 1;
285 }
286 }
287
288 11683x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289 detail::throw_system_error(make_err(errno), "timerfd_settime");
290 11683x }
291
/// Run one reactor iteration: block in epoll_wait, then hand completed
/// work back to the scheduler's queue.
///
/// Called with `lock` held (or not — both are handled). The lock is
/// released around the blocking wait and re-acquired before touching
/// completed_ops_. `task_cleanup` restores scheduler bookkeeping on every
/// exit path, including the throw below.
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // If another thread already asked us to stop waiting, poll instead
    // of blocking indefinitely.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is a normal wakeup (signal); anything else is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr identifies the interrupt eventfd
        // (sentinel chosen at registration in the constructor).
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Drain the counter, then disarm so the next interrupt_reactor
            // call performs a fresh write.
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ identifies the timerfd.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // Consume the expiration count; actual timer processing is
            // deferred until after the event loop below.
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Everything else is a registered descriptor: record its ready
        // events and enqueue it exactly once (is_enqueued_ guards against
        // double-push when multiple events arrive before it runs).
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    if (check_timers)
    {
        timer_svc_->process_expired();
        // Re-arm for the next nearest expiry (or disarm if none remain).
        update_timerfd();
    }

    lock.lock();

    // Publish gathered work under the scheduler lock.
    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_EPOLL
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
365