include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp

80.8% Lines (105/130) 100.0% List of functions (6/6) 71.4% Branches (50/70)
reactor_descriptor_state.hpp
f(x) Functions (6)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12
13 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
15
16 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
17
18 #include <atomic>
19 #include <cstdint>
20 #include <memory>
21
22 #include <errno.h>
23 #include <sys/socket.h>
24
25 namespace boost::corosio::detail {
26
27 /// Shared reactor event constants.
28 /// These match epoll numeric values (EPOLLIN, EPOLLOUT, EPOLLERR); kqueue maps its events to the same.
29 static constexpr std::uint32_t reactor_event_read = 0x001; // same value as EPOLLIN
30 static constexpr std::uint32_t reactor_event_write = 0x004; // same value as EPOLLOUT
31 static constexpr std::uint32_t reactor_event_error = 0x008; // same value as EPOLLERR
32
33 /** Per-descriptor state shared across reactor backends.
34
35 Tracks pending operations for a file descriptor. The fd is registered
36 once with the reactor and stays registered until closed. Uses deferred
37 I/O: the reactor sets ready_events_ atomically, then enqueues this state.
38 When popped by the scheduler, invoke_deferred_io() performs I/O under
39 the mutex and queues completed ops.
40
41 Non-template: uses reactor_op_base pointers so the scheduler and
42 descriptor_state code exist as a single copy in the binary regardless
43 of how many backends are compiled in.
44
45 @par Thread Safety
46 The mutex protects operation pointers and the ready/cancel flags. ready_events_
47 and is_enqueued_ are atomic for lock-free reactor access.
48 */
49 25737x struct reactor_descriptor_state : scheduler_op
50 {
51 /// Protects operation pointers and ready/cancel flags.
52 /// Becomes a no-op in single-threaded mode.
53 25737x conditionally_enabled_mutex mutex{true};
54
55 /// Pending read operation (guarded by `mutex`).
56 25737x reactor_op_base* read_op = nullptr;
57
58 /// Pending write operation (guarded by `mutex`).
59 25737x reactor_op_base* write_op = nullptr;
60
61 /// Pending connect operation (guarded by `mutex`).
62 25737x reactor_op_base* connect_op = nullptr;
63
64 /// Pending wait-for-read operation (guarded by `mutex`).
65 25737x reactor_op_base* wait_read_op = nullptr;
66
67 /// Pending wait-for-write operation (guarded by `mutex`).
68 25737x reactor_op_base* wait_write_op = nullptr;
69
70 /// Pending wait-for-error operation (guarded by `mutex`).
71 25737x reactor_op_base* wait_error_op = nullptr;
72
73 /// True if a read edge event arrived while no read op was registered (guarded by `mutex`).
74 25737x bool read_ready = false;
75
76 /// True if a write edge event arrived while neither a write nor a connect op was registered (guarded by `mutex`).
77 25737x bool write_ready = false;
78
79 /// Deferred read cancellation (IOCP-style cancel semantics).
80 25737x bool read_cancel_pending = false;
81
82 /// Deferred write cancellation (IOCP-style cancel semantics).
83 25737x bool write_cancel_pending = false;
84
85 /// Deferred connect cancellation (IOCP-style cancel semantics).
86 25737x bool connect_cancel_pending = false;
87
88 /// Deferred wait-read cancellation (IOCP-style cancel semantics).
89 25737x bool wait_read_cancel_pending = false;
90
91 /// Deferred wait-write cancellation (IOCP-style cancel semantics).
92 25737x bool wait_write_cancel_pending = false;
93
94 /// Deferred wait-error cancellation (IOCP-style cancel semantics).
95 25737x bool wait_error_cancel_pending = false;
96
97 /// Event mask set during registration (no mutex needed).
98 25737x std::uint32_t registered_events = 0;
99
100 /// File descriptor this state tracks.
101 25737x int fd = -1;
102
103 /// Accumulated ready events (set by reactor, read by scheduler).
104 25737x std::atomic<std::uint32_t> ready_events_{0};
105
106 /// True while this state is queued in the scheduler's completed_ops.
107 25737x std::atomic<bool> is_enqueued_{false};
108
109 /// Owning scheduler for posting completions.
110 25737x reactor_scheduler const* scheduler_ = nullptr;
111
112 /// Prevents impl destruction while queued in the scheduler.
113 std::shared_ptr<void> impl_ref_;
114
115 /// Add ready events atomically.
116 /// Release pairs with the consumer's acquire exchange on
117 /// ready_events_ so the consumer sees all flags. On x86 (TSO)
118 /// this compiles to the same LOCK OR as relaxed.
119 1518112x void add_ready_events(std::uint32_t ev) noexcept
120 {
121 1518112x ready_events_.fetch_or(ev, std::memory_order_release);
122 1518112x }
123
124 /// Invoke deferred I/O and dispatch completions.
125 1517806x void operator()() override
126 {
127 1517806x invoke_deferred_io();
128 1517806x }
129
130 /// Destroy without invoking.
131 /// Called during scheduler::shutdown() drain. Clears impl_ref_ to break
132 /// the self-referential cycle set by close_socket().
133 160x void destroy() override
134 {
135 160x impl_ref_.reset();
136 160x }
137
138 /** Perform deferred I/O and queue completions.
139
140 Performs I/O under the mutex and queues completed ops. EAGAIN
141 ops stay parked in their slot for re-delivery on the next
142 edge event.
143 */
144 void invoke_deferred_io();
145 };
146
147 inline void
148 1517806x reactor_descriptor_state::invoke_deferred_io()
149 {
150 1517806x std::shared_ptr<void> prevent_impl_destruction; // receives the moved-out impl_ref_ and holds it until this function returns
151 1517806x op_queue local_ops; // ops completed under the lock; dispatched after the lock scope ends
152
153 {
154
1/2
✓ Branch 0 taken 1517806 times.
✗ Branch 1 not taken.
1517806x conditionally_enabled_mutex::scoped_lock lock(mutex);
155
156 // Must clear is_enqueued_ and move impl_ref_ under the same
157 // lock that processes I/O. close_socket() checks is_enqueued_
158 // under this mutex — without atomicity between the flag store
159 // and the ref move, close_socket() could see is_enqueued_==false,
160 // skip setting impl_ref_, and destroy the impl under us.
161 1517806x prevent_impl_destruction = std::move(impl_ref_);
162 1517806x is_enqueued_.store(false, std::memory_order_release);
163
164 1517806x std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
165
1/2
✓ Branch 0 taken 1517806 times.
✗ Branch 1 not taken.
1517806x if (ev == 0)
166 {
167 // Mutex unlocks here; compensate for work_cleanup's decrement
168 scheduler_->compensating_work_started();
169 return;
170 }
171
172 1517806x int err = 0; // error code handed to ops; only assigned inside the reactor_event_error block below
173
2/2
✓ Branch 0 taken 1517796 times.
✓ Branch 1 taken 10 times.
1517806x if (ev & reactor_event_error)
174 {
175 10x socklen_t len = sizeof(err);
176
2/4
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 10 times.
10x if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
177 err = errno; // getsockopt itself failed: report its errno instead
178
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 9 times.
10x if (err == 0)
179 1x err = EIO; // error bit set but SO_ERROR reported none: use EIO so err is nonzero
180 10x }
181
182
2/2
✓ Branch 0 taken 78504 times.
✓ Branch 1 taken 1439302 times.
1517806x if (ev & reactor_event_read)
183 {
184
2/2
✓ Branch 0 taken 129721 times.
✓ Branch 1 taken 1309581 times.
1439302x if (read_op)
185 {
186 129721x auto* rd = read_op;
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 129721 times.
129721x if (err)
188 rd->complete(err, 0);
189 else
190 129721x rd->perform_io();
191
192
3/4
✓ Branch 0 taken 127352 times.
✓ Branch 1 taken 2369 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 127352 times.
129721x if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
193 {
194 2369x rd->errn = 0; // would-block: read_op stays parked in its slot; clear the transient errno
195 2369x }
196 else
197 {
198 127352x read_op = nullptr;
199 127352x local_ops.push(rd);
200 }
201 129721x }
202 else
203 {
204 1309581x read_ready = true; // no read_op registered: record the edge in read_ready
205 }
206
207 // Complete any parked wait-for-read regardless of read_op presence.
208
2/2
✓ Branch 0 taken 1439286 times.
✓ Branch 1 taken 16 times.
1439302x if (wait_read_op)
209 {
210 16x wait_read_op->complete(err, 0);
211 16x local_ops.push(std::exchange(wait_read_op, nullptr)); // NOTE(review): std::exchange lives in <utility>, which this header does not include directly
212 16x }
213 1439302x }
214
2/2
✓ Branch 0 taken 78655 times.
✓ Branch 1 taken 1439151 times.
1517806x if (ev & reactor_event_write)
215 {
216
2/2
✓ Branch 0 taken 7873 times.
✓ Branch 1 taken 70782 times.
78655x bool had_write_op = (connect_op || write_op);
217
2/2
✓ Branch 0 taken 70782 times.
✓ Branch 1 taken 7873 times.
78655x if (connect_op)
218 {
219 7873x auto* cn = connect_op;
220
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 7864 times.
7873x if (err)
221 9x cn->complete(err, 0);
222 else
223 7864x cn->perform_io();
224 7873x connect_op = nullptr; // connect is always completed here; there is no would-block re-park as for read/write
225 7873x local_ops.push(cn);
226 7873x }
227
2/2
✓ Branch 0 taken 77653 times.
✓ Branch 1 taken 1002 times.
78655x if (write_op)
228 {
229 1002x auto* wr = write_op;
230
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1002 times.
1002x if (err)
231 wr->complete(err, 0);
232 else
233 1002x wr->perform_io();
234
235
2/4
✓ Branch 0 taken 1002 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1002 times.
1002x if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
236 {
237 wr->errn = 0; // would-block: write_op stays parked in its slot; clear the transient errno
238 }
239 else
240 {
241 1002x write_op = nullptr;
242 1002x local_ops.push(wr);
243 }
244 1002x }
245
2/2
✓ Branch 0 taken 8875 times.
✓ Branch 1 taken 69780 times.
78655x if (!had_write_op)
246 69780x write_ready = true; // neither connect_op nor write_op was registered: record the edge in write_ready
247
248 // Complete any parked wait-for-write regardless of write_op presence.
249
1/2
✓ Branch 0 taken 78655 times.
✗ Branch 1 not taken.
78655x if (wait_write_op)
250 {
251 wait_write_op->complete(err, 0);
252 local_ops.push(std::exchange(wait_write_op, nullptr));
253 }
254 78655x }
255 // Complete a parked wait-for-error on any error condition.
256
3/4
✓ Branch 0 taken 1517796 times.
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1517796 times.
1517806x if ((ev & reactor_event_error) || err) // NOTE(review): err is only assigned when the error bit is set, so `|| err` cannot be true on its own
257 {
258
2/2
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 1 time.
10x if (wait_error_op)
259 {
260 1x wait_error_op->complete(err, 0);
261 1x local_ops.push(std::exchange(wait_error_op, nullptr));
262 1x }
263 10x }
264
2/2
✓ Branch 0 taken 1517796 times.
✓ Branch 1 taken 10 times.
1517806x if (err) // error: complete with err every op still parked in the read/write/connect/wait-read/wait-write slots
265 {
266
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10x if (read_op)
267 {
268 read_op->complete(err, 0);
269 local_ops.push(std::exchange(read_op, nullptr));
270 }
271
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10x if (write_op)
272 {
273 write_op->complete(err, 0);
274 local_ops.push(std::exchange(write_op, nullptr));
275 }
276
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10x if (connect_op)
277 {
278 connect_op->complete(err, 0);
279 local_ops.push(std::exchange(connect_op, nullptr));
280 }
281
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10x if (wait_read_op)
282 {
283 wait_read_op->complete(err, 0);
284 local_ops.push(std::exchange(wait_read_op, nullptr));
285 }
286
1/2
✓ Branch 0 taken 10 times.
✗ Branch 1 not taken.
10x if (wait_write_op)
287 {
288 wait_write_op->complete(err, 0);
289 local_ops.push(std::exchange(wait_write_op, nullptr));
290 }
291 10x }
292 1517806x }
293
294 // Execute first handler inline — the scheduler's work_cleanup
295 // accounts for this as the "consumed" work item
296 1517806x scheduler_op* first = local_ops.pop();
297
2/2
✓ Branch 0 taken 136243 times.
✓ Branch 1 taken 1381563 times.
1517806x if (first)
298 {
299
1/2
✓ Branch 0 taken 136243 times.
✗ Branch 1 not taken.
136243x scheduler_->post_deferred_completions(local_ops);
300
1/2
✓ Branch 0 taken 136243 times.
✗ Branch 1 not taken.
136243x (*first)();
301 136243x }
302 else
303 {
304 1381563x scheduler_->compensating_work_started(); // no op completed: same compensation as the ev == 0 path
305 }
306 1517806x }
307
308 } // namespace boost::corosio::detail
309
310 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
311