include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp

3.7% Lines (15/402) 5.2% List of functions (3/58) 4.3% Branches (6/140)
win_local_stream_service.hpp
f(x) Functions (58)
Function Calls Lines Branches Blocks
boost::corosio::detail::local_stream_connect_op::local_stream_connect_op(boost::corosio::detail::win_local_stream_socket_internal&) :124 0 0.0% 0.0% boost::corosio::detail::local_stream_read_op::local_stream_read_op(boost::corosio::detail::win_local_stream_socket_internal&) :132 0 0.0% 0.0% boost::corosio::detail::local_stream_write_op::local_stream_write_op(boost::corosio::detail::win_local_stream_socket_internal&) :140 0 0.0% 0.0% boost::corosio::detail::local_stream_connect_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :153 0 0.0% 0.0% 0.0% boost::corosio::detail::local_stream_read_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :165 0 0.0% 0.0% 0.0% boost::corosio::detail::local_stream_write_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :177 0 0.0% 0.0% 0.0% boost::corosio::detail::local_stream_connect_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :193 0 0.0% 0.0% 0.0% boost::corosio::detail::local_stream_read_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :237 0 0.0% 0.0% 0.0% boost::corosio::detail::local_stream_write_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :261 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::win_local_stream_socket_internal(boost::corosio::detail::win_local_stream_service&) :284 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::~win_local_stream_socket_internal() :293 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::native_handle() const :299 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::local_endpoint() const :305 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::remote_endpoint() const :311 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::is_open() const :317 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::set_socket(unsigned long long) :323 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::set_endpoints(boost::corosio::local_endpoint, boost::corosio::local_endpoint) :329 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::local_endpoint, std::stop_token, std::error_code*) :338 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long long*) :409 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long long*) :471 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::cancel() :529 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket_internal::close_socket() :542 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::win_local_stream_socket(std::shared_ptr<boost::corosio::detail::win_local_stream_socket_internal>) :559 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::close_internal() :566 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::local_endpoint, std::stop_token, std::error_code*) :576 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long long*) :587 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long long*) :599 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::shutdown(boost::corosio::shutdown_type) :611 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::native_handle() const :635 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::release_socket() :641 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::set_option(int, int, void const*, unsigned long long) :655 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::get_option(int, int, void*, unsigned long long*) const :666 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::local_endpoint() const :679 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::remote_endpoint() const :685 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::cancel() :691 0 0.0% 0.0% boost::corosio::detail::win_local_stream_socket::get_internal() const :697 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::win_local_stream_service(boost::capy::execution_context&, boost::corosio::detail::win_tcp_service&) :706 442x 100.0% 100.0% 78.6% boost::corosio::detail::win_local_stream_service::~win_local_stream_service() :714 884x 100.0% 100.0% boost::corosio::detail::win_local_stream_service::shutdown() :726 442x 55.6% 50.0% 63.6% boost::corosio::detail::win_local_stream_service::construct() :744 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::destroy(boost::corosio::io_object::implementation*) :764 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::close(boost::corosio::io_object::handle&) :775 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::destroy_impl(boost::corosio::detail::win_local_stream_socket&) :782 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::unregister_impl(boost::corosio::detail::win_local_stream_socket_internal&) :792 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::open_socket(boost::corosio::local_stream_socket::implementation&, int, int, int) :800 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::assign_socket(boost::corosio::local_stream_socket::implementation&, unsigned long long) :809 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::open_socket_internal(boost::corosio::detail::win_local_stream_socket_internal&, int, int, int) :817 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::native_handle() const :846 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::connect_ex() const :852 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::accept_ex() const :858 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::on_pending(boost::corosio::detail::overlapped_op*) :870 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) :876 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::work_started() :883 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::destroy_acceptor_impl(boost::corosio::detail::win_local_stream_acceptor&) :895 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::unregister_acceptor_impl(boost::corosio::detail::win_local_stream_acceptor_internal&) :906 0 0.0% 0.0% boost::corosio::detail::win_local_stream_service::open_acceptor_socket(boost::corosio::detail::win_local_stream_acceptor_internal&, int, int, int) :914 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::bind_acceptor(boost::corosio::detail::win_local_stream_acceptor_internal&, boost::corosio::local_endpoint) :941 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_stream_service::listen_acceptor(boost::corosio::detail::win_local_stream_acceptor_internal&, int) :963 0 0.0% 0.0% 0.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_STREAM_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_STREAM_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IOCP
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/local_stream_service.hpp>
19
20 #include <boost/corosio/native/detail/iocp/win_local_stream_socket.hpp>
21 #include <boost/corosio/native/detail/iocp/win_tcp_service.hpp>
22 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
23 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
24 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
25 #include <boost/corosio/native/detail/iocp/win_wsa_init.hpp>
26
27 #include <boost/corosio/native/detail/endpoint_convert.hpp>
28 #include <boost/corosio/native/detail/make_err.hpp>
29 #include <boost/corosio/detail/dispatch_coro.hpp>
30
31 #include <Ws2tcpip.h>
32
33 namespace boost::corosio::detail {
34
35 class win_local_stream_acceptor;
36 class win_local_stream_acceptor_internal;
37 class win_local_stream_acceptor_service;
38
39 /* IOCP local stream socket service.
40
41 Inherits from local_stream_service to enable runtime polymorphism
42 via use_service<local_stream_service>(). Reuses the ConnectEx /
43 AcceptEx function pointers already loaded by win_tcp_service.
44 */
45 class BOOST_COROSIO_DECL win_local_stream_service final
46 : private win_wsa_init
47 , public local_stream_service
48 {
49 public:
50 io_object::implementation* construct() override;
51
52 void destroy(io_object::implementation* p) override;
53
54 void close(io_object::handle& h) override;
55
56 explicit win_local_stream_service(
57 capy::execution_context& ctx, win_tcp_service& tcp_svc);
58
59 ~win_local_stream_service();
60
61 win_local_stream_service(win_local_stream_service const&) = delete;
62 win_local_stream_service& operator=(win_local_stream_service const&) = delete;
63
64 void shutdown() override;
65
66 std::error_code open_socket(
67 local_stream_socket::implementation& impl,
68 int family, int type, int protocol) override;
69
70 std::error_code assign_socket(
71 local_stream_socket::implementation& impl,
72 native_handle_type fd) override;
73
74 void destroy_impl(win_local_stream_socket& impl);
75
76 void unregister_impl(win_local_stream_socket_internal& impl);
77
78 std::error_code open_socket_internal(
79 win_local_stream_socket_internal& impl,
80 int family, int type, int protocol);
81
82 void destroy_acceptor_impl(win_local_stream_acceptor& impl);
83
84 void unregister_acceptor_impl(win_local_stream_acceptor_internal& impl);
85
86 std::error_code open_acceptor_socket(
87 win_local_stream_acceptor_internal& impl,
88 int family, int type, int protocol);
89
90 std::error_code bind_acceptor(
91 win_local_stream_acceptor_internal& impl,
92 corosio::local_endpoint ep);
93
94 std::error_code listen_acceptor(
95 win_local_stream_acceptor_internal& impl, int backlog);
96
97 void* native_handle() const noexcept;
98 LPFN_CONNECTEX connect_ex() const noexcept;
99 LPFN_ACCEPTEX accept_ex() const noexcept;
100
101 void post(overlapped_op* op);
102 void on_pending(overlapped_op* op) noexcept;
103 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) noexcept;
104 void work_started() noexcept;
105 void work_finished() noexcept;
106
107 private:
108 friend class win_local_stream_acceptor_service;
109
110 win_tcp_service& tcp_svc_;
111 win_scheduler& sched_;
112 win_mutex mutex_;
113 intrusive_list<win_local_stream_socket_internal> socket_list_;
114 intrusive_list<win_local_stream_acceptor_internal> acceptor_list_;
115 intrusive_list<win_local_stream_socket> socket_wrapper_list_;
116 intrusive_list<win_local_stream_acceptor> acceptor_wrapper_list_;
117 void* iocp_;
118 };
119
120 // ============================================================
121 // Operation constructors
122 // ============================================================
123
124 inline local_stream_connect_op::local_stream_connect_op(
125 win_local_stream_socket_internal& internal_) noexcept
126 : overlapped_op(&do_complete)
127 , internal(internal_)
128 {
129 cancel_func_ = &do_cancel_impl;
130 }
131
132 inline local_stream_read_op::local_stream_read_op(
133 win_local_stream_socket_internal& internal_) noexcept
134 : overlapped_op(&do_complete)
135 , internal(internal_)
136 {
137 cancel_func_ = &do_cancel_impl;
138 }
139
140 inline local_stream_write_op::local_stream_write_op(
141 win_local_stream_socket_internal& internal_) noexcept
142 : overlapped_op(&do_complete)
143 , internal(internal_)
144 {
145 cancel_func_ = &do_cancel_impl;
146 }
147
148 // ============================================================
149 // Cancellation functions
150 // ============================================================
151
152 inline void
153 local_stream_connect_op::do_cancel_impl(overlapped_op* base) noexcept
154 {
155 auto* op = static_cast<local_stream_connect_op*>(base);
156 op->cancelled.store(true, std::memory_order_release);
157 if (op->internal.is_open())
158 {
159 ::CancelIoEx(
160 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
161 }
162 }
163
164 inline void
165 local_stream_read_op::do_cancel_impl(overlapped_op* base) noexcept
166 {
167 auto* op = static_cast<local_stream_read_op*>(base);
168 op->cancelled.store(true, std::memory_order_release);
169 if (op->internal.is_open())
170 {
171 ::CancelIoEx(
172 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
173 }
174 }
175
176 inline void
177 local_stream_write_op::do_cancel_impl(overlapped_op* base) noexcept
178 {
179 auto* op = static_cast<local_stream_write_op*>(base);
180 op->cancelled.store(true, std::memory_order_release);
181 if (op->internal.is_open())
182 {
183 ::CancelIoEx(
184 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
185 }
186 }
187
188 // ============================================================
189 // connect_op completion handler
190 // ============================================================
191
192 inline void
193 local_stream_connect_op::do_complete(
194 void* owner,
195 scheduler_op* base,
196 std::uint32_t /*bytes*/,
197 std::uint32_t /*error*/)
198 {
199 auto* op = static_cast<local_stream_connect_op*>(base);
200
201 if (!owner)
202 {
203 op->cleanup_only();
204 op->internal_ptr.reset();
205 return;
206 }
207
208 bool success =
209 (op->dwError == 0 && !op->cancelled.load(std::memory_order_acquire));
210 if (success && op->internal.is_open())
211 {
212 // Required after ConnectEx
213 ::setsockopt(
214 op->internal.native_handle(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
215 nullptr, 0);
216
217 corosio::local_endpoint local_ep;
218 sockaddr_storage local_storage{};
219 int local_len = sizeof(local_storage);
220 if (::getsockname(
221 op->internal.native_handle(),
222 reinterpret_cast<sockaddr*>(&local_storage), &local_len) == 0)
223 local_ep = from_sockaddr_local(
224 local_storage, static_cast<socklen_t>(local_len));
225 op->internal.set_endpoints(local_ep, op->target_endpoint);
226 }
227
228 auto prevent_premature_destruction = std::move(op->internal_ptr);
229 op->invoke_handler();
230 }
231
232 // ============================================================
233 // read_op completion handler
234 // ============================================================
235
236 inline void
237 local_stream_read_op::do_complete(
238 void* owner,
239 scheduler_op* base,
240 std::uint32_t /*bytes*/,
241 std::uint32_t /*error*/)
242 {
243 auto* op = static_cast<local_stream_read_op*>(base);
244
245 if (!owner)
246 {
247 op->cleanup_only();
248 op->internal_ptr.reset();
249 return;
250 }
251
252 auto prevent_premature_destruction = std::move(op->internal_ptr);
253 op->invoke_handler();
254 }
255
256 // ============================================================
257 // write_op completion handler
258 // ============================================================
259
260 inline void
261 local_stream_write_op::do_complete(
262 void* owner,
263 scheduler_op* base,
264 std::uint32_t /*bytes*/,
265 std::uint32_t /*error*/)
266 {
267 auto* op = static_cast<local_stream_write_op*>(base);
268
269 if (!owner)
270 {
271 op->cleanup_only();
272 op->internal_ptr.reset();
273 return;
274 }
275
276 auto prevent_premature_destruction = std::move(op->internal_ptr);
277 op->invoke_handler();
278 }
279
280 // ============================================================
281 // win_local_stream_socket_internal
282 // ============================================================
283
284 inline win_local_stream_socket_internal::win_local_stream_socket_internal(
285 win_local_stream_service& svc) noexcept
286 : svc_(svc)
287 , conn_(*this)
288 , rd_(*this)
289 , wr_(*this)
290 {
291 }
292
293 inline win_local_stream_socket_internal::~win_local_stream_socket_internal()
294 {
295 svc_.unregister_impl(*this);
296 }
297
298 inline SOCKET
299 win_local_stream_socket_internal::native_handle() const noexcept
300 {
301 return socket_;
302 }
303
304 inline corosio::local_endpoint
305 win_local_stream_socket_internal::local_endpoint() const noexcept
306 {
307 return local_endpoint_;
308 }
309
310 inline corosio::local_endpoint
311 win_local_stream_socket_internal::remote_endpoint() const noexcept
312 {
313 return remote_endpoint_;
314 }
315
316 inline bool
317 win_local_stream_socket_internal::is_open() const noexcept
318 {
319 return socket_ != INVALID_SOCKET;
320 }
321
322 inline void
323 win_local_stream_socket_internal::set_socket(SOCKET s) noexcept
324 {
325 socket_ = s;
326 }
327
328 inline void
329 win_local_stream_socket_internal::set_endpoints(
330 corosio::local_endpoint local,
331 corosio::local_endpoint remote) noexcept
332 {
333 local_endpoint_ = local;
334 remote_endpoint_ = remote;
335 }
336
337 inline std::coroutine_handle<>
338 win_local_stream_socket_internal::connect(
339 std::coroutine_handle<> h,
340 capy::executor_ref d,
341 corosio::local_endpoint ep,
342 std::stop_token token,
343 std::error_code* ec)
344 {
345 conn_.internal_ptr = shared_from_this();
346
347 auto& op = conn_;
348 op.reset();
349 op.h = h;
350 op.ex = d;
351 op.ec_out = ec;
352 op.target_endpoint = ep;
353 op.start(token);
354
355 svc_.work_started();
356
357 // ConnectEx requires the socket to be bound. For AF_UNIX,
358 // bind to a family-only sockaddr_un (empty path).
359 if (local_endpoint_.empty())
360 {
361 un_sa_t bind_sa{};
362 bind_sa.sun_family = AF_UNIX;
363 socklen_t bind_len =
364 static_cast<socklen_t>(offsetof(un_sa_t, sun_path));
365
366 if (::bind(
367 socket_, reinterpret_cast<sockaddr*>(&bind_sa),
368 bind_len) == SOCKET_ERROR)
369 {
370 svc_.on_completion(&op, ::WSAGetLastError(), 0);
371 return std::noop_coroutine();
372 }
373 }
374
375 auto connect_ex = svc_.connect_ex();
376 if (!connect_ex)
377 {
378 svc_.on_completion(&op, WSAEOPNOTSUPP, 0);
379 return std::noop_coroutine();
380 }
381
382 sockaddr_storage storage{};
383 socklen_t addrlen = detail::to_sockaddr(ep, storage);
384
385 BOOL result = connect_ex(
386 socket_, reinterpret_cast<sockaddr*>(&storage),
387 static_cast<int>(addrlen), nullptr, 0, nullptr, &op);
388
389 if (!result)
390 {
391 DWORD err = ::WSAGetLastError();
392 if (err != ERROR_IO_PENDING)
393 {
394 svc_.on_completion(&op, err, 0);
395 return std::noop_coroutine();
396 }
397 }
398
399 svc_.on_pending(&op);
400
401 // Re-check cancellation after I/O is pending
402 if (op.cancelled.load(std::memory_order_acquire))
403 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
404
405 return std::noop_coroutine();
406 }
407
408 inline std::coroutine_handle<>
409 win_local_stream_socket_internal::read_some(
410 std::coroutine_handle<> h,
411 capy::executor_ref d,
412 buffer_param param,
413 std::stop_token token,
414 std::error_code* ec,
415 std::size_t* bytes_out)
416 {
417 rd_.internal_ptr = shared_from_this();
418
419 auto& op = rd_;
420 op.reset();
421 op.is_read_ = true;
422 op.h = h;
423 op.ex = d;
424 op.ec_out = ec;
425 op.bytes_out = bytes_out;
426 op.start(token);
427
428 svc_.work_started();
429
430 capy::mutable_buffer bufs[local_stream_read_op::max_buffers];
431 op.wsabuf_count =
432 static_cast<DWORD>(param.copy_to(bufs, local_stream_read_op::max_buffers));
433
434 if (op.wsabuf_count == 0)
435 {
436 op.empty_buffer = true;
437 svc_.on_completion(&op, 0, 0);
438 return std::noop_coroutine();
439 }
440
441 for (DWORD i = 0; i < op.wsabuf_count; ++i)
442 {
443 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
444 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
445 }
446
447 op.flags = 0;
448
449 int result = ::WSARecv(
450 socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags, &op, nullptr);
451
452 if (result == SOCKET_ERROR)
453 {
454 DWORD err = ::WSAGetLastError();
455 if (err != WSA_IO_PENDING)
456 {
457 svc_.on_completion(&op, err, 0);
458 return std::noop_coroutine();
459 }
460 }
461
462 svc_.on_pending(&op);
463
464 if (op.cancelled.load(std::memory_order_acquire))
465 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
466
467 return std::noop_coroutine();
468 }
469
470 inline std::coroutine_handle<>
471 win_local_stream_socket_internal::write_some(
472 std::coroutine_handle<> h,
473 capy::executor_ref d,
474 buffer_param param,
475 std::stop_token token,
476 std::error_code* ec,
477 std::size_t* bytes_out)
478 {
479 wr_.internal_ptr = shared_from_this();
480
481 auto& op = wr_;
482 op.reset();
483 op.h = h;
484 op.ex = d;
485 op.ec_out = ec;
486 op.bytes_out = bytes_out;
487 op.start(token);
488
489 svc_.work_started();
490
491 capy::mutable_buffer bufs[local_stream_write_op::max_buffers];
492 op.wsabuf_count =
493 static_cast<DWORD>(param.copy_to(bufs, local_stream_write_op::max_buffers));
494
495 if (op.wsabuf_count == 0)
496 {
497 svc_.on_completion(&op, 0, 0);
498 return std::noop_coroutine();
499 }
500
501 for (DWORD i = 0; i < op.wsabuf_count; ++i)
502 {
503 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
504 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
505 }
506
507 int result = ::WSASend(
508 socket_, op.wsabufs, op.wsabuf_count, nullptr, 0, &op, nullptr);
509
510 if (result == SOCKET_ERROR)
511 {
512 DWORD err = ::WSAGetLastError();
513 if (err != WSA_IO_PENDING)
514 {
515 svc_.on_completion(&op, err, 0);
516 return std::noop_coroutine();
517 }
518 }
519
520 svc_.on_pending(&op);
521
522 if (op.cancelled.load(std::memory_order_acquire))
523 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
524
525 return std::noop_coroutine();
526 }
527
528 inline void
529 win_local_stream_socket_internal::cancel() noexcept
530 {
531 if (socket_ != INVALID_SOCKET)
532 {
533 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), nullptr);
534 }
535
536 conn_.request_cancel();
537 rd_.request_cancel();
538 wr_.request_cancel();
539 }
540
541 inline void
542 win_local_stream_socket_internal::close_socket() noexcept
543 {
544 if (socket_ != INVALID_SOCKET)
545 {
546 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), nullptr);
547 ::closesocket(socket_);
548 socket_ = INVALID_SOCKET;
549 }
550
551 local_endpoint_ = corosio::local_endpoint{};
552 remote_endpoint_ = corosio::local_endpoint{};
553 }
554
555 // ============================================================
556 // win_local_stream_socket (wrapper)
557 // ============================================================
558
559 inline win_local_stream_socket::win_local_stream_socket(
560 std::shared_ptr<win_local_stream_socket_internal> internal) noexcept
561 : internal_(std::move(internal))
562 {
563 }
564
565 inline void
566 win_local_stream_socket::close_internal() noexcept
567 {
568 if (internal_)
569 {
570 internal_->close_socket();
571 internal_.reset();
572 }
573 }
574
575 inline std::coroutine_handle<>
576 win_local_stream_socket::connect(
577 std::coroutine_handle<> h,
578 capy::executor_ref d,
579 corosio::local_endpoint ep,
580 std::stop_token token,
581 std::error_code* ec)
582 {
583 return internal_->connect(h, d, ep, token, ec);
584 }
585
586 inline std::coroutine_handle<>
587 win_local_stream_socket::read_some(
588 std::coroutine_handle<> h,
589 capy::executor_ref d,
590 buffer_param buf,
591 std::stop_token token,
592 std::error_code* ec,
593 std::size_t* bytes)
594 {
595 return internal_->read_some(h, d, buf, token, ec, bytes);
596 }
597
598 inline std::coroutine_handle<>
599 win_local_stream_socket::write_some(
600 std::coroutine_handle<> h,
601 capy::executor_ref d,
602 buffer_param buf,
603 std::stop_token token,
604 std::error_code* ec,
605 std::size_t* bytes)
606 {
607 return internal_->write_some(h, d, buf, token, ec, bytes);
608 }
609
610 inline std::error_code
611 win_local_stream_socket::shutdown(
612 local_stream_socket::shutdown_type what) noexcept
613 {
614 int how;
615 switch (what)
616 {
617 case local_stream_socket::shutdown_receive:
618 how = SD_RECEIVE;
619 break;
620 case local_stream_socket::shutdown_send:
621 how = SD_SEND;
622 break;
623 case local_stream_socket::shutdown_both:
624 how = SD_BOTH;
625 break;
626 default:
627 return make_err(WSAEINVAL);
628 }
629 if (::shutdown(internal_->native_handle(), how) != 0)
630 return make_err(WSAGetLastError());
631 return {};
632 }
633
634 inline native_handle_type
635 win_local_stream_socket::native_handle() const noexcept
636 {
637 return static_cast<native_handle_type>(internal_->native_handle());
638 }
639
640 inline native_handle_type
641 win_local_stream_socket::release_socket() noexcept
642 {
643 SOCKET s = internal_->socket_;
644 if (s != INVALID_SOCKET)
645 {
646 internal_->cancel();
647 internal_->socket_ = INVALID_SOCKET;
648 internal_->local_endpoint_ = corosio::local_endpoint{};
649 internal_->remote_endpoint_ = corosio::local_endpoint{};
650 }
651 return static_cast<native_handle_type>(s);
652 }
653
654 inline std::error_code
655 win_local_stream_socket::set_option(
656 int level, int optname, void const* data, std::size_t size) noexcept
657 {
658 if (::setsockopt(
659 internal_->native_handle(), level, optname,
660 reinterpret_cast<char const*>(data), static_cast<int>(size)) != 0)
661 return make_err(WSAGetLastError());
662 return {};
663 }
664
665 inline std::error_code
666 win_local_stream_socket::get_option(
667 int level, int optname, void* data, std::size_t* size) const noexcept
668 {
669 int len = static_cast<int>(*size);
670 if (::getsockopt(
671 internal_->native_handle(), level, optname,
672 reinterpret_cast<char*>(data), &len) != 0)
673 return make_err(WSAGetLastError());
674 *size = static_cast<std::size_t>(len);
675 return {};
676 }
677
678 inline corosio::local_endpoint
679 win_local_stream_socket::local_endpoint() const noexcept
680 {
681 return internal_->local_endpoint();
682 }
683
684 inline corosio::local_endpoint
685 win_local_stream_socket::remote_endpoint() const noexcept
686 {
687 return internal_->remote_endpoint();
688 }
689
690 inline void
691 win_local_stream_socket::cancel() noexcept
692 {
693 internal_->cancel();
694 }
695
696 inline win_local_stream_socket_internal*
697 win_local_stream_socket::get_internal() const noexcept
698 {
699 return internal_.get();
700 }
701
702 // ============================================================
703 // win_local_stream_service
704 // ============================================================
705
706 442x inline win_local_stream_service::win_local_stream_service(
707 442x capy::execution_context& ctx, win_tcp_service& tcp_svc)
708 442x : tcp_svc_(tcp_svc)
709 884x , sched_(ctx.use_service<win_scheduler>())
710
2/2
✓ Branch 4 → 5 taken 442 times.
✓ Branch 5 → 6 taken 442 times.
442x , iocp_(sched_.native_handle())
711 {
712 442x }
713
714 884x inline win_local_stream_service::~win_local_stream_service()
715 {
716
1/2
✗ Branch 6 → 3 not taken.
✓ Branch 6 → 7 taken 442 times.
442x for (auto* w = socket_wrapper_list_.pop_front(); w != nullptr;
717 w = socket_wrapper_list_.pop_front())
718 delete w;
719
720
1/2
✗ Branch 11 → 8 not taken.
✓ Branch 11 → 12 taken 442 times.
442x for (auto* w = acceptor_wrapper_list_.pop_front(); w != nullptr;
721 w = acceptor_wrapper_list_.pop_front())
722 delete w;
723 884x }
724
725 inline void
726 442x win_local_stream_service::shutdown()
727 {
728 442x std::lock_guard<win_mutex> lock(mutex_);
729
730
1/2
✗ Branch 6 → 4 not taken.
✓ Branch 6 → 7 taken 442 times.
442x for (auto* impl = socket_list_.pop_front(); impl != nullptr;
731 impl = socket_list_.pop_front())
732 {
733 impl->close_socket();
734 }
735
736
1/2
✗ Branch 10 → 8 not taken.
✓ Branch 10 → 11 taken 442 times.
442x for (auto* impl = acceptor_list_.pop_front(); impl != nullptr;
737 impl = acceptor_list_.pop_front())
738 {
739 impl->close_socket();
740 }
741 442x }
742
743 inline io_object::implementation*
744 win_local_stream_service::construct()
745 {
746 auto internal = std::make_shared<win_local_stream_socket_internal>(*this);
747
748 {
749 std::lock_guard<win_mutex> lock(mutex_);
750 socket_list_.push_back(internal.get());
751 }
752
753 auto* wrapper = new win_local_stream_socket(std::move(internal));
754
755 {
756 std::lock_guard<win_mutex> lock(mutex_);
757 socket_wrapper_list_.push_back(wrapper);
758 }
759
760 return wrapper;
761 }
762
763 inline void
764 win_local_stream_service::destroy(io_object::implementation* p)
765 {
766 if (p)
767 {
768 auto& wrapper = static_cast<win_local_stream_socket&>(*p);
769 wrapper.close_internal();
770 destroy_impl(wrapper);
771 }
772 }
773
774 inline void
775 win_local_stream_service::close(io_object::handle& h)
776 {
777 auto& wrapper = static_cast<win_local_stream_socket&>(*h.get());
778 wrapper.get_internal()->close_socket();
779 }
780
781 inline void
782 win_local_stream_service::destroy_impl(win_local_stream_socket& impl)
783 {
784 {
785 std::lock_guard<win_mutex> lock(mutex_);
786 socket_wrapper_list_.remove(&impl);
787 }
788 delete &impl;
789 }
790
791 inline void
792 win_local_stream_service::unregister_impl(
793 win_local_stream_socket_internal& impl)
794 {
795 std::lock_guard<win_mutex> lock(mutex_);
796 socket_list_.remove(&impl);
797 }
798
799 inline std::error_code
800 win_local_stream_service::open_socket(
801 local_stream_socket::implementation& impl,
802 int family, int type, int protocol)
803 {
804 auto& wrapper = static_cast<win_local_stream_socket&>(impl);
805 return open_socket_internal(*wrapper.get_internal(), family, type, protocol);
806 }
807
808 inline std::error_code
809 win_local_stream_service::assign_socket(
810 local_stream_socket::implementation& /*impl*/, native_handle_type /*fd*/)
811 {
812 // socketpair / assign is POSIX-only
813 return std::make_error_code(std::errc::operation_not_supported);
814 }
815
816 inline std::error_code
817 win_local_stream_service::open_socket_internal(
818 win_local_stream_socket_internal& impl,
819 int family, int type, int protocol)
820 {
821 impl.close_socket();
822
823 SOCKET sock =
824 ::WSASocketW(family, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED);
825
826 if (sock == INVALID_SOCKET)
827 return make_err(::WSAGetLastError());
828
829 // No IPV6_V6ONLY for AF_UNIX
830
831 HANDLE result = ::CreateIoCompletionPort(
832 reinterpret_cast<HANDLE>(sock), static_cast<HANDLE>(iocp_), key_io, 0);
833
834 if (result == nullptr)
835 {
836 DWORD dwError = ::GetLastError();
837 ::closesocket(sock);
838 return make_err(dwError);
839 }
840
841 impl.socket_ = sock;
842 return {};
843 }
844
845 inline void*
846 win_local_stream_service::native_handle() const noexcept
847 {
848 return iocp_;
849 }
850
851 inline LPFN_CONNECTEX
852 win_local_stream_service::connect_ex() const noexcept
853 {
854 return tcp_svc_.connect_ex();
855 }
856
857 inline LPFN_ACCEPTEX
858 win_local_stream_service::accept_ex() const noexcept
859 {
860 return tcp_svc_.accept_ex();
861 }
862
863 inline void
864 win_local_stream_service::post(overlapped_op* op)
865 {
866 sched_.post(op);
867 }
868
869 inline void
870 win_local_stream_service::on_pending(overlapped_op* op) noexcept
871 {
872 sched_.on_pending(op);
873 }
874
875 inline void
876 win_local_stream_service::on_completion(
877 overlapped_op* op, DWORD error, DWORD bytes) noexcept
878 {
879 sched_.on_completion(op, error, bytes);
880 }
881
882 inline void
883 win_local_stream_service::work_started() noexcept
884 {
885 sched_.work_started();
886 }
887
888 inline void
889 win_local_stream_service::work_finished() noexcept
890 {
891 sched_.work_finished();
892 }
893
894 inline void
895 win_local_stream_service::destroy_acceptor_impl(
896 win_local_stream_acceptor& impl)
897 {
898 {
899 std::lock_guard<win_mutex> lock(mutex_);
900 acceptor_wrapper_list_.remove(&impl);
901 }
902 delete &impl;
903 }
904
905 inline void
906 win_local_stream_service::unregister_acceptor_impl(
907 win_local_stream_acceptor_internal& impl)
908 {
909 std::lock_guard<win_mutex> lock(mutex_);
910 acceptor_list_.remove(&impl);
911 }
912
913 inline std::error_code
914 win_local_stream_service::open_acceptor_socket(
915 win_local_stream_acceptor_internal& impl,
916 int family, int type, int protocol)
917 {
918 impl.close_socket();
919
920 SOCKET sock =
921 ::WSASocketW(family, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED);
922
923 if (sock == INVALID_SOCKET)
924 return make_err(::WSAGetLastError());
925
926 HANDLE result = ::CreateIoCompletionPort(
927 reinterpret_cast<HANDLE>(sock), static_cast<HANDLE>(iocp_), key_io, 0);
928
929 if (result == nullptr)
930 {
931 DWORD dwError = ::GetLastError();
932 ::closesocket(sock);
933 return make_err(dwError);
934 }
935
936 impl.socket_ = sock;
937 return {};
938 }
939
940 inline std::error_code
941 win_local_stream_service::bind_acceptor(
942 win_local_stream_acceptor_internal& impl,
943 corosio::local_endpoint ep)
944 {
945 // Reject abstract sockets on Windows
946 if (ep.is_abstract())
947 return std::make_error_code(std::errc::operation_not_supported);
948
949 SOCKET sock = impl.socket_;
950
951 sockaddr_storage storage{};
952 socklen_t addrlen = detail::to_sockaddr(ep, storage);
953 if (::bind(
954 sock, reinterpret_cast<sockaddr*>(&storage),
955 static_cast<int>(addrlen)) == SOCKET_ERROR)
956 return make_err(::WSAGetLastError());
957
958 impl.set_local_endpoint(ep);
959 return {};
960 }
961
962 inline std::error_code
963 win_local_stream_service::listen_acceptor(
964 win_local_stream_acceptor_internal& impl, int backlog)
965 {
966 SOCKET sock = impl.socket_;
967
968 if (::listen(sock, backlog) == SOCKET_ERROR)
969 return make_err(::WSAGetLastError());
970
971 return {};
972 }
973
974 } // namespace boost::corosio::detail
975
976 #endif // BOOST_COROSIO_HAS_IOCP
977
978 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_STREAM_SERVICE_HPP
979