include/boost/corosio/native/detail/kqueue/kqueue_tcp_acceptor_service.hpp

57.4% Lines (112/195) 100.0% List of functions (20/20) 27.6% Branches (21/76)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::kqueue_tcp_acceptor_service::scheduler() const :75 2334x 100.0% 100.0% boost::corosio::detail::kqueue_accept_op::cancel() :92 6x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_accept_op::operator()() :101 5625x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::kqueue_tcp_acceptor(boost::corosio::detail::kqueue_tcp_acceptor_service&) :106 2346x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::accept(std::__1::coroutine_handle<void>, boost::capy::executor_ref, std::__1::stop_token, std::__1::error_code*, boost::corosio::io_object::implementation**) :113 5625x 26.0% 12.5% 18.0% boost::corosio::detail::kqueue_tcp_acceptor::cancel() :257 2x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::close_socket() :263 4688x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::kqueue_tcp_acceptor_service(boost::capy::execution_context&) :268 750x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::~kqueue_tcp_acceptor_service() :277 1125x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::shutdown() :280 375x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_tcp_acceptor_service::construct() :289 1173x 100.0% 50.0% 42.0% boost::corosio::detail::kqueue_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :302 1173x 100.0% 50.0% 50.0% boost::corosio::detail::kqueue_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :312 2344x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :318 1171x 68.8% 60.0% 66.0% boost::corosio::detail::kqueue_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :374 1170x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :381 1167x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :388 9x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::work_started() :394 5625x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::work_finished() :400 9x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::tcp_service() const :406 5616x 100.0% 50.0% 71.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
21
22 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_acceptor.hpp>
23 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp>
24 #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
26
27 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <utility>
32
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <netinet/in.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for kqueue acceptor service.
42 using kqueue_tcp_acceptor_state =
43 reactor_service_state<kqueue_scheduler, kqueue_tcp_acceptor>;
44
45 /** kqueue acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL kqueue_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit kqueue_tcp_acceptor_service(capy::execution_context& ctx);
55 ~kqueue_tcp_acceptor_service();
56
57 kqueue_tcp_acceptor_service(kqueue_tcp_acceptor_service const&) = delete;
58 kqueue_tcp_acceptor_service&
59 operator=(kqueue_tcp_acceptor_service const&) = delete;
60
61 void shutdown() override;
62 io_object::implementation* construct() override;
63 void destroy(io_object::implementation*) override;
64 void close(io_object::handle&) override;
65 std::error_code open_acceptor_socket(
66 tcp_acceptor::implementation& impl,
67 int family,
68 int type,
69 int protocol) override;
70 std::error_code
71 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 std::error_code
73 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74
75 2334x kqueue_scheduler& scheduler() const noexcept
76 {
77 2334x return state_->sched_;
78 }
79 void post(scheduler_op* op);
80 void work_started() noexcept;
81 void work_finished() noexcept;
82
83 /** Get the TCP service for creating peer sockets during accept. */
84 kqueue_tcp_service* tcp_service() const noexcept;
85
86 private:
87 capy::execution_context& ctx_;
88 std::unique_ptr<kqueue_tcp_acceptor_state> state_;
89 };
90
91 inline void
92 6x kqueue_accept_op::cancel() noexcept
93 {
94
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6x if (acceptor_impl_)
95 6x acceptor_impl_->cancel_single_op(*this);
96 else
97 request_cancel();
98 6x }
99
100 inline void
101 5625x kqueue_accept_op::operator()()
102 {
103 5625x complete_accept_op<kqueue_tcp_socket>(*this);
104 5625x }
105
106 2346x inline kqueue_tcp_acceptor::kqueue_tcp_acceptor(
107 kqueue_tcp_acceptor_service& svc) noexcept
108 1173x : reactor_acceptor(svc)
109 2346x {
110 2346x }
111
112 inline std::coroutine_handle<>
113 5625x kqueue_tcp_acceptor::accept(
114 std::coroutine_handle<> h,
115 capy::executor_ref ex,
116 std::stop_token token,
117 std::error_code* ec,
118 io_object::implementation** impl_out)
119 {
120 5625x auto& op = acc_;
121 5625x op.reset();
122 5625x op.h = h;
123 5625x op.ex = ex;
124 5625x op.ec_out = ec;
125 5625x op.impl_out = impl_out;
126 5625x op.fd = fd_;
127 5625x op.start(token, this);
128
129 5625x sockaddr_storage peer_storage{};
130 5625x socklen_t addrlen = sizeof(peer_storage);
131
132 // FreeBSD: Can use accept4(fd_, addr, addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC)
133 5625x int accepted =
134 5625x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
135
136
1/2
✓ Branch 0 taken 5625 times.
✗ Branch 1 not taken.
5625x if (accepted >= 0)
137 {
138 // Set non-blocking and close-on-exec on the accepted socket
139 int flags = ::fcntl(accepted, F_GETFL, 0);
140 if (flags == -1 || ::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
141 {
142 int errn = errno;
143 ::close(accepted);
144 op.complete(errn, 0);
145 op.impl_ptr = shared_from_this();
146 svc_.post(&op);
147 return std::noop_coroutine();
148 }
149 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
150 {
151 int errn = errno;
152 ::close(accepted);
153 op.complete(errn, 0);
154 op.impl_ptr = shared_from_this();
155 svc_.post(&op);
156 return std::noop_coroutine();
157 }
158
159 // SO_NOSIGPIPE before budget check so both inline and
160 // queued paths have it applied (macOS lacks MSG_NOSIGNAL)
161 int one = 1;
162 if (::setsockopt(
163 accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1)
164 {
165 int errn = errno;
166 ::close(accepted);
167 op.complete(errn, 0);
168 op.impl_ptr = shared_from_this();
169 svc_.post(&op);
170 return std::noop_coroutine();
171 }
172
173 {
174 std::lock_guard lock(desc_state_.mutex);
175 desc_state_.read_ready = false;
176 }
177
178 if (svc_.scheduler().try_consume_inline_budget())
179 {
180 auto* socket_svc = svc_.tcp_service();
181 if (socket_svc)
182 {
183 auto& impl =
184 static_cast<kqueue_tcp_socket&>(*socket_svc->construct());
185 impl.set_socket(accepted);
186
187 impl.desc_state_.fd = accepted;
188 {
189 std::lock_guard lock(impl.desc_state_.mutex);
190 impl.desc_state_.read_op = nullptr;
191 impl.desc_state_.write_op = nullptr;
192 impl.desc_state_.connect_op = nullptr;
193 }
194 socket_svc->scheduler().register_descriptor(
195 accepted, &impl.desc_state_);
196
197 impl.set_endpoints(
198 local_endpoint_, from_sockaddr(peer_storage));
199
200 *ec = {};
201 if (impl_out)
202 *impl_out = &impl;
203 }
204 else
205 {
206 ::close(accepted);
207 *ec = make_err(ENOENT);
208 if (impl_out)
209 *impl_out = nullptr;
210 }
211 return dispatch_coro(ex, h);
212 }
213
214 op.accepted_fd = accepted;
215 op.peer_storage = peer_storage;
216 op.complete(0, 0);
217 op.impl_ptr = shared_from_this();
218 svc_.post(&op);
219 return std::noop_coroutine();
220 }
221
222
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5625 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5625x if (errno == EAGAIN || errno == EWOULDBLOCK)
223 {
224
1/2
✓ Branch 0 taken 5625 times.
✗ Branch 1 not taken.
5625x op.impl_ptr = shared_from_this();
225 5625x svc_.work_started();
226
227 5625x std::lock_guard lock(desc_state_.mutex);
228 5625x bool io_done = false;
229
1/2
✓ Branch 0 taken 5625 times.
✗ Branch 1 not taken.
5625x if (desc_state_.read_ready)
230 {
231 desc_state_.read_ready = false;
232 op.perform_io();
233 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
234 if (!io_done)
235 op.errn = 0;
236 }
237
238
2/4
✓ Branch 0 taken 5625 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5625 times.
5625x if (io_done || op.cancelled.load(std::memory_order_acquire))
239 {
240 svc_.post(&op);
241 svc_.work_finished();
242 }
243 else
244 {
245 5625x desc_state_.read_op = &op;
246 }
247 5625x return std::noop_coroutine();
248 5625x }
249
250 op.complete(errno, 0);
251 op.impl_ptr = shared_from_this();
252 svc_.post(&op);
253 return std::noop_coroutine();
254 5625x }
255
256 inline void
257 2x kqueue_tcp_acceptor::cancel() noexcept
258 {
259 2x do_cancel();
260 2x }
261
262 inline void
263 4688x kqueue_tcp_acceptor::close_socket() noexcept
264 {
265 4688x do_close_socket();
266 4688x }
267
268 750x inline kqueue_tcp_acceptor_service::kqueue_tcp_acceptor_service(
269 capy::execution_context& ctx)
270 375x : ctx_(ctx)
271 375x , state_(
272
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x std::make_unique<kqueue_tcp_acceptor_state>(
273
1/2
✓ Branch 0 taken 375 times.
✗ Branch 1 not taken.
375x ctx.use_service<kqueue_scheduler>()))
274 750x {
275 750x }
276
277 1125x inline kqueue_tcp_acceptor_service::~kqueue_tcp_acceptor_service() = default;
278
279 inline void
280 375x kqueue_tcp_acceptor_service::shutdown()
281 {
282 375x std::lock_guard lock(state_->mutex_);
283
284
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 375 times.
375x while (auto* impl = state_->impl_list_.pop_front())
285 impl->close_socket();
286 375x }
287
288 inline io_object::implementation*
289 1173x kqueue_tcp_acceptor_service::construct()
290 {
291 1173x auto impl = std::make_shared<kqueue_tcp_acceptor>(*this);
292 1173x auto* raw = impl.get();
293
294
1/2
✓ Branch 0 taken 1173 times.
✗ Branch 1 not taken.
1173x std::lock_guard lock(state_->mutex_);
295
1/2
✓ Branch 0 taken 1173 times.
✗ Branch 1 not taken.
1173x state_->impl_ptrs_.emplace(raw, std::move(impl));
296 1173x state_->impl_list_.push_back(raw);
297
298 1173x return raw;
299 1173x }
300
301 inline void
302 1173x kqueue_tcp_acceptor_service::destroy(io_object::implementation* impl)
303 {
304 1173x auto* kq_impl = static_cast<kqueue_tcp_acceptor*>(impl);
305 1173x kq_impl->close_socket();
306 1173x std::lock_guard lock(state_->mutex_);
307 1173x state_->impl_list_.remove(kq_impl);
308
1/2
✓ Branch 0 taken 1173 times.
✗ Branch 1 not taken.
1173x state_->impl_ptrs_.erase(kq_impl);
309 1173x }
310
311 inline void
312 2344x kqueue_tcp_acceptor_service::close(io_object::handle& h)
313 {
314 2344x static_cast<kqueue_tcp_acceptor*>(h.get())->close_socket();
315 2344x }
316
317 inline std::error_code
318 1171x kqueue_tcp_acceptor_service::open_acceptor_socket(
319 tcp_acceptor::implementation& impl, int family, int type, int protocol)
320 {
321 1171x auto* kq_impl = static_cast<kqueue_tcp_acceptor*>(&impl);
322 1171x kq_impl->close_socket();
323
324 1171x int fd = ::socket(family, type, protocol);
325
1/2
✓ Branch 0 taken 1171 times.
✗ Branch 1 not taken.
1171x if (fd < 0)
326 return make_err(errno);
327
328 // Set non-blocking and close-on-exec
329 1171x int flags = ::fcntl(fd, F_GETFL, 0);
330
1/2
✓ Branch 0 taken 1171 times.
✗ Branch 1 not taken.
1171x if (flags == -1)
331 {
332 int errn = errno;
333 ::close(fd);
334 return make_err(errn);
335 }
336
1/2
✓ Branch 0 taken 1171 times.
✗ Branch 1 not taken.
1171x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
337 {
338 int errn = errno;
339 ::close(fd);
340 return make_err(errn);
341 }
342
1/2
✓ Branch 0 taken 1171 times.
✗ Branch 1 not taken.
1171x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
343 {
344 int errn = errno;
345 ::close(fd);
346 return make_err(errn);
347 }
348
349
2/2
✓ Branch 0 taken 1163 times.
✓ Branch 1 taken 8 times.
1171x if (family == AF_INET6)
350 {
351 8x int val = 0; // dual-stack default
352 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
353 8x }
354
355 // SO_NOSIGPIPE on macOS (where MSG_NOSIGNAL doesn't exist)
356 #ifdef SO_NOSIGPIPE
357 1171x int nosig = 1;
358 1171x ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
359 #endif
360
361 1171x kq_impl->fd_ = fd;
362
363 // Set up descriptor state but do NOT register with kqueue yet
364 1171x kq_impl->desc_state_.fd = fd;
365 {
366 1171x std::lock_guard lock(kq_impl->desc_state_.mutex);
367 1171x kq_impl->desc_state_.read_op = nullptr;
368 1171x }
369
370 1171x return {};
371 1171x }
372
373 inline std::error_code
374 1170x kqueue_tcp_acceptor_service::bind_acceptor(
375 tcp_acceptor::implementation& impl, endpoint ep)
376 {
377 1170x return static_cast<kqueue_tcp_acceptor*>(&impl)->do_bind(ep);
378 }
379
380 inline std::error_code
381 1167x kqueue_tcp_acceptor_service::listen_acceptor(
382 tcp_acceptor::implementation& impl, int backlog)
383 {
384 1167x return static_cast<kqueue_tcp_acceptor*>(&impl)->do_listen(backlog);
385 }
386
387 inline void
388 9x kqueue_tcp_acceptor_service::post(scheduler_op* op)
389 {
390 9x state_->sched_.post(op);
391 9x }
392
393 inline void
394 5625x kqueue_tcp_acceptor_service::work_started() noexcept
395 {
396 5625x state_->sched_.work_started();
397 5625x }
398
399 inline void
400 9x kqueue_tcp_acceptor_service::work_finished() noexcept
401 {
402 9x state_->sched_.work_finished();
403 9x }
404
405 inline kqueue_tcp_service*
406 5616x kqueue_tcp_acceptor_service::tcp_service() const noexcept
407 {
408 5616x auto* svc = ctx_.find_service<detail::tcp_service>();
409
2/4
✓ Branch 0 taken 5616 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5616 times.
✗ Branch 3 not taken.
5616x return svc ? dynamic_cast<kqueue_tcp_service*>(svc) : nullptr;
410 }
411
412 } // namespace boost::corosio::detail
413
414 #endif // BOOST_COROSIO_HAS_KQUEUE
415
416 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
417