src/corosio/src/local_connect_pair.cpp

65.0% Lines (65/100) 100.0% List of functions (6/6) 61.0% Branches (36/59)
local_connect_pair.cpp
f(x) Functions (6)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
#include <boost/corosio/local_connect_pair.hpp>
#include <boost/corosio/detail/platform.hpp>
#include <boost/corosio/native/detail/make_err.hpp>

#include <cerrno>
#include <system_error>

#if BOOST_COROSIO_POSIX
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#elif BOOST_COROSIO_HAS_IOCP
#include <boost/corosio/native/detail/endpoint_convert.hpp>

#include <atomic>
#include <cstddef>
#include <cstring>
#include <filesystem>
#include <random>
#include <string>
#include <thread>

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <WinSock2.h>

#ifndef AF_UNIX
#define AF_UNIX 1
#endif
#endif
39
40 namespace boost::corosio {
41
42 namespace {
43
44 #if BOOST_COROSIO_POSIX
45
46 std::error_code
47 make_pair_fds(int type, int& a_fd, int& b_fd) noexcept
48 {
49 int fds[2];
50 if (::socketpair(AF_UNIX, type, 0, fds) != 0)
51 return detail::make_err(errno);
52
53 // assign() is documented "adopt-only" and will not mutate the fd;
54 // set O_NONBLOCK before transferring ownership.
55 for (int i = 0; i < 2; ++i)
56 {
57 int flags = ::fcntl(fds[i], F_GETFL, 0);
58 if (flags < 0 || ::fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
59 {
60 auto ec = detail::make_err(errno);
61 ::close(fds[0]);
62 ::close(fds[1]);
63 return ec;
64 }
65 }
66
67 a_fd = fds[0];
68 b_fd = fds[1];
69 return {};
70 }
71
// Hand the two raw descriptors over to `a` and `b`.
//
// If adopting either descriptor throws std::system_error, nothing is
// leaked: a descriptor that was already adopted is released through
// its socket's close(), one that was not is closed directly, and the
// exception's error code is returned.
template<class Socket>
std::error_code
assign_pair(Socket& a, Socket& b, int a_fd, int b_fd) noexcept
{
    // Tracks which side currently owns a_fd so the handler knows how
    // to release it.
    bool a_adopted = false;
    try
    {
        a.assign(a_fd);
        a_adopted = true;
        b.assign(b_fd);
    }
    catch (std::system_error const& err)
    {
        if (a_adopted)
            a.close();
        else
            ::close(a_fd);
        ::close(b_fd);
        return err.code();
    }

    return {};
}
100
101 #elif BOOST_COROSIO_HAS_IOCP
102
// Create a uniquely named sub-directory under the system temp directory
// and return the full path of the socket file ("s") inside it.
//
// @param dir_out  Receives the created directory on success; left
//                 untouched on failure.
// @return The socket path as a string, or an empty string if the temp
//         directory could not be determined or no directory could be
//         created within 16 attempts.
std::string
pick_pair_path(std::filesystem::path& dir_out)
{
    namespace fs = std::filesystem;

    // Use the non-throwing overload: the only caller is noexcept and
    // expects failure to be reported as an empty string. The lookup is
    // loop-invariant, so it is done once up front.
    std::error_code ec;
    fs::path const temp_root = fs::temp_directory_path(ec);
    if (ec)
        return {};

    thread_local std::mt19937_64 gen{std::random_device{}()};

    for (int attempt = 0; attempt < 16; ++attempt)
    {
        auto candidate = temp_root / ("co_pair_" + std::to_string(gen()));

        // create_directory() returns true only when it actually created
        // the directory, so an existing name (or any error) just moves
        // on to the next random candidate.
        ec.clear();
        if (fs::create_directory(candidate, ec))
        {
            dir_out = candidate;
            return (candidate / "s").string();
        }
    }
    return {};
}
126
// Best-effort cleanup of a rendezvous location: delete the socket file
// at `path` first, then the directory `dir`. Errors from either removal
// are deliberately discarded.
void
remove_pair_path(std::filesystem::path const& dir, std::string const& path)
{
    namespace fs = std::filesystem;

    std::error_code ignored;
    fs::path const socket_file{path};
    fs::remove(socket_file, ignored);
    fs::remove(dir, ignored);
}
134
135 // Synchronously rendezvous two AF_UNIX SOCK_STREAM sockets. The
136 // listener and accept happen on the caller's thread; the connect
137 // runs on a short-lived worker to avoid a deadlock. The returned
138 // sockets are created with WSA_FLAG_OVERLAPPED so they can be
139 // registered with IOCP by assign_socket().
140 std::error_code
141 1x make_pair_sockets(SOCKET& a_sock, SOCKET& b_sock) noexcept
142 {
143 namespace fs = std::filesystem;
144
145 1x a_sock = INVALID_SOCKET;
146 1x b_sock = INVALID_SOCKET;
147
148 1x fs::path dir;
149 1x std::string path = pick_pair_path(dir);
150
1/2
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 7 taken 1 time.
1x if (path.empty())
151 return detail::make_err(ERROR_PATH_NOT_FOUND);
152
153 1x SOCKET listen_sock = ::WSASocketW(
154 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
155
1/2
✗ Branch 8 → 9 not taken.
✓ Branch 8 → 13 taken 1 time.
1x if (listen_sock == INVALID_SOCKET)
156 {
157 auto ec = detail::make_err(::WSAGetLastError());
158 remove_pair_path(dir, path);
159 return ec;
160 }
161
162 1x detail::un_sa_t addr{};
163 1x addr.sun_family = AF_UNIX;
164 1x std::strncpy(
165 addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
166 int addr_len = static_cast<int>(
167 1x offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
168
169 1x if (::bind(
170 listen_sock, reinterpret_cast<sockaddr*>(&addr), addr_len)
171
1/2
✗ Branch 16 → 17 not taken.
✓ Branch 16 → 22 taken 1 time.
1x == SOCKET_ERROR)
172 {
173 auto ec = detail::make_err(::WSAGetLastError());
174 ::closesocket(listen_sock);
175 remove_pair_path(dir, path);
176 return ec;
177 }
178
179
1/2
✗ Branch 23 → 24 not taken.
✓ Branch 23 → 29 taken 1 time.
1x if (::listen(listen_sock, 1) == SOCKET_ERROR)
180 {
181 auto ec = detail::make_err(::WSAGetLastError());
182 ::closesocket(listen_sock);
183 remove_pair_path(dir, path);
184 return ec;
185 }
186
187 1x SOCKET worker_sock = INVALID_SOCKET;
188 1x std::error_code worker_ec;
189
190 3x std::thread worker([&] {
191
1/1
✓ Branch 2 → 3 taken 1 time.
1x worker_sock = ::WSASocketW(
192 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
193
1/2
✗ Branch 3 → 4 not taken.
✓ Branch 3 → 7 taken 1 time.
1x if (worker_sock == INVALID_SOCKET)
194 {
195 worker_ec = detail::make_err(::WSAGetLastError());
196 return;
197 }
198
199 1x detail::un_sa_t caddr{};
200 1x caddr.sun_family = AF_UNIX;
201 1x std::strncpy(
202 caddr.sun_path, path.c_str(), sizeof(caddr.sun_path) - 1);
203 int caddr_len = static_cast<int>(
204 1x offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
205
206
1/1
✓ Branch 9 → 10 taken 1 time.
1x if (::connect(
207 worker_sock, reinterpret_cast<sockaddr*>(&caddr), caddr_len)
208
1/2
✗ Branch 10 → 11 not taken.
✓ Branch 10 → 15 taken 1 time.
1x == SOCKET_ERROR)
209 {
210 worker_ec = detail::make_err(::WSAGetLastError());
211 ::closesocket(worker_sock);
212 worker_sock = INVALID_SOCKET;
213 }
214 1x });
215
216 1x SOCKET accept_sock = ::accept(listen_sock, nullptr, nullptr);
217 1x std::error_code accept_ec;
218
1/2
✗ Branch 33 → 34 not taken.
✓ Branch 33 → 36 taken 1 time.
1x if (accept_sock == INVALID_SOCKET)
219 accept_ec = detail::make_err(::WSAGetLastError());
220
221 1x worker.join();
222
223 1x ::closesocket(listen_sock);
224 1x remove_pair_path(dir, path);
225
226
1/2
✗ Branch 40 → 41 not taken.
✓ Branch 40 → 44 taken 1 time.
1x if (accept_ec)
227 {
228 if (worker_sock != INVALID_SOCKET)
229 ::closesocket(worker_sock);
230 return accept_ec;
231 }
232
1/2
✗ Branch 45 → 46 not taken.
✓ Branch 45 → 48 taken 1 time.
1x if (worker_ec)
233 {
234 ::closesocket(accept_sock);
235 return worker_ec;
236 }
237
238 1x a_sock = accept_sock;
239 1x b_sock = worker_sock;
240 1x return {};
241 1x }
242
243 std::error_code
244 1x assign_pair(
245 local_stream_socket& a,
246 local_stream_socket& b,
247 SOCKET a_sock,
248 SOCKET b_sock) noexcept
249 {
250 try
251 {
252
1/1
✓ Branch 2 → 3 taken 1 time.
1x a.assign(static_cast<native_handle_type>(a_sock));
253 }
254 catch (std::system_error const& e)
255 {
256 ::closesocket(a_sock);
257 ::closesocket(b_sock);
258 return e.code();
259 }
260
261 try
262 {
263
1/1
✓ Branch 3 → 4 taken 1 time.
1x b.assign(static_cast<native_handle_type>(b_sock));
264 }
265 catch (std::system_error const& e)
266 {
267 a.close();
268 ::closesocket(b_sock);
269 return e.code();
270 }
271
272 1x return {};
273 }
274
275 #endif
276
277 } // namespace
278
279 std::error_code
280 2x connect_pair(local_stream_socket& a, local_stream_socket& b) noexcept
281 {
282
5/6
✓ Branch 3 → 4 taken 1 time.
✓ Branch 3 → 6 taken 1 time.
✗ Branch 5 → 6 not taken.
✓ Branch 5 → 7 taken 1 time.
✓ Branch 8 → 9 taken 1 time.
✓ Branch 8 → 11 taken 1 time.
2x if (a.is_open() || b.is_open())
283 1x return detail::make_err(
284 #if BOOST_COROSIO_POSIX
285 EISCONN
286 #else
287 WSAEISCONN
288 #endif
289 1x );
290
291 #if BOOST_COROSIO_POSIX
292 int a_fd = -1, b_fd = -1;
293 if (auto ec = make_pair_fds(SOCK_STREAM, a_fd, b_fd))
294 return ec;
295 return assign_pair(a, b, a_fd, b_fd);
296 #elif BOOST_COROSIO_HAS_IOCP
297 1x SOCKET a_sock = INVALID_SOCKET, b_sock = INVALID_SOCKET;
298
1/2
✗ Branch 13 → 14 not taken.
✓ Branch 13 → 15 taken 1 time.
1x if (auto ec = make_pair_sockets(a_sock, b_sock))
299 return ec;
300 1x return assign_pair(a, b, a_sock, b_sock);
301 #else
302 return detail::make_err(ENOSYS);
303 #endif
304 }
305
306 #if BOOST_COROSIO_POSIX
307
308 std::error_code
309 connect_pair(local_datagram_socket& a, local_datagram_socket& b) noexcept
310 {
311 if (a.is_open() || b.is_open())
312 return detail::make_err(EISCONN);
313
314 int a_fd = -1, b_fd = -1;
315 if (auto ec = make_pair_fds(SOCK_DGRAM, a_fd, b_fd))
316 return ec;
317 return assign_pair(a, b, a_fd, b_fd);
318 }
319
320 #endif
321
322 } // namespace boost::corosio
323