include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp

2.6% Lines (12/465) 4.9% List of functions (3/61) 2.3% Branches (4/174)
win_local_dgram_service.hpp
f(x) Functions (61)
Function Calls Lines Branches Blocks
boost::corosio::detail::local_dgram_to_native_msg_flags(int) :38 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_send_to_op::local_dgram_send_to_op(boost::corosio::detail::win_local_dgram_socket_internal&) :110 0 0.0% 0.0% boost::corosio::detail::local_dgram_recv_from_op::local_dgram_recv_from_op(boost::corosio::detail::win_local_dgram_socket_internal&) :118 0 0.0% 0.0% boost::corosio::detail::local_dgram_connect_op::local_dgram_connect_op(boost::corosio::detail::win_local_dgram_socket_internal&) :126 0 0.0% 0.0% boost::corosio::detail::local_dgram_send_op::local_dgram_send_op(boost::corosio::detail::win_local_dgram_socket_internal&) :134 0 0.0% 0.0% boost::corosio::detail::local_dgram_recv_op::local_dgram_recv_op(boost::corosio::detail::win_local_dgram_socket_internal&) :142 0 0.0% 0.0% boost::corosio::detail::local_dgram_send_to_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :155 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_recv_from_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :167 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_connect_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :179 0 0.0% 0.0% boost::corosio::detail::local_dgram_send_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :186 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_recv_op::do_cancel_impl(boost::corosio::detail::overlapped_op*) :198 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_send_to_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :214 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_recv_from_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :232 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_connect_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :259 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_send_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :292 0 0.0% 0.0% 0.0% boost::corosio::detail::local_dgram_recv_op::do_complete(void*, boost::corosio::detail::scheduler_op*, unsigned int, unsigned int) :310 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::win_local_dgram_socket_internal(boost::corosio::detail::win_local_dgram_service&) :331 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::~win_local_dgram_socket_internal() :342 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::native_handle() const :348 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::local_endpoint() const :354 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::remote_endpoint() const :360 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::is_open() const :366 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::send_to(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, boost::corosio::local_endpoint, int, std::stop_token, std::error_code*, unsigned long long*) :372 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::recv_from(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, boost::corosio::local_endpoint*, int, std::stop_token, std::error_code*, unsigned long long*) :431 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::local_endpoint, std::stop_token, std::error_code*) :500 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::send(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, int, std::stop_token, std::error_code*, unsigned long long*) :534 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::recv(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, int, std::stop_token, std::error_code*, unsigned long long*) :588 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::cancel() :650 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket_internal::close_socket() :665 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::win_local_dgram_socket(std::shared_ptr<boost::corosio::detail::win_local_dgram_socket_internal>) :682 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::close_internal() :689 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::send_to(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, boost::corosio::local_endpoint, int, std::stop_token, std::error_code*, unsigned long long*) :699 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::recv_from(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, boost::corosio::local_endpoint*, int, std::stop_token, std::error_code*, unsigned long long*) :708 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::local_endpoint, std::stop_token, std::error_code*) :717 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::send(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, int, std::stop_token, std::error_code*, unsigned long long*) :725 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::recv(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, int, std::stop_token, std::error_code*, unsigned long long*) :734 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::bind(boost::corosio::local_endpoint) :743 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::shutdown(boost::corosio::shutdown_type) :762 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::native_handle() const :786 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::release_socket() :792 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::set_option(int, int, void const*, unsigned long long) :806 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::get_option(int, int, void*, unsigned long long*) const :817 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::local_endpoint() const :830 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::remote_endpoint() const :836 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::cancel() :842 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_socket::get_internal() const :848 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::win_local_dgram_service(boost::capy::execution_context&) :857 442x 100.0% 100.0% 75.0% boost::corosio::detail::win_local_dgram_service::~win_local_dgram_service() :864 884x 100.0% 100.0% boost::corosio::detail::win_local_dgram_service::shutdown() :872 442x 66.7% 50.0% 71.4% boost::corosio::detail::win_local_dgram_service::construct() :884 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::destroy(boost::corosio::io_object::implementation*) :904 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::close(boost::corosio::io_object::handle&) :915 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::destroy_impl(boost::corosio::detail::win_local_dgram_socket&) :922 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::unregister_impl(boost::corosio::detail::win_local_dgram_socket_internal&) :932 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::open_socket(boost::corosio::local_datagram_socket::implementation&, int, int, int) :940 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::assign_socket(boost::corosio::local_datagram_socket::implementation&, unsigned long long) :949 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::bind_socket(boost::corosio::local_datagram_socket::implementation&, boost::corosio::local_endpoint) :956 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::open_socket_internal(boost::corosio::detail::win_local_dgram_socket_internal&, int, int, int) :980 0 0.0% 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::on_pending(boost::corosio::detail::overlapped_op*) :1013 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::on_completion(boost::corosio::detail::overlapped_op*, unsigned long, unsigned long) :1019 0 0.0% 0.0% boost::corosio::detail::win_local_dgram_service::work_started() :1026 0 0.0% 0.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IOCP
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/local_datagram_service.hpp>
19
20 #include <boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp>
21 #include <boost/corosio/native/detail/iocp/win_scheduler.hpp>
22 #include <boost/corosio/native/detail/iocp/win_completion_key.hpp>
23 #include <boost/corosio/native/detail/iocp/win_mutex.hpp>
24 #include <boost/corosio/native/detail/iocp/win_wsa_init.hpp>
25
26 #include <boost/corosio/native/detail/endpoint_convert.hpp>
27 #include <boost/corosio/native/detail/make_err.hpp>
28 #include <boost/corosio/detail/dispatch_coro.hpp>
29
30 #include <cstring>
31
32 #include <Ws2tcpip.h>
33
34 namespace boost::corosio::detail {
35
36 /* Map portable message_flags values to native MSG_* constants. */
37 inline DWORD
38 local_dgram_to_native_msg_flags(int flags) noexcept
39 {
40 DWORD native = 0;
41 if (flags & 1) native |= MSG_PEEK;
42 if (flags & 2) native |= MSG_OOB;
43 if (flags & 4) native |= MSG_DONTROUTE;
44 return native;
45 }
46
47 /* IOCP local datagram service.
48
49 Inherits from local_datagram_service to enable runtime polymorphism
50 via use_service<local_datagram_service>().
51 */
52 class BOOST_COROSIO_DECL win_local_dgram_service final
53 : private win_wsa_init
54 , public local_datagram_service
55 {
56 public:
57 io_object::implementation* construct() override;
58
59 void destroy(io_object::implementation* p) override;
60
61 void close(io_object::handle& h) override;
62
63 explicit win_local_dgram_service(capy::execution_context& ctx);
64
65 ~win_local_dgram_service();
66
67 win_local_dgram_service(win_local_dgram_service const&) = delete;
68 win_local_dgram_service& operator=(win_local_dgram_service const&) = delete;
69
70 void shutdown() override;
71
72 std::error_code open_socket(
73 local_datagram_socket::implementation& impl,
74 int family, int type, int protocol) override;
75
76 std::error_code assign_socket(
77 local_datagram_socket::implementation& impl,
78 native_handle_type fd) override;
79
80 std::error_code bind_socket(
81 local_datagram_socket::implementation& impl,
82 corosio::local_endpoint ep) override;
83
84 void destroy_impl(win_local_dgram_socket& impl);
85
86 void unregister_impl(win_local_dgram_socket_internal& impl);
87
88 std::error_code open_socket_internal(
89 win_local_dgram_socket_internal& impl,
90 int family, int type, int protocol);
91
92 void post(overlapped_op* op);
93 void on_pending(overlapped_op* op) noexcept;
94 void on_completion(overlapped_op* op, DWORD error, DWORD bytes) noexcept;
95 void work_started() noexcept;
96 void work_finished() noexcept;
97
98 private:
99 win_scheduler& sched_;
100 win_mutex mutex_;
101 intrusive_list<win_local_dgram_socket_internal> socket_list_;
102 intrusive_list<win_local_dgram_socket> wrapper_list_;
103 void* iocp_;
104 };
105
106 // ============================================================
107 // Operation constructors
108 // ============================================================
109
110 inline local_dgram_send_to_op::local_dgram_send_to_op(
111 win_local_dgram_socket_internal& internal_) noexcept
112 : overlapped_op(&do_complete)
113 , internal(internal_)
114 {
115 cancel_func_ = &do_cancel_impl;
116 }
117
118 inline local_dgram_recv_from_op::local_dgram_recv_from_op(
119 win_local_dgram_socket_internal& internal_) noexcept
120 : overlapped_op(&do_complete)
121 , internal(internal_)
122 {
123 cancel_func_ = &do_cancel_impl;
124 }
125
126 inline local_dgram_connect_op::local_dgram_connect_op(
127 win_local_dgram_socket_internal& internal_) noexcept
128 : overlapped_op(&do_complete)
129 , internal(internal_)
130 {
131 cancel_func_ = &do_cancel_impl;
132 }
133
134 inline local_dgram_send_op::local_dgram_send_op(
135 win_local_dgram_socket_internal& internal_) noexcept
136 : overlapped_op(&do_complete)
137 , internal(internal_)
138 {
139 cancel_func_ = &do_cancel_impl;
140 }
141
142 inline local_dgram_recv_op::local_dgram_recv_op(
143 win_local_dgram_socket_internal& internal_) noexcept
144 : overlapped_op(&do_complete)
145 , internal(internal_)
146 {
147 cancel_func_ = &do_cancel_impl;
148 }
149
150 // ============================================================
151 // Cancellation functions
152 // ============================================================
153
154 inline void
155 local_dgram_send_to_op::do_cancel_impl(overlapped_op* base) noexcept
156 {
157 auto* op = static_cast<local_dgram_send_to_op*>(base);
158 op->cancelled.store(true, std::memory_order_release);
159 if (op->internal.is_open())
160 {
161 ::CancelIoEx(
162 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
163 }
164 }
165
166 inline void
167 local_dgram_recv_from_op::do_cancel_impl(overlapped_op* base) noexcept
168 {
169 auto* op = static_cast<local_dgram_recv_from_op*>(base);
170 op->cancelled.store(true, std::memory_order_release);
171 if (op->internal.is_open())
172 {
173 ::CancelIoEx(
174 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
175 }
176 }
177
178 inline void
179 local_dgram_connect_op::do_cancel_impl(overlapped_op* base) noexcept
180 {
181 auto* op = static_cast<local_dgram_connect_op*>(base);
182 op->cancelled.store(true, std::memory_order_release);
183 }
184
185 inline void
186 local_dgram_send_op::do_cancel_impl(overlapped_op* base) noexcept
187 {
188 auto* op = static_cast<local_dgram_send_op*>(base);
189 op->cancelled.store(true, std::memory_order_release);
190 if (op->internal.is_open())
191 {
192 ::CancelIoEx(
193 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
194 }
195 }
196
197 inline void
198 local_dgram_recv_op::do_cancel_impl(overlapped_op* base) noexcept
199 {
200 auto* op = static_cast<local_dgram_recv_op*>(base);
201 op->cancelled.store(true, std::memory_order_release);
202 if (op->internal.is_open())
203 {
204 ::CancelIoEx(
205 reinterpret_cast<HANDLE>(op->internal.native_handle()), op);
206 }
207 }
208
209 // ============================================================
210 // Completion handlers
211 // ============================================================
212
213 inline void
214 local_dgram_send_to_op::do_complete(
215 void* owner,
216 scheduler_op* base,
217 std::uint32_t /*bytes*/,
218 std::uint32_t /*error*/)
219 {
220 auto* op = static_cast<local_dgram_send_to_op*>(base);
221 if (!owner)
222 {
223 op->cleanup_only();
224 op->internal_ptr.reset();
225 return;
226 }
227 auto prevent_premature_destruction = std::move(op->internal_ptr);
228 op->invoke_handler();
229 }
230
231 inline void
232 local_dgram_recv_from_op::do_complete(
233 void* owner,
234 scheduler_op* base,
235 std::uint32_t /*bytes*/,
236 std::uint32_t /*error*/)
237 {
238 auto* op = static_cast<local_dgram_recv_from_op*>(base);
239 if (!owner)
240 {
241 op->cleanup_only();
242 op->internal_ptr.reset();
243 return;
244 }
245
246 bool success =
247 (op->dwError == 0 && !op->cancelled.load(std::memory_order_acquire));
248 if (success && op->source_out)
249 {
250 *op->source_out = from_sockaddr_local(
251 op->source_storage, static_cast<socklen_t>(op->source_len));
252 }
253
254 auto prevent_premature_destruction = std::move(op->internal_ptr);
255 op->invoke_handler();
256 }
257
258 inline void
259 local_dgram_connect_op::do_complete(
260 void* owner,
261 scheduler_op* base,
262 std::uint32_t /*bytes*/,
263 std::uint32_t /*error*/)
264 {
265 auto* op = static_cast<local_dgram_connect_op*>(base);
266 if (!owner)
267 {
268 op->cleanup_only();
269 op->internal_ptr.reset();
270 return;
271 }
272
273 bool success =
274 (op->dwError == 0 && !op->cancelled.load(std::memory_order_acquire));
275 if (success)
276 {
277 sockaddr_storage local_storage{};
278 int local_len = sizeof(local_storage);
279 if (::getsockname(
280 op->internal.socket_,
281 reinterpret_cast<sockaddr*>(&local_storage), &local_len) == 0)
282 op->internal.local_endpoint_ = from_sockaddr_local(
283 local_storage, static_cast<socklen_t>(local_len));
284 op->internal.remote_endpoint_ = op->target_endpoint;
285 }
286
287 auto prevent_premature_destruction = std::move(op->internal_ptr);
288 op->invoke_handler();
289 }
290
291 inline void
292 local_dgram_send_op::do_complete(
293 void* owner,
294 scheduler_op* base,
295 std::uint32_t /*bytes*/,
296 std::uint32_t /*error*/)
297 {
298 auto* op = static_cast<local_dgram_send_op*>(base);
299 if (!owner)
300 {
301 op->cleanup_only();
302 op->internal_ptr.reset();
303 return;
304 }
305 auto prevent_premature_destruction = std::move(op->internal_ptr);
306 op->invoke_handler();
307 }
308
309 inline void
310 local_dgram_recv_op::do_complete(
311 void* owner,
312 scheduler_op* base,
313 std::uint32_t /*bytes*/,
314 std::uint32_t /*error*/)
315 {
316 auto* op = static_cast<local_dgram_recv_op*>(base);
317 if (!owner)
318 {
319 op->cleanup_only();
320 op->internal_ptr.reset();
321 return;
322 }
323 auto prevent_premature_destruction = std::move(op->internal_ptr);
324 op->invoke_handler();
325 }
326
327 // ============================================================
328 // win_local_dgram_socket_internal
329 // ============================================================
330
331 inline win_local_dgram_socket_internal::win_local_dgram_socket_internal(
332 win_local_dgram_service& svc) noexcept
333 : svc_(svc)
334 , wr_(*this)
335 , rd_(*this)
336 , conn_(*this)
337 , send_wr_(*this)
338 , recv_rd_(*this)
339 {
340 }
341
342 inline win_local_dgram_socket_internal::~win_local_dgram_socket_internal()
343 {
344 svc_.unregister_impl(*this);
345 }
346
347 inline SOCKET
348 win_local_dgram_socket_internal::native_handle() const noexcept
349 {
350 return socket_;
351 }
352
353 inline corosio::local_endpoint
354 win_local_dgram_socket_internal::local_endpoint() const noexcept
355 {
356 return local_endpoint_;
357 }
358
359 inline corosio::local_endpoint
360 win_local_dgram_socket_internal::remote_endpoint() const noexcept
361 {
362 return remote_endpoint_;
363 }
364
365 inline bool
366 win_local_dgram_socket_internal::is_open() const noexcept
367 {
368 return socket_ != INVALID_SOCKET;
369 }
370
371 inline std::coroutine_handle<>
372 win_local_dgram_socket_internal::send_to(
373 std::coroutine_handle<> h,
374 capy::executor_ref d,
375 buffer_param param,
376 corosio::local_endpoint dest,
377 int flags,
378 std::stop_token token,
379 std::error_code* ec,
380 std::size_t* bytes_out)
381 {
382 wr_.internal_ptr = shared_from_this();
383
384 auto& op = wr_;
385 op.reset();
386 op.h = h;
387 op.ex = d;
388 op.ec_out = ec;
389 op.bytes_out = bytes_out;
390 op.start(token);
391
392 svc_.work_started();
393
394 capy::mutable_buffer bufs[local_dgram_send_to_op::max_buffers];
395 op.wsabuf_count =
396 static_cast<DWORD>(param.copy_to(bufs, local_dgram_send_to_op::max_buffers));
397
398 for (DWORD i = 0; i < op.wsabuf_count; ++i)
399 {
400 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
401 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
402 }
403
404 op.dest_len = static_cast<int>(to_sockaddr(dest, op.dest_storage));
405
406 int result = ::WSASendTo(
407 socket_, op.wsabufs, op.wsabuf_count, nullptr,
408 local_dgram_to_native_msg_flags(flags),
409 reinterpret_cast<sockaddr*>(&op.dest_storage), op.dest_len, &op,
410 nullptr);
411
412 if (result == SOCKET_ERROR)
413 {
414 DWORD err = ::WSAGetLastError();
415 if (err != WSA_IO_PENDING)
416 {
417 svc_.on_completion(&op, err, 0);
418 return std::noop_coroutine();
419 }
420 }
421
422 svc_.on_pending(&op);
423
424 if (op.cancelled.load(std::memory_order_acquire))
425 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
426
427 return std::noop_coroutine();
428 }
429
430 inline std::coroutine_handle<>
431 win_local_dgram_socket_internal::recv_from(
432 std::coroutine_handle<> h,
433 capy::executor_ref d,
434 buffer_param param,
435 corosio::local_endpoint* source,
436 int flags,
437 std::stop_token token,
438 std::error_code* ec,
439 std::size_t* bytes_out)
440 {
441 rd_.internal_ptr = shared_from_this();
442
443 auto& op = rd_;
444 op.reset();
445 op.h = h;
446 op.ex = d;
447 op.ec_out = ec;
448 op.bytes_out = bytes_out;
449 op.source_out = source;
450 op.start(token);
451
452 svc_.work_started();
453
454 capy::mutable_buffer bufs[local_dgram_recv_from_op::max_buffers];
455 op.wsabuf_count =
456 static_cast<DWORD>(param.copy_to(bufs, local_dgram_recv_from_op::max_buffers));
457
458 if (op.wsabuf_count == 0 || (op.wsabuf_count == 1 && bufs[0].size() == 0))
459 {
460 op.empty_buffer = true;
461 svc_.on_completion(&op, 0, 0);
462 return std::noop_coroutine();
463 }
464
465 for (DWORD i = 0; i < op.wsabuf_count; ++i)
466 {
467 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
468 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
469 }
470
471 op.flags = local_dgram_to_native_msg_flags(flags);
472 std::memset(&op.source_storage, 0, sizeof(op.source_storage));
473 op.source_len = sizeof(op.source_storage);
474
475 int result = ::WSARecvFrom(
476 socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags,
477 reinterpret_cast<sockaddr*>(&op.source_storage), &op.source_len, &op,
478 nullptr);
479
480 if (result == SOCKET_ERROR)
481 {
482 DWORD err = ::WSAGetLastError();
483 if (err != WSA_IO_PENDING)
484 {
485 svc_.on_completion(&op, err, 0);
486 return std::noop_coroutine();
487 }
488 }
489
490 svc_.on_pending(&op);
491
492 if (op.cancelled.load(std::memory_order_acquire))
493 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
494
495 return std::noop_coroutine();
496 }
497
498 // Datagram connect is synchronous on Windows
499 inline std::coroutine_handle<>
500 win_local_dgram_socket_internal::connect(
501 std::coroutine_handle<> h,
502 capy::executor_ref d,
503 corosio::local_endpoint ep,
504 std::stop_token token,
505 std::error_code* ec)
506 {
507 conn_.internal_ptr = shared_from_this();
508
509 auto& op = conn_;
510 op.reset();
511 op.h = h;
512 op.ex = d;
513 op.ec_out = ec;
514 op.target_endpoint = ep;
515 op.start(token);
516
517 svc_.work_started();
518
519 sockaddr_storage storage{};
520 socklen_t addrlen = detail::to_sockaddr(ep, storage);
521 int result = ::WSAConnect(
522 socket_, reinterpret_cast<sockaddr*>(&storage),
523 static_cast<int>(addrlen), nullptr, nullptr, nullptr, nullptr);
524
525 if (result == SOCKET_ERROR)
526 svc_.on_completion(&op, ::WSAGetLastError(), 0);
527 else
528 svc_.on_completion(&op, 0, 0);
529
530 return std::noop_coroutine();
531 }
532
533 inline std::coroutine_handle<>
534 win_local_dgram_socket_internal::send(
535 std::coroutine_handle<> h,
536 capy::executor_ref d,
537 buffer_param param,
538 int flags,
539 std::stop_token token,
540 std::error_code* ec,
541 std::size_t* bytes_out)
542 {
543 send_wr_.internal_ptr = shared_from_this();
544
545 auto& op = send_wr_;
546 op.reset();
547 op.h = h;
548 op.ex = d;
549 op.ec_out = ec;
550 op.bytes_out = bytes_out;
551 op.start(token);
552
553 svc_.work_started();
554
555 capy::mutable_buffer bufs[local_dgram_send_op::max_buffers];
556 op.wsabuf_count =
557 static_cast<DWORD>(param.copy_to(bufs, local_dgram_send_op::max_buffers));
558
559 for (DWORD i = 0; i < op.wsabuf_count; ++i)
560 {
561 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
562 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
563 }
564
565 int result = ::WSASend(
566 socket_, op.wsabufs, op.wsabuf_count, nullptr,
567 local_dgram_to_native_msg_flags(flags), &op, nullptr);
568
569 if (result == SOCKET_ERROR)
570 {
571 DWORD err = ::WSAGetLastError();
572 if (err != WSA_IO_PENDING)
573 {
574 svc_.on_completion(&op, err, 0);
575 return std::noop_coroutine();
576 }
577 }
578
579 svc_.on_pending(&op);
580
581 if (op.cancelled.load(std::memory_order_acquire))
582 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
583
584 return std::noop_coroutine();
585 }
586
587 inline std::coroutine_handle<>
588 win_local_dgram_socket_internal::recv(
589 std::coroutine_handle<> h,
590 capy::executor_ref d,
591 buffer_param param,
592 int flags,
593 std::stop_token token,
594 std::error_code* ec,
595 std::size_t* bytes_out)
596 {
597 recv_rd_.internal_ptr = shared_from_this();
598
599 auto& op = recv_rd_;
600 op.reset();
601 op.h = h;
602 op.ex = d;
603 op.ec_out = ec;
604 op.bytes_out = bytes_out;
605 op.start(token);
606
607 svc_.work_started();
608
609 capy::mutable_buffer bufs[local_dgram_recv_op::max_buffers];
610 op.wsabuf_count =
611 static_cast<DWORD>(param.copy_to(bufs, local_dgram_recv_op::max_buffers));
612
613 if (op.wsabuf_count == 0 || (op.wsabuf_count == 1 && bufs[0].size() == 0))
614 {
615 op.empty_buffer = true;
616 svc_.on_completion(&op, 0, 0);
617 return std::noop_coroutine();
618 }
619
620 for (DWORD i = 0; i < op.wsabuf_count; ++i)
621 {
622 op.wsabufs[i].buf = static_cast<char*>(bufs[i].data());
623 op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
624 }
625
626 op.flags = local_dgram_to_native_msg_flags(flags);
627
628 int result = ::WSARecv(
629 socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags, &op, nullptr);
630
631 if (result == SOCKET_ERROR)
632 {
633 DWORD err = ::WSAGetLastError();
634 if (err != WSA_IO_PENDING)
635 {
636 svc_.on_completion(&op, err, 0);
637 return std::noop_coroutine();
638 }
639 }
640
641 svc_.on_pending(&op);
642
643 if (op.cancelled.load(std::memory_order_acquire))
644 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
645
646 return std::noop_coroutine();
647 }
648
649 inline void
650 win_local_dgram_socket_internal::cancel() noexcept
651 {
652 if (socket_ != INVALID_SOCKET)
653 {
654 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), nullptr);
655 }
656
657 wr_.request_cancel();
658 rd_.request_cancel();
659 conn_.request_cancel();
660 send_wr_.request_cancel();
661 recv_rd_.request_cancel();
662 }
663
664 inline void
665 win_local_dgram_socket_internal::close_socket() noexcept
666 {
667 if (socket_ != INVALID_SOCKET)
668 {
669 ::CancelIoEx(reinterpret_cast<HANDLE>(socket_), nullptr);
670 ::closesocket(socket_);
671 socket_ = INVALID_SOCKET;
672 }
673
674 local_endpoint_ = corosio::local_endpoint{};
675 remote_endpoint_ = corosio::local_endpoint{};
676 }
677
678 // ============================================================
679 // win_local_dgram_socket (wrapper)
680 // ============================================================
681
682 inline win_local_dgram_socket::win_local_dgram_socket(
683 std::shared_ptr<win_local_dgram_socket_internal> internal) noexcept
684 : internal_(std::move(internal))
685 {
686 }
687
688 inline void
689 win_local_dgram_socket::close_internal() noexcept
690 {
691 if (internal_)
692 {
693 internal_->close_socket();
694 internal_.reset();
695 }
696 }
697
698 inline std::coroutine_handle<>
699 win_local_dgram_socket::send_to(
700 std::coroutine_handle<> h, capy::executor_ref d,
701 buffer_param buf, corosio::local_endpoint dest, int flags,
702 std::stop_token token, std::error_code* ec, std::size_t* bytes)
703 {
704 return internal_->send_to(h, d, buf, dest, flags, token, ec, bytes);
705 }
706
707 inline std::coroutine_handle<>
708 win_local_dgram_socket::recv_from(
709 std::coroutine_handle<> h, capy::executor_ref d,
710 buffer_param buf, corosio::local_endpoint* source, int flags,
711 std::stop_token token, std::error_code* ec, std::size_t* bytes)
712 {
713 return internal_->recv_from(h, d, buf, source, flags, token, ec, bytes);
714 }
715
716 inline std::coroutine_handle<>
717 win_local_dgram_socket::connect(
718 std::coroutine_handle<> h, capy::executor_ref d,
719 corosio::local_endpoint ep, std::stop_token token, std::error_code* ec)
720 {
721 return internal_->connect(h, d, ep, token, ec);
722 }
723
724 inline std::coroutine_handle<>
725 win_local_dgram_socket::send(
726 std::coroutine_handle<> h, capy::executor_ref d,
727 buffer_param buf, int flags,
728 std::stop_token token, std::error_code* ec, std::size_t* bytes)
729 {
730 return internal_->send(h, d, buf, flags, token, ec, bytes);
731 }
732
733 inline std::coroutine_handle<>
734 win_local_dgram_socket::recv(
735 std::coroutine_handle<> h, capy::executor_ref d,
736 buffer_param buf, int flags,
737 std::stop_token token, std::error_code* ec, std::size_t* bytes)
738 {
739 return internal_->recv(h, d, buf, flags, token, ec, bytes);
740 }
741
742 inline std::error_code
743 win_local_dgram_socket::bind(corosio::local_endpoint ep) noexcept
744 {
745 if (ep.is_abstract())
746 return std::make_error_code(std::errc::operation_not_supported);
747
748 SOCKET sock = internal_->socket_;
749
750 sockaddr_storage storage{};
751 socklen_t addrlen = detail::to_sockaddr(ep, storage);
752 if (::bind(
753 sock, reinterpret_cast<sockaddr*>(&storage),
754 static_cast<int>(addrlen)) == SOCKET_ERROR)
755 return make_err(::WSAGetLastError());
756
757 internal_->local_endpoint_ = ep;
758 return {};
759 }
760
761 inline std::error_code
762 win_local_dgram_socket::shutdown(
763 local_datagram_socket::shutdown_type what) noexcept
764 {
765 int how;
766 switch (what)
767 {
768 case local_datagram_socket::shutdown_receive:
769 how = SD_RECEIVE;
770 break;
771 case local_datagram_socket::shutdown_send:
772 how = SD_SEND;
773 break;
774 case local_datagram_socket::shutdown_both:
775 how = SD_BOTH;
776 break;
777 default:
778 return make_err(WSAEINVAL);
779 }
780 if (::shutdown(internal_->native_handle(), how) != 0)
781 return make_err(WSAGetLastError());
782 return {};
783 }
784
785 inline native_handle_type
786 win_local_dgram_socket::native_handle() const noexcept
787 {
788 return static_cast<native_handle_type>(internal_->native_handle());
789 }
790
791 inline native_handle_type
792 win_local_dgram_socket::release_socket() noexcept
793 {
794 SOCKET s = internal_->socket_;
795 if (s != INVALID_SOCKET)
796 {
797 internal_->cancel();
798 internal_->socket_ = INVALID_SOCKET;
799 internal_->local_endpoint_ = corosio::local_endpoint{};
800 internal_->remote_endpoint_ = corosio::local_endpoint{};
801 }
802 return static_cast<native_handle_type>(s);
803 }
804
805 inline std::error_code
806 win_local_dgram_socket::set_option(
807 int level, int optname, void const* data, std::size_t size) noexcept
808 {
809 if (::setsockopt(
810 internal_->native_handle(), level, optname,
811 reinterpret_cast<char const*>(data), static_cast<int>(size)) != 0)
812 return make_err(WSAGetLastError());
813 return {};
814 }
815
816 inline std::error_code
817 win_local_dgram_socket::get_option(
818 int level, int optname, void* data, std::size_t* size) const noexcept
819 {
820 int len = static_cast<int>(*size);
821 if (::getsockopt(
822 internal_->native_handle(), level, optname,
823 reinterpret_cast<char*>(data), &len) != 0)
824 return make_err(WSAGetLastError());
825 *size = static_cast<std::size_t>(len);
826 return {};
827 }
828
829 inline corosio::local_endpoint
830 win_local_dgram_socket::local_endpoint() const noexcept
831 {
832 return internal_->local_endpoint();
833 }
834
835 inline corosio::local_endpoint
836 win_local_dgram_socket::remote_endpoint() const noexcept
837 {
838 return internal_->remote_endpoint();
839 }
840
841 inline void
842 win_local_dgram_socket::cancel() noexcept
843 {
844 internal_->cancel();
845 }
846
847 inline win_local_dgram_socket_internal*
848 win_local_dgram_socket::get_internal() const noexcept
849 {
850 return internal_.get();
851 }
852
853 // ============================================================
854 // win_local_dgram_service
855 // ============================================================
856
857 442x inline win_local_dgram_service::win_local_dgram_service(
858 442x capy::execution_context& ctx)
859 884x : sched_(ctx.use_service<win_scheduler>())
860
2/2
✓ Branch 4 → 5 taken 442 times.
✓ Branch 5 → 6 taken 442 times.
442x , iocp_(sched_.native_handle())
861 {
862 442x }
863
864 884x inline win_local_dgram_service::~win_local_dgram_service()
865 {
866
1/2
✗ Branch 6 → 3 not taken.
✓ Branch 6 → 7 taken 442 times.
442x for (auto* w = wrapper_list_.pop_front(); w != nullptr;
867 w = wrapper_list_.pop_front())
868 delete w;
869 884x }
870
871 inline void
872 442x win_local_dgram_service::shutdown()
873 {
874 442x std::lock_guard<win_mutex> lock(mutex_);
875
876
1/2
✗ Branch 6 → 4 not taken.
✓ Branch 6 → 7 taken 442 times.
442x for (auto* impl = socket_list_.pop_front(); impl != nullptr;
877 impl = socket_list_.pop_front())
878 {
879 impl->close_socket();
880 }
881 442x }
882
883 inline io_object::implementation*
884 win_local_dgram_service::construct()
885 {
886 auto internal = std::make_shared<win_local_dgram_socket_internal>(*this);
887
888 {
889 std::lock_guard<win_mutex> lock(mutex_);
890 socket_list_.push_back(internal.get());
891 }
892
893 auto* wrapper = new win_local_dgram_socket(std::move(internal));
894
895 {
896 std::lock_guard<win_mutex> lock(mutex_);
897 wrapper_list_.push_back(wrapper);
898 }
899
900 return wrapper;
901 }
902
903 inline void
904 win_local_dgram_service::destroy(io_object::implementation* p)
905 {
906 if (p)
907 {
908 auto& wrapper = static_cast<win_local_dgram_socket&>(*p);
909 wrapper.close_internal();
910 destroy_impl(wrapper);
911 }
912 }
913
914 inline void
915 win_local_dgram_service::close(io_object::handle& h)
916 {
917 auto& wrapper = static_cast<win_local_dgram_socket&>(*h.get());
918 wrapper.get_internal()->close_socket();
919 }
920
921 inline void
922 win_local_dgram_service::destroy_impl(win_local_dgram_socket& impl)
923 {
924 {
925 std::lock_guard<win_mutex> lock(mutex_);
926 wrapper_list_.remove(&impl);
927 }
928 delete &impl;
929 }
930
931 inline void
932 win_local_dgram_service::unregister_impl(
933 win_local_dgram_socket_internal& impl)
934 {
935 std::lock_guard<win_mutex> lock(mutex_);
936 socket_list_.remove(&impl);
937 }
938
939 inline std::error_code
940 win_local_dgram_service::open_socket(
941 local_datagram_socket::implementation& impl,
942 int family, int type, int protocol)
943 {
944 auto& wrapper = static_cast<win_local_dgram_socket&>(impl);
945 return open_socket_internal(*wrapper.get_internal(), family, type, protocol);
946 }
947
948 inline std::error_code
949 win_local_dgram_service::assign_socket(
950 local_datagram_socket::implementation& /*impl*/, native_handle_type /*fd*/)
951 {
952 return std::make_error_code(std::errc::operation_not_supported);
953 }
954
955 inline std::error_code
956 win_local_dgram_service::bind_socket(
957 local_datagram_socket::implementation& impl,
958 corosio::local_endpoint ep)
959 {
960 // Reject abstract sockets on Windows
961 if (ep.is_abstract())
962 return std::make_error_code(std::errc::operation_not_supported);
963
964 auto& wrapper = static_cast<win_local_dgram_socket&>(impl);
965 auto* internal = wrapper.get_internal();
966 SOCKET sock = internal->socket_;
967
968 sockaddr_storage storage{};
969 socklen_t addrlen = detail::to_sockaddr(ep, storage);
970 if (::bind(
971 sock, reinterpret_cast<sockaddr*>(&storage),
972 static_cast<int>(addrlen)) == SOCKET_ERROR)
973 return make_err(::WSAGetLastError());
974
975 internal->local_endpoint_ = ep;
976 return {};
977 }
978
979 inline std::error_code
980 win_local_dgram_service::open_socket_internal(
981 win_local_dgram_socket_internal& impl,
982 int family, int type, int protocol)
983 {
984 impl.close_socket();
985
986 SOCKET sock =
987 ::WSASocketW(family, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED);
988
989 if (sock == INVALID_SOCKET)
990 return make_err(::WSAGetLastError());
991
992 HANDLE result = ::CreateIoCompletionPort(
993 reinterpret_cast<HANDLE>(sock), static_cast<HANDLE>(iocp_), key_io, 0);
994
995 if (result == nullptr)
996 {
997 DWORD dwError = ::GetLastError();
998 ::closesocket(sock);
999 return make_err(dwError);
1000 }
1001
1002 impl.socket_ = sock;
1003 return {};
1004 }
1005
1006 inline void
1007 win_local_dgram_service::post(overlapped_op* op)
1008 {
1009 sched_.post(op);
1010 }
1011
1012 inline void
1013 win_local_dgram_service::on_pending(overlapped_op* op) noexcept
1014 {
1015 sched_.on_pending(op);
1016 }
1017
1018 inline void
1019 win_local_dgram_service::on_completion(
1020 overlapped_op* op, DWORD error, DWORD bytes) noexcept
1021 {
1022 sched_.on_completion(op, error, bytes);
1023 }
1024
1025 inline void
1026 win_local_dgram_service::work_started() noexcept
1027 {
1028 sched_.work_started();
1029 }
1030
1031 inline void
1032 win_local_dgram_service::work_finished() noexcept
1033 {
1034 sched_.work_finished();
1035 }
1036
1037 } // namespace boost::corosio::detail
1038
1039 #endif // BOOST_COROSIO_HAS_IOCP
1040
1041 #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP
1042