include/boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp

84.6% Lines (44/52) 71.4% List of functions (5/7)
io_uring_acceptor_ops.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_ACCEPTOR_OPS_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_ACCEPTOR_OPS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <liburing.h>
18
19 #include <boost/capy/error.hpp>
20 #include <boost/corosio/detail/dispatch_coro.hpp>
21 #include <boost/corosio/io/io_object.hpp>
22 #include <boost/corosio/native/detail/io_uring/io_uring_buffer.hpp>
23 #include <boost/corosio/native/detail/io_uring/io_uring_op.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
28
29 namespace boost::corosio::detail {
30
31 /** Multishot accept op — one submitted per acceptor lifetime.
32
33 The kernel produces a CQE for each accepted connection. Each CQE
34 carries the new fd in `res` (>= 0) or a negative errno on failure.
35 The `IORING_CQE_F_MORE` flag is set on every CQE except the last,
36 indicating whether the multishot armament is still active.
37
38 `do_cqe` does NOT push self into `local` — the owning acceptor's
39 `on_cqe` handler decides whether to dispatch immediately (waiter
40 present) or park the fd (no waiter). The multishot op persists
41 across CQEs; only `acceptor_impl` owns its lifetime.
42 */
43 struct uring_multi_accept_op : io_uring_op
44 {
45 /// Filled by the kernel for each accept. Address of this struct
46 /// is registered with the SQE; kernel writes peer address here.
47 sockaddr_storage peer_storage{};
48 socklen_t peer_len = sizeof(peer_storage);
49 int listen_fd = -1;
50
51 /// Owning acceptor; raw because the op IS owned by the acceptor.
52 void* acceptor_impl = nullptr;
53
54 /** Callback into the acceptor for each accept CQE.
55
56 @param acceptor The owning acceptor_impl pointer.
57 @param new_fd Accepted fd on success, -1 on error.
58 @param err errno value on failure, 0 on success.
59 @param more True unless this is the terminating CQE
60 (e.g. kernel dropped multishot on -ENOMEM).
61 */
62 void (*on_cqe)(void* acceptor, int new_fd, int err,
63 bool more) noexcept = nullptr;
64
65 85x uring_multi_accept_op() noexcept
66 85x : io_uring_op(&do_handler, &do_cqe, &do_prep)
67 85x {}
68
69 85x static void do_prep(io_uring_op* base, ::io_uring_sqe* sqe) noexcept
70 {
71 85x auto* self = static_cast<uring_multi_accept_op*>(base);
72 85x ::io_uring_prep_multishot_accept(
73 sqe, self->listen_fd,
74 85x reinterpret_cast<sockaddr*>(&self->peer_storage),
75 &self->peer_len,
76 SOCK_NONBLOCK | SOCK_CLOEXEC);
77 85x }
78
79 3958x static void do_cqe(io_uring_op* base, int res, unsigned flags,
80 op_queue& /*local*/) noexcept
81 {
82 3958x auto* self = static_cast<uring_multi_accept_op*>(base);
83 3958x bool more = (flags & IORING_CQE_F_MORE) != 0;
84 3958x int err = (res < 0) ? -res : 0;
85 3958x int new_fd = (res >= 0) ? res : -1;
86 3958x if (self->on_cqe)
87 3958x self->on_cqe(self->acceptor_impl, new_fd, err, more);
88 // Intentionally NOT pushed into local: the acceptor decides
89 // whether to surface the fd via a waiter or park it.
90 3958x }
91
92 /// Never invoked: the multishot op is owned by the acceptor and
93 /// never queued for handler dispatch. Provided so the vtable is
94 /// complete.
95 static void do_handler(
96 void* /*owner*/, scheduler_op* /*base*/,
97 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
98 {
99 // No-op. The acceptor's per-accept callback handles everything.
100 }
101 };
102
103 /** Synthesized accept op — manufactured by the acceptor for parked fds.
104
105 When `async_accept` arrives and a ready fd is already parked, the
106 acceptor builds one of these, fills `accepted_fd` and peer storage
107 from the parked node, and posts it to the scheduler. This op never
108 interacts with the ring directly — it goes straight to handler
109 dispatch via `(*op)()`.
110
111 `do_cqe` is unused (this op never receives a kernel CQE).
112 */
113 struct uring_accept_op : io_uring_op
114 {
115 int accepted_fd = -1;
116 int err = 0;
117 sockaddr_storage peer_storage{};
118 socklen_t peer_len = 0;
119
120 /// Set by the acceptor's `async_accept` entry point; filled by
121 /// `do_handler` with the new socket impl.
122 io_object::implementation** impl_out = nullptr;
123
124 /// Optional output for the peer endpoint.
125 endpoint* peer_endpoint_out = nullptr;
126
127 /// The peer service used to wrap the accepted fd.
128 void* peer_service = nullptr;
129
130 /// Acceptor-supplied wrapper: adopts `fd` into the right impl type.
131 io_object::implementation*
132 (*adopt_fn)(void* peer_service, int fd,
133 sockaddr_storage const& peer,
134 socklen_t peer_len) noexcept = nullptr;
135
136 3956x uring_accept_op() noexcept
137 3956x : io_uring_op(&do_handler, &do_cqe)
138 3956x {}
139
140 static void do_cqe(io_uring_op*, int, unsigned,
141 op_queue&) noexcept
142 {
143 // Unreachable: this op never receives a CQE.
144 }
145
146 3956x static void do_handler(
147 void* owner, scheduler_op* base,
148 std::uint32_t /*bytes*/, std::uint32_t /*error*/) noexcept
149 {
150 3956x auto* self = static_cast<uring_accept_op*>(base);
151 3956x self->stop_cb.reset();
152
153 3956x if (owner == nullptr)
154 {
155 delete self;
156 4x return;
157 }
158
159 bool was_cancelled =
160 3956x self->cancelled.load(std::memory_order_acquire);
161
162 3956x if (was_cancelled || self->err)
163 {
164 4x if (self->ec_out)
165 4x *self->ec_out = was_cancelled
166 4x ? std::error_code(capy::error::canceled)
167 : make_err(self->err);
168 4x self->cont_op.cont.h = self->h;
169 4x auto next = dispatch_coro(self->ex, self->cont_op.cont);
170 4x delete self;
171 4x next.resume();
172 4x return;
173 }
174
175 3952x if (self->adopt_fn && self->impl_out)
176 3952x *self->impl_out = self->adopt_fn(
177 self->peer_service, self->accepted_fd,
178 3952x self->peer_storage, self->peer_len);
179
180 3952x if (self->peer_endpoint_out)
181 *self->peer_endpoint_out =
182 sockaddr_to_endpoint(self->peer_storage);
183
184 3952x if (self->ec_out)
185 3952x *self->ec_out = {};
186
187 3952x self->cont_op.cont.h = self->h;
188 3952x auto next = dispatch_coro(self->ex, self->cont_op.cont);
189 3952x delete self;
190 3952x next.resume();
191 }
192 };
193
194 } // namespace boost::corosio::detail
195
196 #endif // BOOST_COROSIO_HAS_IO_URING
197
198 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_ACCEPTOR_OPS_HPP
199