include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

83.1% Lines (388/467) 97.4% Functions (37/38)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
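Each I/O operation follows the same pattern:
1. Try the syscall immediately (the socket is non-blocking).
2. If it succeeds or fails with a real error, complete the operation:
   resume the caller inline when the scheduler's inline budget allows,
   otherwise post the op to the completion queue.
3. If it fails with EAGAIN/EWOULDBLOCK (EINPROGRESS for connect), park
   the op in the descriptor state and wait for readiness.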
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
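Descriptor Registration
-----------------------
Each socket's fd is registered with the scheduler once, in open_socket(),
in edge-triggered mode, and deregistered in close_socket(). Operations
do not re-arm epoll individually. Instead, a waiting op is parked in a
slot of the descriptor state (connect_op, read_op, write_op). Because an
edge-triggered event is not repeated, register_op() first checks the
cached readiness flags (read_ready / write_ready) and retries the I/O
before parking the op, so an edge that arrived in the meantime is not lost.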
62
63 Cancellation
64 ------------
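Cancelling an op first marks it cancelled (see op.hpp for
request_cancel() and the per-op `cancelled` flag). Then, under the
descriptor mutex, the canceller tries to claim the op from its
descriptor slot. Only the party that claims an op posts it. If the op is
not parked yet, the matching *_cancel_pending flag is set instead, and
register_op() honours it when the op arrives. cancel() must complete
pending operations (post them with the cancelled flag) so coroutines
waiting on them can resume. close_socket() performs the same
claim-and-post for all three slots before closing the fd.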
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
Socket impls use enable_shared_from_this. The service owns impls via
a shared_ptr map (socket_ptrs_) keyed by raw pointer for O(1) lookup and
removal. When a user calls close(), close_socket() claims any parked ops
and posts them to the scheduler.
76
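CRITICAL: The posted ops must keep the impl alive until they complete.
Otherwise the scheduler would process a freed op (use-after-free). Every
path that posts or parks an op first stores a strong reference in
op.impl_ptr. That reference is shared_from_this() when the op is started,
or the weak_from_this().lock() result when cancel(), cancel_single_op()
or close_socket() claims the op. When the op completes, operator() moves
impl_ptr into a local that is released only after the coroutine has been
resumed. This allows the impl to be destroyed if no other references exist.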
82
83 Service Ownership
84 -----------------
epoll_socket_service owns all socket impls. destroy() removes the
shared_ptr from the map, but the impl may survive if ops still hold
impl_ptr refs. shutdown() closes all sockets but deliberately leaves
socket_ptrs_ intact. The scheduler shuts down afterwards and drains its
queued ops, so the map is only released when the service state is
destroyed (see the comment in shutdown()).
89 */
90
91 namespace boost::corosio::detail {
92
93 /** State for epoll socket service. */
94 class epoll_socket_state
95 {
96 public:
97 270 explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 {
99 270 }
100
101 epoll_scheduler& sched_;
102 std::mutex mutex_;
103 intrusive_list<epoll_socket> socket_list_;
104 std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 socket_ptrs_;
106 };
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.

    Creates, opens, closes and destroys epoll_socket impls, and forwards
    op posting and work accounting to the shared epoll_scheduler.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    // Binds the service to the context's epoll_scheduler.
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    // Closes every registered socket; owning refs are kept until destruction.
    void shutdown() override;

    // Allocates a new impl and registers it with the service.
    io_object::implementation* construct() override;
    // Closes the impl and drops the service's owning reference.
    void destroy(io_object::implementation*) override;
    // Closes the socket behind the handle (the impl itself survives).
    void close(io_object::handle&) override;
    // (Re)opens an IPv4 TCP socket for impl and registers it with epoll.
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    // Thin forwarders to the scheduler, used by the socket impls.
    void post(epoll_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
140
141 //--------------------------------------------------------------------------
142 //
143 // Implementation
144 //
145 //--------------------------------------------------------------------------
146
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
//
// Takes one unit of outstanding work for the op, then, under the
// descriptor mutex:
//   - if a readiness edge was cached in ready_flag, consumes it and retries
//     the I/O via op.perform_io(). An EAGAIN/EWOULDBLOCK result means "still
//     not ready" (errn is cleared); any other result completes the op.
//   - if a cancellation arrived before the op was parked (cancel_flag),
//     consumes it and marks the op cancelled.
//   - a completed or cancelled op is posted right away and the work unit is
//     released again. Otherwise the op is parked in desc_slot and keeps the
//     work unit until whoever claims it from the slot posts it.
//
// @param op          The operation to complete or park.
// @param desc_slot   Descriptor-state slot that holds the parked op.
// @param ready_flag  Cached readiness flag for this direction.
// @param cancel_flag Pending-cancel flag for this slot.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // Edge-triggered readiness is reported once; consume it now.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0;
    }

    if (cancel_flag)
    {
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        svc_.post(&op);
        // Balance work_started() above; presumably post() tracks the queued
        // op itself (confirm against epoll_scheduler::post).
        svc_.work_finished();
    }
    else
    {
        desc_slot = &op;
    }
}
185
186 inline void
187 108 epoll_op::canceller::operator()() const noexcept
188 {
189 108 op->cancel();
190 108 }
191
192 inline void
193 epoll_connect_op::cancel() noexcept
194 {
195 if (socket_impl_)
196 socket_impl_->cancel_single_op(*this);
197 else
198 request_cancel();
199 }
200
201 inline void
202 101 epoll_read_op::cancel() noexcept
203 {
204 101 if (socket_impl_)
205 101 socket_impl_->cancel_single_op(*this);
206 else
207 request_cancel();
208 101 }
209
210 inline void
211 1 epoll_write_op::cancel() noexcept
212 {
213 1 if (socket_impl_)
214 1 socket_impl_->cancel_single_op(*this);
215 else
216 request_cancel();
217 1 }
218
// Completion handler run by the scheduler for read/write ops.
//
// Drops the stop callback and resets the scheduler's inline budget. It then
// translates the op result into *ec_out, checked in this order: cancelled,
// errno, EOF on a zero-byte read, success. It stores the byte count in
// *bytes_out and resumes the handle returned by dispatch_coro() for the
// awaiting coroutine.
inline void
epoll_op::operator()()
{
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    // is_read_operation() is presumably false for empty-buffer reads, so
    // they report success rather than EOF (confirm in op.hpp).
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
247
// Completion handler run by the scheduler for connect ops.
//
// On success it caches the local endpoint (via getsockname) and the
// target endpoint on the socket. It then writes the result into *ec_out,
// checked in this order: cancelled, errno, success. Finally it resumes
// the awaiting coroutine the same way epoll_op::operator()() does.
inline void
epoll_connect_op::operator()()
{
    stop_cb.reset();

    // NOTE(review): socket_impl_ is dereferenced here but null-checked
    // below. One of the two is inconsistent; confirm whether a connect op
    // can ever be posted without a bound socket.
    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
285
286 14268 inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287 14268 : svc_(svc)
288 {
289 14268 }
290
// Defaulted. The fd is not closed here: epoll_socket_service::destroy()
// calls close_socket() before dropping its owning reference.
inline epoll_socket::~epoll_socket() = default;
292
// Start an asynchronous IPv4 connect to ep.
//
// Calls ::connect() immediately.
//   - Immediate success: caches the local and remote endpoints.
//   - Immediate success or any error other than EINPROGRESS: completes
//     inline when the scheduler's inline budget allows, otherwise posts
//     the op.
//   - EINPROGRESS: parks the op in the descriptor's connect slot. The slot
//     is driven by write readiness, because a non-blocking connect
//     completes when the socket becomes writable.
//
// @param h     Coroutine to resume on completion.
// @param ex    Executor used to resume h.
// @param ep    Target endpoint.
// @param token Stop token handed to op.start().
// @param ec    Receives the result.
// @return The handle from dispatch_coro() on the inline path, otherwise
//         std::noop_coroutine().
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;
    }

    // errno is only consulted when result != 0, so the getsockname() call
    // above cannot clobber it.
    if (result == 0 || errno != EINPROGRESS)
    {
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
353
354 inline std::coroutine_handle<>
355 129109 epoll_socket::read_some(
356 std::coroutine_handle<> h,
357 capy::executor_ref ex,
358 io_buffer_param param,
359 std::stop_token token,
360 std::error_code* ec,
361 std::size_t* bytes_out)
362 {
363 129109 auto& op = rd_;
364 129109 op.reset();
365
366 129109 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
367 129109 op.iovec_count =
368 129109 static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
369
370 129109 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
371 {
372 1 op.empty_buffer_read = true;
373 1 op.h = h;
374 1 op.ex = ex;
375 1 op.ec_out = ec;
376 1 op.bytes_out = bytes_out;
377 1 op.start(token, this);
378 1 op.impl_ptr = shared_from_this();
379 1 op.complete(0, 0);
380 1 svc_.post(&op);
381 1 return std::noop_coroutine();
382 }
383
384 258216 for (int i = 0; i < op.iovec_count; ++i)
385 {
386 129108 op.iovecs[i].iov_base = bufs[i].data();
387 129108 op.iovecs[i].iov_len = bufs[i].size();
388 }
389
390 // Speculative read
391 ssize_t n;
392 do
393 {
394 129108 n = ::readv(fd_, op.iovecs, op.iovec_count);
395 }
396 129108 while (n < 0 && errno == EINTR);
397
398 129108 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
399 {
400 128718 int err = (n < 0) ? errno : 0;
401 128718 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
402
403 128718 if (svc_.scheduler().try_consume_inline_budget())
404 {
405 102967 if (err)
406 1 *ec = make_err(err);
407 102966 else if (n == 0)
408 11 *ec = capy::error::eof;
409 else
410 102955 *ec = {};
411 102967 *bytes_out = bytes;
412 102967 return dispatch_coro(ex, h);
413 }
414 25751 op.h = h;
415 25751 op.ex = ex;
416 25751 op.ec_out = ec;
417 25751 op.bytes_out = bytes_out;
418 25751 op.start(token, this);
419 25751 op.impl_ptr = shared_from_this();
420 25751 op.complete(err, bytes);
421 25751 svc_.post(&op);
422 25751 return std::noop_coroutine();
423 }
424
425 // EAGAIN — register with reactor
426 390 op.h = h;
427 390 op.ex = ex;
428 390 op.ec_out = ec;
429 390 op.bytes_out = bytes_out;
430 390 op.fd = fd_;
431 390 op.start(token, this);
432 390 op.impl_ptr = shared_from_this();
433
434 390 register_op(
435 390 op, desc_state_.read_op, desc_state_.read_ready,
436 390 desc_state_.read_cancel_pending);
437 390 return std::noop_coroutine();
438 }
439
440 inline std::coroutine_handle<>
441 128935 epoll_socket::write_some(
442 std::coroutine_handle<> h,
443 capy::executor_ref ex,
444 io_buffer_param param,
445 std::stop_token token,
446 std::error_code* ec,
447 std::size_t* bytes_out)
448 {
449 128935 auto& op = wr_;
450 128935 op.reset();
451
452 128935 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
453 128935 op.iovec_count =
454 128935 static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
455
456 128935 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
457 {
458 1 op.h = h;
459 1 op.ex = ex;
460 1 op.ec_out = ec;
461 1 op.bytes_out = bytes_out;
462 1 op.start(token, this);
463 1 op.impl_ptr = shared_from_this();
464 1 op.complete(0, 0);
465 1 svc_.post(&op);
466 1 return std::noop_coroutine();
467 }
468
469 257868 for (int i = 0; i < op.iovec_count; ++i)
470 {
471 128934 op.iovecs[i].iov_base = bufs[i].data();
472 128934 op.iovecs[i].iov_len = bufs[i].size();
473 }
474
475 // Speculative write
476 128934 msghdr msg{};
477 128934 msg.msg_iov = op.iovecs;
478 128934 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
479
480 ssize_t n;
481 do
482 {
483 128934 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
484 }
485 128934 while (n < 0 && errno == EINTR);
486
487 128934 if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
488 {
489 128934 int err = (n < 0) ? errno : 0;
490 128934 auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
491
492 128934 if (svc_.scheduler().try_consume_inline_budget())
493 {
494 103138 *ec = err ? make_err(err) : std::error_code{};
495 103138 *bytes_out = bytes;
496 103138 return dispatch_coro(ex, h);
497 }
498 25796 op.h = h;
499 25796 op.ex = ex;
500 25796 op.ec_out = ec;
501 25796 op.bytes_out = bytes_out;
502 25796 op.start(token, this);
503 25796 op.impl_ptr = shared_from_this();
504 25796 op.complete(err, bytes);
505 25796 svc_.post(&op);
506 25796 return std::noop_coroutine();
507 }
508
509 // EAGAIN — register with reactor
510 op.h = h;
511 op.ex = ex;
512 op.ec_out = ec;
513 op.bytes_out = bytes_out;
514 op.fd = fd_;
515 op.start(token, this);
516 op.impl_ptr = shared_from_this();
517
518 register_op(
519 op, desc_state_.write_op, desc_state_.write_ready,
520 desc_state_.write_cancel_pending);
521 return std::noop_coroutine();
522 }
523
524 inline std::error_code
525 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
526 {
527 int how;
528 3 switch (what)
529 {
530 1 case tcp_socket::shutdown_receive:
531 1 how = SHUT_RD;
532 1 break;
533 1 case tcp_socket::shutdown_send:
534 1 how = SHUT_WR;
535 1 break;
536 1 case tcp_socket::shutdown_both:
537 1 how = SHUT_RDWR;
538 1 break;
539 default:
540 return make_err(EINVAL);
541 }
542 3 if (::shutdown(fd_, how) != 0)
543 return make_err(errno);
544 3 return {};
545 }
546
547 inline std::error_code
548 5 epoll_socket::set_no_delay(bool value) noexcept
549 {
550 5 int flag = value ? 1 : 0;
551 5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
552 return make_err(errno);
553 5 return {};
554 }
555
556 inline bool
557 5 epoll_socket::no_delay(std::error_code& ec) const noexcept
558 {
559 5 int flag = 0;
560 5 socklen_t len = sizeof(flag);
561 5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
562 {
563 ec = make_err(errno);
564 return false;
565 }
566 5 ec = {};
567 5 return flag != 0;
568 }
569
570 inline std::error_code
571 4 epoll_socket::set_keep_alive(bool value) noexcept
572 {
573 4 int flag = value ? 1 : 0;
574 4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
575 return make_err(errno);
576 4 return {};
577 }
578
579 inline bool
580 4 epoll_socket::keep_alive(std::error_code& ec) const noexcept
581 {
582 4 int flag = 0;
583 4 socklen_t len = sizeof(flag);
584 4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
585 {
586 ec = make_err(errno);
587 return false;
588 }
589 4 ec = {};
590 4 return flag != 0;
591 }
592
593 inline std::error_code
594 1 epoll_socket::set_receive_buffer_size(int size) noexcept
595 {
596 1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
597 return make_err(errno);
598 1 return {};
599 }
600
601 inline int
602 3 epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
603 {
604 3 int size = 0;
605 3 socklen_t len = sizeof(size);
606 3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
607 {
608 ec = make_err(errno);
609 return 0;
610 }
611 3 ec = {};
612 3 return size;
613 }
614
615 inline std::error_code
616 1 epoll_socket::set_send_buffer_size(int size) noexcept
617 {
618 1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
619 return make_err(errno);
620 1 return {};
621 }
622
623 inline int
624 3 epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
625 {
626 3 int size = 0;
627 3 socklen_t len = sizeof(size);
628 3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
629 {
630 ec = make_err(errno);
631 return 0;
632 }
633 3 ec = {};
634 3 return size;
635 }
636
637 inline std::error_code
638 60 epoll_socket::set_linger(bool enabled, int timeout) noexcept
639 {
640 60 if (timeout < 0)
641 1 return make_err(EINVAL);
642 struct ::linger lg;
643 59 lg.l_onoff = enabled ? 1 : 0;
644 59 lg.l_linger = timeout;
645 59 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
646 return make_err(errno);
647 59 return {};
648 }
649
650 inline tcp_socket::linger_options
651 3 epoll_socket::linger(std::error_code& ec) const noexcept
652 {
653 3 struct ::linger lg{};
654 3 socklen_t len = sizeof(lg);
655 3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
656 {
657 ec = make_err(errno);
658 return {};
659 }
660 3 ec = {};
661 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
662 }
663
// Cancel every operation on this socket.
//
// Marks the connect, read and write ops cancelled. Then, under the
// descriptor mutex, it claims each op that is currently parked in its
// slot, and sets the matching *_cancel_pending flag for each slot that
// holds no parked op. Claimed ops are posted, with a strong impl
// reference, so their coroutines resume with capy::error::canceled.
// Does nothing if no shared_ptr owns the impl any more.
//
// NOTE(review): the *_cancel_pending flags are set even when no op of that
// kind is in flight. Within this file they are only cleared by
// register_op() and close_socket(). If nothing else clears them, a later op
// that hits EAGAIN could be cancelled spuriously; confirm against the
// reactor code.
inline void
epoll_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post outside the lock. work_finished() releases the work unit that
    // register_op() took when the op was parked.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
713
// Cancel one specific op of this socket (reached from the per-op cancel()
// overrides).
//
// Marks op cancelled. Then, under the descriptor mutex, it either claims
// op from its slot and posts it, or sets that slot's *_cancel_pending flag
// so register_op() cancels the op when it arrives. An op that is not one
// of conn_, rd_ or wr_ is only marked cancelled. Does nothing if no
// shared_ptr owns the impl any more.
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Map the op to its descriptor slot.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Keep the impl alive while the op sits in the queue.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
753
// Close the socket and complete any parked operations.
//
// While the impl is still shared-owned it does the following:
//   - marks all three ops cancelled;
//   - under the descriptor mutex, claims every parked op and clears the
//     cached readiness and pending-cancel flags;
//   - posts the claimed ops;
//   - if the descriptor state is still enqueued in the scheduler, pins the
//     impl via impl_ref_.
// It then deregisters the fd (if epoll events are registered), closes it,
// and resets the descriptor bookkeeping and cached endpoints. Safe to call
// repeatedly: once fd_ is -1 the fd handling is skipped.
inline void
epoll_socket::close_socket() noexcept
{
    // Empty when no shared_ptr owns the impl any more; the op slots are
    // then left untouched.
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // The embedded descriptor state may still be linked in the
        // scheduler's queue; hold a strong ref so it is not freed while
        // queued (presumably released when the scheduler processes it).
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
816
817 270 inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
818 270 : state_(
819 std::make_unique<epoll_socket_state>(
820 270 ctx.use_service<epoll_scheduler>()))
821 {
822 270 }
823
824 540 inline epoll_socket_service::~epoll_socket_service() {}
825
826 inline void
827 270 epoll_socket_service::shutdown()
828 {
829 270 std::lock_guard lock(state_->mutex_);
830
831 270 while (auto* impl = state_->socket_list_.pop_front())
832 impl->close_socket();
833
834 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
835 // drains completed_ops_, calling destroy() on each queued op. If we
836 // released our shared_ptrs now, an epoll_op::destroy() could free the
837 // last ref to an impl whose embedded descriptor_state is still linked
838 // in the queue — use-after-free on the next pop(). Letting ~state_
839 // release the ptrs (during service destruction, after scheduler
840 // shutdown) keeps every impl alive until all ops have been drained.
841 270 }
842
843 inline io_object::implementation*
844 14268 epoll_socket_service::construct()
845 {
846 14268 auto impl = std::make_shared<epoll_socket>(*this);
847 14268 auto* raw = impl.get();
848
849 {
850 14268 std::lock_guard lock(state_->mutex_);
851 14268 state_->socket_list_.push_back(raw);
852 14268 state_->socket_ptrs_.emplace(raw, std::move(impl));
853 14268 }
854
855 14268 return raw;
856 14268 }
857
858 inline void
859 14268 epoll_socket_service::destroy(io_object::implementation* impl)
860 {
861 14268 auto* epoll_impl = static_cast<epoll_socket*>(impl);
862 14268 epoll_impl->close_socket();
863 14268 std::lock_guard lock(state_->mutex_);
864 14268 state_->socket_list_.remove(epoll_impl);
865 14268 state_->socket_ptrs_.erase(epoll_impl);
866 14268 }
867
868 inline std::error_code
869 4736 epoll_socket_service::open_socket(tcp_socket::implementation& impl)
870 {
871 4736 auto* epoll_impl = static_cast<epoll_socket*>(&impl);
872 4736 epoll_impl->close_socket();
873
874 4736 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
875 4736 if (fd < 0)
876 return make_err(errno);
877
878 4736 epoll_impl->fd_ = fd;
879
880 // Register fd with epoll (edge-triggered mode)
881 4736 epoll_impl->desc_state_.fd = fd;
882 {
883 4736 std::lock_guard lock(epoll_impl->desc_state_.mutex);
884 4736 epoll_impl->desc_state_.read_op = nullptr;
885 4736 epoll_impl->desc_state_.write_op = nullptr;
886 4736 epoll_impl->desc_state_.connect_op = nullptr;
887 4736 }
888 4736 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
889
890 4736 return {};
891 }
892
893 inline void
894 23727 epoll_socket_service::close(io_object::handle& h)
895 {
896 23727 static_cast<epoll_socket*>(h.get())->close_socket();
897 23727 }
898
899 inline void
900 51757 epoll_socket_service::post(epoll_op* op)
901 {
902 51757 state_->sched_.post(op);
903 51757 }
904
905 inline void
906 5115 epoll_socket_service::work_started() noexcept
907 {
908 5115 state_->sched_.work_started();
909 5115 }
910
911 inline void
912 208 epoll_socket_service::work_finished() noexcept
913 {
914 208 state_->sched_.work_finished();
915 208 }
916
917 } // namespace boost::corosio::detail
918
919 #endif // BOOST_COROSIO_HAS_EPOLL
920
921 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
922