include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp

77.2% Lines (78/101) 100.0% List of functions (6/6) 64.3% Branches (36/56)
f(x) Functions (6)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12
13 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
15
16 #include <atomic>
17 #include <cstdint>
18 #include <memory>
19 #include <mutex>
20
21 #include <errno.h>
22 #include <sys/socket.h>
23
24 namespace boost::corosio::detail {
25
26 /// Shared reactor event constants.
27 /// These match epoll numeric values; kqueue maps its events to the same.
28 static constexpr std::uint32_t reactor_event_read = 0x001;
29 static constexpr std::uint32_t reactor_event_write = 0x004;
30 static constexpr std::uint32_t reactor_event_error = 0x008;
31
32 /** Per-descriptor state shared across reactor backends.
33
34 Tracks pending operations for a file descriptor. The fd is registered
35 once with the reactor and stays registered until closed. Uses deferred
36 I/O: the reactor sets ready_events atomically, then enqueues this state.
37 When popped by the scheduler, invoke_deferred_io() performs I/O under
38 the mutex and queues completed ops.
39
40 Non-template: uses reactor_op_base pointers so the scheduler and
41 descriptor_state code exist as a single copy in the binary regardless
42 of how many backends are compiled in.
43
44 @par Thread Safety
45 The mutex protects operation pointers and ready flags. ready_events_
46 and is_enqueued_ are atomic for lock-free reactor access.
47 */
struct reactor_descriptor_state : scheduler_op
{
    /// Protects operation pointers and ready/cancel flags below.
    /// ready_events_ and is_enqueued_ are additionally atomic so the
    /// reactor thread can touch them without taking this lock.
    std::mutex mutex;

    /// Pending read operation, or nullptr if none (guarded by `mutex`).
    reactor_op_base* read_op = nullptr;

    /// Pending write operation, or nullptr if none (guarded by `mutex`).
    reactor_op_base* write_op = nullptr;

    /// Pending connect operation, or nullptr if none (guarded by `mutex`).
    /// Connect readiness is reported as a write event by the reactor.
    reactor_op_base* connect_op = nullptr;

    /// True if a read edge event arrived before an op was registered;
    /// lets the next read op start without waiting for another edge.
    bool read_ready = false;

    /// True if a write edge event arrived before an op was registered.
    bool write_ready = false;

    /// Deferred read cancellation (IOCP-style cancel semantics).
    bool read_cancel_pending = false;

    /// Deferred write cancellation (IOCP-style cancel semantics).
    bool write_cancel_pending = false;

    /// Deferred connect cancellation (IOCP-style cancel semantics).
    bool connect_cancel_pending = false;

    /// Event mask set during registration (written once before the fd
    /// is visible to the reactor, so no mutex needed).
    std::uint32_t registered_events = 0;

    /// File descriptor this state tracks; -1 until registered.
    int fd = -1;

    /// Accumulated ready events (OR of reactor_event_* bits).
    /// Set by the reactor via add_ready_events(), consumed wholesale by
    /// invoke_deferred_io() with an acquire exchange-to-zero.
    std::atomic<std::uint32_t> ready_events_{0};

    /// True while this state is queued in the scheduler's completed_ops.
    /// Cleared under `mutex` in invoke_deferred_io() — see the comment
    /// there for the race with close_socket().
    std::atomic<bool> is_enqueued_{false};

    /// Owning scheduler for posting completions and work accounting.
    reactor_scheduler_base const* scheduler_ = nullptr;

    /// Prevents impl destruction while queued in the scheduler.
    /// close_socket() parks the impl's shared_ptr here; it is moved out
    /// (and released after the mutex drops) by invoke_deferred_io().
    std::shared_ptr<void> impl_ref_;

    /// Add ready events atomically (called from the reactor thread).
    /// Release pairs with the consumer's acquire exchange on
    /// ready_events_ so the consumer sees all flags. On x86 (TSO)
    /// this compiles to the same LOCK OR as relaxed.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    /// Invoke deferred I/O and dispatch completions.
    /// Called when the scheduler pops this state from its run queue.
    void operator()() override
    {
        invoke_deferred_io();
    }

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override
    {
        impl_ref_.reset();
    }

    /** Perform deferred I/O and queue completions.

        Performs I/O under the mutex and queues completed ops. EAGAIN
        ops stay parked in their slot for re-delivery on the next
        edge event.
    */
    void invoke_deferred_io();
};
126
inline void
reactor_descriptor_state::invoke_deferred_io()
{
    // Keep the impl alive until after the mutex is released; completed
    // ops are collected here and handed to the scheduler in one batch.
    std::shared_ptr<void> prevent_impl_destruction;
    op_queue local_ops;

    {
        std::lock_guard lock(mutex);

        // Must clear is_enqueued_ and move impl_ref_ under the same
        // lock that processes I/O. close_socket() checks is_enqueued_
        // under this mutex — without atomicity between the flag store
        // and the ref move, close_socket() could see is_enqueued_==false,
        // skip setting impl_ref_, and destroy the impl under us.
        prevent_impl_destruction = std::move(impl_ref_);
        is_enqueued_.store(false, std::memory_order_release);

        // Consume all accumulated events; acquire pairs with the
        // reactor's release fetch_or in add_ready_events().
        std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
        if (ev == 0)
        {
            // Spurious wakeup: nothing to do. Mutex unlocks here;
            // compensate for work_cleanup's decrement.
            scheduler_->compensating_work_started();
            return;
        }

        // Resolve the socket error once; it is applied to whichever
        // ops the event bits (or the trailing err block) select.
        int err = 0;
        if (ev & reactor_event_error)
        {
            socklen_t len = sizeof(err);
            if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
                err = errno;
            // SO_ERROR can be 0 even on an error event; report EIO so
            // the op still completes with a failure.
            if (err == 0)
                err = EIO;
        }

        if (ev & reactor_event_read)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    // Not actually readable: leave the op parked in its
                    // slot for the next edge event.
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // Edge arrived before an op was registered; remember it
                // so the next read op can run immediately.
                read_ready = true;
            }
        }

        if (ev & reactor_event_write)
        {
            // Connect readiness is delivered as a write event. Track
            // whether any op consumed this edge before we process them.
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                // Connect always completes on its edge (no EAGAIN
                // re-park like read/write).
                connect_op = nullptr;
                local_ops.push(cn);
            }

            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    // Short write: stay parked for the next edge.
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            if (!had_write_op)
                write_ready = true;
        }

        // An error event fails every still-pending op, even those whose
        // read/write bit was not set in ev (ops handled above have
        // already been cleared from their slots, so no double-complete).
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }

            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }

            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        // Hand the rest to the scheduler before running the first, so
        // other threads can pick them up concurrently.
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // No completions (everything re-parked or only ready flags
        // latched): compensate for work_cleanup's decrement.
        scheduler_->compensating_work_started();
    }
}
254
255 } // namespace boost::corosio::detail
256
257 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
258