include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp

89.4% Lines (277/310) 100.0% List of functions (16/16)
reactor_op_complete.hpp
f(x) Functions (16)
Function Calls Lines Blocks
void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_datagram_op>(boost::corosio::detail::epoll_datagram_op&) :39 8x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_op>(boost::corosio::detail::epoll_op&) :39 63822x 93.8% 79.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_recv_op>(boost::corosio::detail::epoll_recv_op&) :39 2x 87.5% 75.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_datagram_op>(boost::corosio::detail::select_datagram_op&) :39 8x 81.2% 64.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_op>(boost::corosio::detail::select_op&) :39 39644x 87.5% 75.0% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_recv_op>(boost::corosio::detail::select_recv_op&) :39 2x 87.5% 75.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_connect_op>(boost::corosio::detail::epoll_connect_op&) :72 4904x 95.7% 85.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_udp_connect_op>(boost::corosio::detail::epoll_udp_connect_op&) :72 5x 91.3% 79.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_connect_op>(boost::corosio::detail::select_connect_op&) :72 3081x 95.7% 85.0% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_udp_connect_op>(boost::corosio::detail::select_udp_connect_op&) :72 5x 91.3% 79.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>(boost::corosio::detail::epoll_tcp_acceptor*, int&, sockaddr_storage const&, boost::corosio::io_object::implementation**, std::error_code*) :122 4902x 89.5% 89.0% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>(boost::corosio::detail::select_tcp_acceptor*, int&, sockaddr_storage const&, boost::corosio::io_object::implementation**, std::error_code*) :122 3078x 89.5% 89.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_accept_op>(boost::corosio::detail::epoll_accept_op&) :168 4911x 84.0% 82.0% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_accept_op>(boost::corosio::detail::select_accept_op&) :168 3081x 84.0% 82.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::epoll_recv_from_op>(boost::corosio::detail::epoll_recv_from_op&, boost::corosio::endpoint*) :221 7x 94.1% 86.0% void boost::corosio::detail::complete_datagram_op<boost::corosio::detail::select_recv_from_op>(boost::corosio::detail::select_recv_from_op&, boost::corosio::endpoint*) :221 7x 94.1% 86.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12
13 #include <boost/corosio/detail/dispatch_coro.hpp>
14 #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 #include <boost/corosio/native/detail/make_err.hpp>
16 #include <boost/corosio/io/io_object.hpp>
17
18 #include <coroutine>
19 #include <mutex>
20 #include <utility>
21
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 /** Complete a base read/write operation.
29
30 Translates the recorded errno and cancellation state into
31 an error_code, stores the byte count, then resumes the
32 caller via symmetric transfer.
33
34 @tparam Op The concrete operation type.
35 @param op The operation to complete.
36 */
37 template<typename Op>
38 void
39 103486x complete_io_op(Op& op)
40 {
41 103486x op.stop_cb.reset();
42 103486x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43
44 103486x if (op.cancelled.load(std::memory_order_acquire))
45 1018x *op.ec_out = capy::error::canceled;
46 102468x else if (op.errn != 0)
47 4x *op.ec_out = make_err(op.errn);
48 102464x else if (op.is_read_operation() && op.bytes_transferred == 0)
49 *op.ec_out = capy::error::eof;
50 else
51 102464x *op.ec_out = {};
52
53 103486x *op.bytes_out = op.bytes_transferred;
54
55 103486x op.cont_op.cont.h = op.h;
56 103486x capy::executor_ref saved_ex(op.ex);
57 103486x auto prevent = std::move(op.impl_ptr);
58 103486x dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 103486x }
60
61 /** Complete a connect operation with endpoint caching.
62
63 On success, queries the local endpoint via getsockname and
64 caches both endpoints in the socket impl. Then resumes the
65 caller via symmetric transfer.
66
67 @tparam Op The concrete connect operation type.
68 @param op The operation to complete.
69 */
70 template<typename Op>
71 void
72 7995x complete_connect_op(Op& op)
73 {
74 7995x op.stop_cb.reset();
75 7995x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
76
77 7995x bool success =
78 7995x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
79
80 7995x if (success && op.socket_impl_)
81 {
82 7990x endpoint local_ep;
83 7990x sockaddr_storage local_storage{};
84 7990x socklen_t local_len = sizeof(local_storage);
85 7990x if (::getsockname(
86 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
87 7990x &local_len) == 0)
88 7990x local_ep = from_sockaddr(local_storage);
89 7990x op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
90 }
91
92 7995x if (op.cancelled.load(std::memory_order_acquire))
93 *op.ec_out = capy::error::canceled;
94 7995x else if (op.errn != 0)
95 5x *op.ec_out = make_err(op.errn);
96 else
97 7990x *op.ec_out = {};
98
99 7995x op.cont_op.cont.h = op.h;
100 7995x capy::executor_ref saved_ex(op.ex);
101 7995x auto prevent = std::move(op.impl_ptr);
102 7995x dispatch_coro(saved_ex, op.cont_op.cont).resume();
103 7995x }
104
105 /** Construct and register a peer socket from an accepted fd.
106
107 Creates a new socket impl via the acceptor's associated
108 socket service, registers it with the scheduler, and caches
109 the local and remote endpoints.
110
111 @tparam SocketImpl The concrete socket implementation type.
112 @tparam AcceptorImpl The concrete acceptor implementation type.
113 @param acceptor_impl The acceptor that accepted the connection.
114 @param accepted_fd The accepted file descriptor (set to -1 on success).
115 @param peer_storage The peer address from accept().
116 @param impl_out Output pointer for the new socket impl.
117 @param ec_out Output pointer for any error.
118 @return True on success, false on failure.
119 */
120 template<typename SocketImpl, typename AcceptorImpl>
121 bool
122 7980x setup_accepted_socket(
123 AcceptorImpl* acceptor_impl,
124 int& accepted_fd,
125 sockaddr_storage const& peer_storage,
126 io_object::implementation** impl_out,
127 std::error_code* ec_out)
128 {
129 7980x auto* socket_svc = acceptor_impl->service().tcp_service();
130 7980x if (!socket_svc)
131 {
132 *ec_out = make_err(ENOENT);
133 return false;
134 }
135
136 7980x auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
137 7980x impl.set_socket(accepted_fd);
138
139 7980x impl.desc_state_.fd = accepted_fd;
140 {
141 7980x std::lock_guard lock(impl.desc_state_.mutex);
142 7980x impl.desc_state_.read_op = nullptr;
143 7980x impl.desc_state_.write_op = nullptr;
144 7980x impl.desc_state_.connect_op = nullptr;
145 7980x }
146 7980x socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
147
148 7980x impl.set_endpoints(
149 acceptor_impl->local_endpoint(), from_sockaddr(peer_storage));
150
151 7980x if (impl_out)
152 7980x *impl_out = &impl;
153 7980x accepted_fd = -1;
154 7980x return true;
155 }
156
157 /** Complete an accept operation.
158
159 Sets up the peer socket on success, or closes the accepted
160 fd on failure. Then resumes the caller via symmetric transfer.
161
162 @tparam SocketImpl The concrete socket implementation type.
163 @tparam Op The concrete accept operation type.
164 @param op The operation to complete.
165 */
166 template<typename SocketImpl, typename Op>
167 void
168 7992x complete_accept_op(Op& op)
169 {
170 7992x op.stop_cb.reset();
171 7992x op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
172
173 7992x bool success =
174 7992x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
175
176 7992x if (op.cancelled.load(std::memory_order_acquire))
177 12x *op.ec_out = capy::error::canceled;
178 7980x else if (op.errn != 0)
179 *op.ec_out = make_err(op.errn);
180 else
181 7980x *op.ec_out = {};
182
183 7992x if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
184 {
185 7980x if (!setup_accepted_socket<SocketImpl>(
186 7980x op.acceptor_impl_, op.accepted_fd, op.peer_storage, op.impl_out,
187 op.ec_out))
188 success = false;
189 }
190
191 7992x if (!success || !op.acceptor_impl_)
192 {
193 12x if (op.accepted_fd >= 0)
194 {
195 ::close(op.accepted_fd);
196 op.accepted_fd = -1;
197 }
198 12x if (op.impl_out)
199 12x *op.impl_out = nullptr;
200 }
201
202 7992x op.cont_op.cont.h = op.h;
203 7992x capy::executor_ref saved_ex(op.ex);
204 7992x auto prevent = std::move(op.impl_ptr);
205 7992x dispatch_coro(saved_ex, op.cont_op.cont).resume();
206 7992x }
207
208 /** Complete a datagram operation (send_to or recv_from).
209
210 For recv_from operations, writes the source endpoint from the
211 recorded sockaddr_storage into the caller's endpoint pointer.
212 Then resumes the caller via symmetric transfer.
213
214 @tparam Op The concrete datagram operation type.
215 @param op The operation to complete.
216 @param source_out Optional pointer to store source endpoint
217 (non-null for recv_from, null for send_to).
218 */
219 template<typename Op>
220 void
221 14x complete_datagram_op(Op& op, endpoint* source_out)
222 {
223 14x op.stop_cb.reset();
224 14x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
225
226 14x if (op.cancelled.load(std::memory_order_acquire))
227 6x *op.ec_out = capy::error::canceled;
228 8x else if (op.errn != 0)
229 *op.ec_out = make_err(op.errn);
230 else
231 8x *op.ec_out = {};
232
233 14x *op.bytes_out = op.bytes_transferred;
234
235 20x if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
236 6x op.errn == 0)
237 6x *source_out = from_sockaddr(op.source_storage);
238
239 14x op.cont_op.cont.h = op.h;
240 14x capy::executor_ref saved_ex(op.ex);
241 14x auto prevent = std::move(op.impl_ptr);
242 14x dispatch_coro(saved_ex, op.cont_op.cont).resume();
243 14x }
244
245 } // namespace boost::corosio::detail
246
247 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
248