include/boost/corosio/native/detail/reactor/reactor_op.hpp

68.5% Lines (263/384) 76.5% List of functions (52/68)
f(x) Functions (68)
Function Calls Lines Blocks
boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::canceller::operator()() const :56 806x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::canceller::operator()() const :56 1x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::canceller::operator()() const :56 94x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::canceller::operator()() const :56 1x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reactor_op() :86 58464x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reactor_op() :86 195x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::reactor_op() :86 33664x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::reactor_op() :86 195x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reset() :89 298656x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::reset() :89 37x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::reset() :89 184765x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::reset() :89 37x 100.0% 100.0% 
boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::is_read_operation() const :101 36573x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::is_read_operation() const :101 8x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::is_read_operation() const :101 17704x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::is_read_operation() const :101 8x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::destroy() :110 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::destroy() :110 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::destroy() :110 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::destroy() :110 0 0.0% 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_tcp_socket*) :117 67003x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_udp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_udp_socket*) :117 22x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_tcp_socket*) :117 39319x 100.0% 100.0% 
boost::corosio::detail::reactor_op<boost::corosio::detail::select_udp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_udp_socket*) :117 22x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_tcp_acceptor*) :129 6343x 100.0% 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_tcp_acceptor*) :129 3728x 87.5% 71.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_datagram_op>::reset() :155 5x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::reset() :155 6336x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_datagram_op>::reset() :155 5x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::reset() :155 3727x 100.0% 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :161 0 0.0% 0.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::perform_io() :161 6335x 85.7% 80.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_datagram_op>::perform_io() :161 0 0.0% 0.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::perform_io() :161 3727x 85.7% 80.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::is_read_operation() const :193 23218x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::is_read_operation() const :193 17736x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::reset() :198 107530x 100.0% 100.0% 
boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::reset() :198 88729x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::perform_io() :205 1782x 100.0% 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::perform_io() :205 188x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::reset() :244 178447x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::reset() :244 88581x 100.0% 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::perform_io() :250 0 0.0% 0.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::perform_io() :250 0 0.0% 0.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_accept_policy>::reset() :283 6343x 100.0% 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::reset() :283 3728x 100.0% 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_accept_policy>::perform_io() :292 6332x 85.7% 80.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::perform_io() :292 3723x 85.7% 80.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::epoll_datagram_op>::reset() :325 3x 100.0% 100.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::select_datagram_op>::reset() :325 3x 100.0% 100.0% boost::corosio::detail::reactor_send_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :331 0 0.0% 0.0% 
boost::corosio::detail::reactor_send_op<boost::corosio::detail::select_datagram_op>::perform_io() :331 0 0.0% 0.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::is_read_operation() const :378 1x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::is_read_operation() const :378 1x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::reset() :383 2x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::reset() :383 2x 100.0% 100.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :389 0 0.0% 0.0% boost::corosio::detail::reactor_recv_op<boost::corosio::detail::select_datagram_op>::perform_io() :389 0 0.0% 0.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::epoll_datagram_op>::reset() :433 11x 100.0% 100.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::select_datagram_op>::reset() :433 11x 100.0% 100.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :441 0 0.0% 0.0% boost::corosio::detail::reactor_send_to_op<boost::corosio::detail::select_datagram_op>::perform_io() :441 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::is_read_operation() const :494 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::is_read_operation() const :494 0 0.0% 0.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::reset() :499 16x 100.0% 100.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::reset() :499 16x 100.0% 100.0% boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::epoll_datagram_op>::perform_io() :507 1x 91.7% 75.0% 
boost::corosio::detail::reactor_recv_from_op<boost::corosio::detail::select_datagram_op>::perform_io() :507 1x 91.7% 75.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12
13 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 #include <boost/corosio/io/io_object.hpp>
15 #include <boost/corosio/endpoint.hpp>
16 #include <boost/capy/ex/executor_ref.hpp>
17
18 #include <atomic>
19 #include <coroutine>
20 #include <cstddef>
21 #include <memory>
22 #include <optional>
23 #include <stop_token>
24 #include <system_error>
25
26 #include <errno.h>
27
28 #include <netinet/in.h>
29 #include <sys/socket.h>
30 #include <sys/uio.h>
31
32 namespace boost::corosio::detail {
33
34 /** Base operation for reactor-based backends.
35
36 Holds per-operation state that depends on the concrete backend
37 socket/acceptor types: coroutine handle, executor, output
38 pointers, file descriptor, stop_callback, and type-specific
39 impl pointers.
40
41 Fields shared across all backends (errn, bytes_transferred,
42 cancelled, impl_ptr, perform_io, complete) live in
43 reactor_op_base so the scheduler and descriptor_state can
44 access them without template instantiation.
45
46 @tparam Socket The backend socket impl type (forward-declared).
47 @tparam Acceptor The backend acceptor impl type (forward-declared).
48 */
49 template<class Socket, class Acceptor>
50 struct reactor_op : reactor_op_base
51 {
52 /// Stop-token callback that invokes cancel() on the target op.
53 struct canceller
54 {
55 reactor_op* op;
56 902x void operator()() const noexcept
57 {
58 902x op->cancel();
59 902x }
60 };
61
62 /// Caller's coroutine handle to resume on completion.
63 std::coroutine_handle<> h;
64
65 /// Executor for dispatching the completion.
66 capy::executor_ref ex;
67
68 /// Output pointer for the error code.
69 std::error_code* ec_out = nullptr;
70
71 /// Output pointer for bytes transferred.
72 std::size_t* bytes_out = nullptr;
73
74 /// File descriptor this operation targets.
75 int fd = -1;
76
77 /// Stop-token callback registration.
78 std::optional<std::stop_callback<canceller>> stop_cb;
79
80 /// Owning socket impl (for stop_token cancellation).
81 Socket* socket_impl_ = nullptr;
82
83 /// Owning acceptor impl (for stop_token cancellation).
84 Acceptor* acceptor_impl_ = nullptr;
85
86 92518x reactor_op() = default;
87
88 /// Reset operation state for reuse.
89 483495x void reset() noexcept
90 {
91 483495x fd = -1;
92 483495x errn = 0;
93 483495x bytes_transferred = 0;
94 483495x cancelled.store(false, std::memory_order_relaxed);
95 483495x impl_ptr.reset();
96 483495x socket_impl_ = nullptr;
97 483495x acceptor_impl_ = nullptr;
98 483495x }
99
100 /// Return true if this is a read-direction operation.
101 54293x virtual bool is_read_operation() const noexcept
102 {
103 54293x return false;
104 }
105
106 /// Cancel this operation via the owning impl.
107 virtual void cancel() noexcept = 0;
108
109 /// Destroy without invoking.
110 void destroy() override
111 {
112 stop_cb.reset();
113 reactor_op_base::destroy();
114 }
115
116 /// Arm the stop-token callback for a socket operation.
117 106366x void start(std::stop_token const& token, Socket* impl)
118 {
119 106366x cancelled.store(false, std::memory_order_release);
120 106366x stop_cb.reset();
121 106366x socket_impl_ = impl;
122 106366x acceptor_impl_ = nullptr;
123
124 106366x if (token.stop_possible())
125 1600x stop_cb.emplace(token, canceller{this});
126 106366x }
127
128 /// Arm the stop-token callback for an acceptor operation.
129 10071x void start(std::stop_token const& token, Acceptor* impl)
130 {
131 10071x cancelled.store(false, std::memory_order_release);
132 10071x stop_cb.reset();
133 10071x socket_impl_ = nullptr;
134 10071x acceptor_impl_ = impl;
135
136 10071x if (token.stop_possible())
137 9x stop_cb.emplace(token, canceller{this});
138 10071x }
139 };
140
141 /** Shared connect operation.
142
143 Checks SO_ERROR for connect completion status. The operator()()
144 and cancel() are provided by the concrete backend type.
145
146 @tparam Base The backend's base op type.
147 */
148 template<class Base>
149 struct reactor_connect_op : Base
150 {
151 /// Endpoint to connect to.
152 endpoint target_endpoint;
153
154 /// Reset operation state for reuse.
155 10073x void reset() noexcept
156 {
157 10073x Base::reset();
158 10073x target_endpoint = endpoint{};
159 10073x }
160
161 10062x void perform_io() noexcept override
162 {
163 10062x int err = 0;
164 10062x socklen_t len = sizeof(err);
165 10062x if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
166 err = errno;
167 10062x this->complete(err, 0);
168 10062x }
169 };
170
/** Scatter-read operation shared by the reactor backends.

    Issues readv() and retries for as long as the call is
    interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// True for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Return true unless this operation is flagged as a
    /// zero-length read.
    bool is_read_operation() const noexcept override
    {
        return empty_buffer_read == false;
    }

    /// Reset the base state and clear the buffer bookkeeping.
    void reset() noexcept
    {
        Base::reset();
        iovec_count       = 0;
        empty_buffer_read = false;
    }

    /// Run readv() until it either succeeds or fails with something
    /// other than EINTR, then complete with (0, bytes) or (errno, 0).
    void perform_io() noexcept override
    {
        ssize_t got = 0;
        for (;;)
        {
            got = ::readv(this->fd, iovecs, iovec_count);
            if (got >= 0 || errno != EINTR)
                break;
        }

        if (got < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(got));
    }
};
220
/** Gather-write operation shared by the reactor backends.

    The syscall itself is performed by
    WritePolicy::write(fd, iovecs, count), which yields an ssize_t:
    the number of bytes written, or -1 with errno set.

    @tparam Base        The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The write syscall policy type.
    using write_policy = WritePolicy;

    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset the base state and clear the active vector count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Invoke the policy once and complete with (0, bytes) on
    /// success or (errno, 0) on failure. No retry is done here.
    void perform_io() noexcept override
    {
        ssize_t const written =
            write_policy::write(this->fd, iovecs, iovec_count);

        if (written < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(written));
    }
};
259
260 /** Shared accept operation.
261
262 Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
263 which returns the accepted fd or -1 with errno set.
264
265 @tparam Base The backend's base op type.
266 @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
267 */
268 template<class Base, class AcceptPolicy>
269 struct reactor_accept_op : Base
270 {
271 /// File descriptor of the accepted connection.
272 int accepted_fd = -1;
273
274 /// Pointer to the peer socket implementation.
275 io_object::implementation* peer_impl = nullptr;
276
277 /// Output pointer for the accepted implementation.
278 io_object::implementation** impl_out = nullptr;
279
280 /// Peer address storage filled by accept.
281 sockaddr_storage peer_storage{};
282
283 10071x void reset() noexcept
284 {
285 10071x Base::reset();
286 10071x accepted_fd = -1;
287 10071x peer_impl = nullptr;
288 10071x impl_out = nullptr;
289 10071x peer_storage = {};
290 10071x }
291
292 10055x void perform_io() noexcept override
293 {
294 10055x int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
295 10055x if (new_fd >= 0)
296 {
297 10055x accepted_fd = new_fd;
298 10055x this->complete(0, 0);
299 }
300 else
301 {
302 this->complete(errno, 0);
303 }
304 10055x }
305 };
306
/** Connected-mode send operation for datagram sockets.

    Calls sendmsg() with msg_name left null, so no destination
    address is supplied with the message.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Reset the base state and clear the active vector count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Run sendmsg() until it either succeeds or fails with
    /// something other than EINTR, then complete with (0, bytes)
    /// or (errno, 0).
    void perform_io() noexcept override
    {
        // MSG_NOSIGNAL is passed whenever the platform defines it.
#ifdef MSG_NOSIGNAL
        constexpr int flags = MSG_NOSIGNAL;
#else
        constexpr int flags = 0;
#endif

        msghdr hdr{};
        hdr.msg_iov    = iovecs;
        hdr.msg_iovlen = static_cast<std::size_t>(iovec_count);

        ssize_t sent = 0;
        for (;;)
        {
            sent = ::sendmsg(this->fd, &hdr, flags);
            if (sent >= 0 || errno != EINTR)
                break;
        }

        if (sent < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(sent));
    }
};
356
/** Connected-mode recv operation for datagram sockets.

    Calls recvmsg() with msg_name left null, so the sender's
    address is not captured. A result of zero bytes is reported
    as an ordinary successful completion; perform_io() applies no
    end-of-file mapping (zero-length datagrams are valid).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Always true: this is a read-direction operation.
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Reset the base state and clear the active vector count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /// Run recvmsg() until it either succeeds or fails with
    /// something other than EINTR, then complete with (0, bytes)
    /// or (errno, 0).
    void perform_io() noexcept override
    {
        msghdr hdr{};
        hdr.msg_iov    = iovecs;
        hdr.msg_iovlen = static_cast<std::size_t>(iovec_count);

        ssize_t got = 0;
        for (;;)
        {
            got = ::recvmsg(this->fd, &hdr, 0);
            if (got >= 0 || errno != EINTR)
                break;
        }

        if (got < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(got));
    }
};
408
/** send_to operation for datagram sockets.

    Calls sendmsg() with the destination address passed through
    msg_name / msg_namelen.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Destination address storage.
    sockaddr_storage dest_storage{};

    /// Destination address length.
    socklen_t dest_len = 0;

    /// Reset the base state and clear the buffers and destination.
    void reset() noexcept
    {
        Base::reset();
        iovec_count  = 0;
        dest_storage = {};
        dest_len     = 0;
    }

    /// Run sendmsg() towards the stored destination until it either
    /// succeeds or fails with something other than EINTR, then
    /// complete with (0, bytes) or (errno, 0).
    void perform_io() noexcept override
    {
        // MSG_NOSIGNAL is passed whenever the platform defines it.
#ifdef MSG_NOSIGNAL
        constexpr int flags = MSG_NOSIGNAL;
#else
        constexpr int flags = 0;
#endif

        msghdr hdr{};
        hdr.msg_name    = &dest_storage;
        hdr.msg_namelen = dest_len;
        hdr.msg_iov     = iovecs;
        hdr.msg_iovlen  = static_cast<std::size_t>(iovec_count);

        ssize_t sent = 0;
        for (;;)
        {
            sent = ::sendmsg(this->fd, &hdr, flags);
            if (sent >= 0 || errno != EINTR)
                break;
        }

        if (sent < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(sent));
    }
};
468
469 /** Shared recv_from operation for datagram sockets.
470
471 Uses recvmsg() with msg_name to capture the source endpoint.
472
473 @tparam Base The backend's base op type.
474 */
475 template<class Base>
476 struct reactor_recv_from_op : Base
477 {
478 /// Maximum scatter-gather buffer count.
479 static constexpr std::size_t max_buffers = 16;
480
481 /// Scatter-gather I/O vectors.
482 iovec iovecs[max_buffers];
483
484 /// Number of active I/O vectors.
485 int iovec_count = 0;
486
487 /// Source address storage filled by recvmsg.
488 sockaddr_storage source_storage{};
489
490 /// Output pointer for the source endpoint (set by do_recv_from).
491 endpoint* source_out = nullptr;
492
493 /// Return true (this is a read-direction operation).
494 bool is_read_operation() const noexcept override
495 {
496 return true;
497 }
498
499 32x void reset() noexcept
500 {
501 32x Base::reset();
502 32x iovec_count = 0;
503 32x source_storage = {};
504 32x source_out = nullptr;
505 32x }
506
507 2x void perform_io() noexcept override
508 {
509 2x msghdr msg{};
510 2x msg.msg_name = &source_storage;
511 2x msg.msg_namelen = sizeof(source_storage);
512 2x msg.msg_iov = iovecs;
513 2x msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
514
515 ssize_t n;
516 do
517 {
518 2x n = ::recvmsg(this->fd, &msg, 0);
519 }
520 2x while (n < 0 && errno == EINTR);
521
522 2x if (n >= 0)
523 2x this->complete(0, static_cast<std::size_t>(n));
524 else
525 this->complete(errno, 0);
526 2x }
527 };
528
529 } // namespace boost::corosio::detail
530
531 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
532