include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp

84.3% Lines (86/102) 100.0% List of functions (6/6) 71.4% Branches (40/56)
reactor_descriptor_state.hpp
f(x) Functions (6)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12
#include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>

#include <boost/corosio/detail/conditionally_enabled_mutex.hpp>

#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>

#include <errno.h>
#include <sys/socket.h>
24
25 namespace boost::corosio::detail {
26
/// Shared reactor event constants.
/// These match epoll numeric values; kqueue maps its events to the same.
/// `inline constexpr` yields a single entity across all translation units
/// that include this header; namespace-scope `static constexpr` would give
/// each TU its own internal-linkage copy.
inline constexpr std::uint32_t reactor_event_read = 0x001;
inline constexpr std::uint32_t reactor_event_write = 0x004;
inline constexpr std::uint32_t reactor_event_error = 0x008;
32
/** Per-descriptor state shared across reactor backends.

    Tracks pending operations for a file descriptor. The fd is registered
    once with the reactor and stays registered until closed. Uses deferred
    I/O: the reactor sets ready_events atomically, then enqueues this state.
    When popped by the scheduler, invoke_deferred_io() performs I/O under
    the mutex and queues completed ops.

    Non-template: uses reactor_op_base pointers so the scheduler and
    descriptor_state code exist as a single copy in the binary regardless
    of how many backends are compiled in.

    @par Thread Safety
    The mutex protects operation pointers and ready flags. ready_events_
    and is_enqueued_ are atomic for lock-free reactor access.
*/
struct reactor_descriptor_state : scheduler_op
{
    /// Protects operation pointers and ready/cancel flags.
    /// Becomes a no-op in single-threaded mode.
    conditionally_enabled_mutex mutex{true};

    /// Pending read operation (guarded by `mutex`).
    reactor_op_base* read_op = nullptr;

    /// Pending write operation (guarded by `mutex`).
    reactor_op_base* write_op = nullptr;

    /// Pending connect operation (guarded by `mutex`).
    reactor_op_base* connect_op = nullptr;

    /// True if a read edge event arrived before an op was registered.
    /// Lets the next registered read op run immediately instead of
    /// waiting for another edge.
    bool read_ready = false;

    /// True if a write edge event arrived before an op was registered.
    bool write_ready = false;

    /// Deferred read cancellation (IOCP-style cancel semantics).
    bool read_cancel_pending = false;

    /// Deferred write cancellation (IOCP-style cancel semantics).
    bool write_cancel_pending = false;

    /// Deferred connect cancellation (IOCP-style cancel semantics).
    bool connect_cancel_pending = false;

    /// Event mask set during registration (no mutex needed: written
    /// once at registration time).
    std::uint32_t registered_events = 0;

    /// File descriptor this state tracks. -1 when unassigned.
    int fd = -1;

    /// Accumulated ready events (set by reactor, read by scheduler).
    /// Bits are reactor_event_* values.
    std::atomic<std::uint32_t> ready_events_{0};

    /// True while this state is queued in the scheduler's completed_ops.
    /// Prevents double-enqueue and coordinates with close_socket().
    std::atomic<bool> is_enqueued_{false};

    /// Owning scheduler for posting completions.
    reactor_scheduler const* scheduler_ = nullptr;

    /// Prevents impl destruction while queued in the scheduler.
    /// Set by close_socket(); cleared when the queued state is
    /// invoked or destroyed.
    std::shared_ptr<void> impl_ref_;

    /// Add ready events atomically (called from the reactor thread).
    /// Release pairs with the consumer's acquire exchange on
    /// ready_events_ so the consumer sees all flags. On x86 (TSO)
    /// this compiles to the same LOCK OR as relaxed.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    /// Invoke deferred I/O and dispatch completions.
    /// Called by the scheduler when this state is popped from its queue.
    void operator()() override
    {
        invoke_deferred_io();
    }

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override
    {
        impl_ref_.reset();
    }

    /** Perform deferred I/O and queue completions.

        Performs I/O under the mutex and queues completed ops. EAGAIN
        ops stay parked in their slot for re-delivery on the next
        edge event.
    */
    void invoke_deferred_io();
};
128
inline void
reactor_descriptor_state::invoke_deferred_io()
{
    // Holds the impl alive for the duration of this call even if
    // close_socket() runs concurrently on another thread.
    std::shared_ptr<void> prevent_impl_destruction;
    op_queue local_ops;

    {
        conditionally_enabled_mutex::scoped_lock lock(mutex);

        // Must clear is_enqueued_ and move impl_ref_ under the same
        // lock that processes I/O. close_socket() checks is_enqueued_
        // under this mutex — without atomicity between the flag store
        // and the ref move, close_socket() could see is_enqueued_==false,
        // skip setting impl_ref_, and destroy the impl under us.
        prevent_impl_destruction = std::move(impl_ref_);
        is_enqueued_.store(false, std::memory_order_release);

        // Acquire pairs with add_ready_events()' release fetch_or so all
        // event bits published by the reactor thread are visible here.
        std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
        if (ev == 0)
        {
            // Mutex unlocks here; compensate for work_cleanup's decrement
            scheduler_->compensating_work_started();
            return;
        }

        // Resolve a reported error event to a concrete errno value.
        int err = 0;
        if (ev & reactor_event_error)
        {
            socklen_t len = sizeof(err);
            if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
                err = errno;
            // SO_ERROR can read back 0 even when the reactor flagged an
            // error; fall back to EIO so the op still fails visibly.
            if (err == 0)
                err = EIO;
        }

        if (ev & reactor_event_read)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // errn reflects the I/O outcome set by complete()/perform_io()
                // above. EAGAIN/EWOULDBLOCK means a spurious or exhausted edge:
                // re-park the op in its slot for the next edge event.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // Edge arrived before an op was registered; remember it so
                // the next read op can run without waiting for another edge.
                read_ready = true;
            }
        }

        if (ev & reactor_event_write)
        {
            // Record before the connect branch below clears connect_op,
            // so the "no op was waiting" case is judged on entry state.
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                // Connect completes exactly once — no EAGAIN re-park.
                connect_op = nullptr;
                local_ops.push(cn);
            }

            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                // Same re-park semantics as the read path above.
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            if (!had_write_op)
                write_ready = true;
        }

        // Error event: fail any ops still parked (i.e. those whose slot
        // bit was not set in ev, so the branches above never touched them).
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        // Remaining completions go back to the scheduler for other
        // threads (or a later iteration) to run.
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // Nothing completed (e.g. only ready flags were recorded);
        // compensate for work_cleanup's pending decrement.
        scheduler_->compensating_work_started();
    }
}
256
257 } // namespace boost::corosio::detail
258
259 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
260