include/boost/corosio/native/detail/kqueue/kqueue_tcp_acceptor_service.hpp

56.9% Lines (111/195) 100.0% List of functions (20/20) 26.4% Branches (19/72)
kqueue_tcp_acceptor_service.hpp
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::kqueue_tcp_acceptor_service::scheduler() const :76 2016x 100.0% 100.0% boost::corosio::detail::kqueue_accept_op::cancel() :93 6x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_accept_op::operator()() :102 5316x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::kqueue_tcp_acceptor(boost::corosio::detail::kqueue_tcp_acceptor_service&) :107 2034x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::accept(std::__1::coroutine_handle<void>, boost::capy::executor_ref, std::__1::stop_token, std::__1::error_code*, boost::corosio::io_object::implementation**) :114 5316x 25.8% 12.5% 18.0% boost::corosio::detail::kqueue_tcp_acceptor::cancel() :259 2x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor::close_socket() :265 4062x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::kqueue_tcp_acceptor_service(boost::capy::execution_context&, boost::corosio::detail::kqueue_tcp_service&) :270 874x 100.0% 50.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::~kqueue_tcp_acceptor_service() :279 1311x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::shutdown() :282 437x 80.0% 50.0% 75.0% boost::corosio::detail::kqueue_tcp_acceptor_service::construct() :291 1017x 100.0% 50.0% 42.0% boost::corosio::detail::kqueue_tcp_acceptor_service::destroy(boost::corosio::io_object::implementation*) :304 1017x 100.0% 50.0% 50.0% boost::corosio::detail::kqueue_tcp_acceptor_service::close(boost::corosio::io_object::handle&) :314 2031x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :320 1014x 68.8% 60.0% 66.0% boost::corosio::detail::kqueue_tcp_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :376 1013x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :383 1008x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::post(boost::corosio::detail::scheduler_op*) :390 9x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::work_started() :396 5316x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::work_finished() :402 9x 100.0% 100.0% boost::corosio::detail::kqueue_tcp_acceptor_service::tcp_service() const :408 5307x 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20 #include <boost/corosio/detail/tcp_acceptor_service.hpp>
21
22 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_acceptor.hpp>
23 #include <boost/corosio/native/detail/kqueue/kqueue_tcp_service.hpp>
24 #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp>
25 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
26
27 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <utility>
32
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <netinet/in.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for kqueue acceptor service.
42 using kqueue_tcp_acceptor_state =
43 reactor_service_state<kqueue_scheduler, kqueue_tcp_acceptor>;
44
45 /** kqueue acceptor service implementation.
46
47 Inherits from tcp_acceptor_service to enable runtime polymorphism.
48 Uses key_type = tcp_acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL kqueue_tcp_acceptor_service final
51 : public tcp_acceptor_service
52 {
53 public:
54 explicit kqueue_tcp_acceptor_service(
55 capy::execution_context& ctx, kqueue_tcp_service& tcp_svc);
56 ~kqueue_tcp_acceptor_service();
57
58 kqueue_tcp_acceptor_service(kqueue_tcp_acceptor_service const&) = delete;
59 kqueue_tcp_acceptor_service&
60 operator=(kqueue_tcp_acceptor_service const&) = delete;
61
62 void shutdown() override;
63 io_object::implementation* construct() override;
64 void destroy(io_object::implementation*) override;
65 void close(io_object::handle&) override;
66 std::error_code open_acceptor_socket(
67 tcp_acceptor::implementation& impl,
68 int family,
69 int type,
70 int protocol) override;
71 std::error_code
72 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
73 std::error_code
74 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
75
76 2016x kqueue_scheduler& scheduler() const noexcept
77 {
78 2016x return state_->sched_;
79 }
80 void post(scheduler_op* op);
81 void work_started() noexcept;
82 void work_finished() noexcept;
83
84 /** Get the TCP service for creating peer sockets during accept. */
85 kqueue_tcp_service* tcp_service() const noexcept;
86
87 private:
88 kqueue_tcp_service* tcp_svc_;
89 std::unique_ptr<kqueue_tcp_acceptor_state> state_;
90 };
91
92 inline void
93 6x kqueue_accept_op::cancel() noexcept
94 {
95
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6x if (acceptor_impl_)
96 6x acceptor_impl_->cancel_single_op(*this);
97 else
98 request_cancel();
99 6x }
100
101 inline void
102 5316x kqueue_accept_op::operator()()
103 {
104 5316x complete_accept_op<kqueue_tcp_socket>(*this);
105 5316x }
106
107 2034x inline kqueue_tcp_acceptor::kqueue_tcp_acceptor(
108 kqueue_tcp_acceptor_service& svc) noexcept
109 1017x : reactor_acceptor(svc)
110 2034x {
111 2034x }
112
113 inline std::coroutine_handle<>
114 5316x kqueue_tcp_acceptor::accept(
115 std::coroutine_handle<> h,
116 capy::executor_ref ex,
117 std::stop_token token,
118 std::error_code* ec,
119 io_object::implementation** impl_out)
120 {
121 5316x auto& op = acc_;
122 5316x op.reset();
123 5316x op.h = h;
124 5316x op.ex = ex;
125 5316x op.ec_out = ec;
126 5316x op.impl_out = impl_out;
127 5316x op.fd = fd_;
128 5316x op.start(token, this);
129
130 5316x sockaddr_storage peer_storage{};
131 5316x socklen_t addrlen = sizeof(peer_storage);
132
133 // FreeBSD: Can use accept4(fd_, addr, addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC)
134 5316x int accepted =
135 5316x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
136
137
1/2
✓ Branch 0 taken 5316 times.
✗ Branch 1 not taken.
5316x if (accepted >= 0)
138 {
139 // Set non-blocking and close-on-exec on the accepted socket
140 int flags = ::fcntl(accepted, F_GETFL, 0);
141 if (flags == -1 || ::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
142 {
143 int errn = errno;
144 ::close(accepted);
145 op.complete(errn, 0);
146 op.impl_ptr = shared_from_this();
147 svc_.post(&op);
148 return std::noop_coroutine();
149 }
150 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
151 {
152 int errn = errno;
153 ::close(accepted);
154 op.complete(errn, 0);
155 op.impl_ptr = shared_from_this();
156 svc_.post(&op);
157 return std::noop_coroutine();
158 }
159
160 // SO_NOSIGPIPE before budget check so both inline and
161 // queued paths have it applied (macOS lacks MSG_NOSIGNAL)
162 int one = 1;
163 if (::setsockopt(
164 accepted, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1)
165 {
166 int errn = errno;
167 ::close(accepted);
168 op.complete(errn, 0);
169 op.impl_ptr = shared_from_this();
170 svc_.post(&op);
171 return std::noop_coroutine();
172 }
173
174 {
175 std::lock_guard lock(desc_state_.mutex);
176 desc_state_.read_ready = false;
177 }
178
179 if (svc_.scheduler().try_consume_inline_budget())
180 {
181 auto* socket_svc = svc_.tcp_service();
182 if (socket_svc)
183 {
184 auto& impl =
185 static_cast<kqueue_tcp_socket&>(*socket_svc->construct());
186 impl.set_socket(accepted);
187
188 impl.desc_state_.fd = accepted;
189 {
190 std::lock_guard lock(impl.desc_state_.mutex);
191 impl.desc_state_.read_op = nullptr;
192 impl.desc_state_.write_op = nullptr;
193 impl.desc_state_.connect_op = nullptr;
194 }
195 socket_svc->scheduler().register_descriptor(
196 accepted, &impl.desc_state_);
197
198 impl.set_endpoints(
199 local_endpoint_, from_sockaddr(peer_storage));
200
201 *ec = {};
202 if (impl_out)
203 *impl_out = &impl;
204 }
205 else
206 {
207 ::close(accepted);
208 *ec = make_err(ENOENT);
209 if (impl_out)
210 *impl_out = nullptr;
211 }
212 op.cont_op.cont.h = h;
213 return dispatch_coro(ex, op.cont_op.cont);
214 }
215
216 op.accepted_fd = accepted;
217 op.peer_storage = peer_storage;
218 op.complete(0, 0);
219 op.impl_ptr = shared_from_this();
220 svc_.post(&op);
221 return std::noop_coroutine();
222 }
223
224
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5316 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5316x if (errno == EAGAIN || errno == EWOULDBLOCK)
225 {
226
1/2
✓ Branch 0 taken 5316 times.
✗ Branch 1 not taken.
5316x op.impl_ptr = shared_from_this();
227 5316x svc_.work_started();
228
229 5316x std::lock_guard lock(desc_state_.mutex);
230 5316x bool io_done = false;
231
1/2
✓ Branch 0 taken 5316 times.
✗ Branch 1 not taken.
5316x if (desc_state_.read_ready)
232 {
233 desc_state_.read_ready = false;
234 op.perform_io();
235 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
236 if (!io_done)
237 op.errn = 0;
238 }
239
240
2/4
✓ Branch 0 taken 5316 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5316 times.
5316x if (io_done || op.cancelled.load(std::memory_order_acquire))
241 {
242 svc_.post(&op);
243 svc_.work_finished();
244 }
245 else
246 {
247 5316x desc_state_.read_op = &op;
248 }
249 5316x return std::noop_coroutine();
250 5316x }
251
252 op.complete(errno, 0);
253 op.impl_ptr = shared_from_this();
254 svc_.post(&op);
255 return std::noop_coroutine();
256 5316x }
257
258 inline void
259 2x kqueue_tcp_acceptor::cancel() noexcept
260 {
261 2x do_cancel();
262 2x }
263
264 inline void
265 4062x kqueue_tcp_acceptor::close_socket() noexcept
266 {
267 4062x do_close_socket();
268 4062x }
269
270 874x inline kqueue_tcp_acceptor_service::kqueue_tcp_acceptor_service(
271 capy::execution_context& ctx, kqueue_tcp_service& tcp_svc)
272 437x : tcp_svc_(&tcp_svc)
273 437x , state_(
274
1/2
✓ Branch 0 taken 437 times.
✗ Branch 1 not taken.
437x std::make_unique<kqueue_tcp_acceptor_state>(
275
1/2
✓ Branch 0 taken 437 times.
✗ Branch 1 not taken.
437x ctx.use_service<kqueue_scheduler>()))
276 874x {
277 874x }
278
279 1311x inline kqueue_tcp_acceptor_service::~kqueue_tcp_acceptor_service() = default;
280
281 inline void
282 437x kqueue_tcp_acceptor_service::shutdown()
283 {
284 437x std::lock_guard lock(state_->mutex_);
285
286
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 437 times.
437x while (auto* impl = state_->impl_list_.pop_front())
287 impl->close_socket();
288 437x }
289
290 inline io_object::implementation*
291 1017x kqueue_tcp_acceptor_service::construct()
292 {
293 1017x auto impl = std::make_shared<kqueue_tcp_acceptor>(*this);
294 1017x auto* raw = impl.get();
295
296
1/2
✓ Branch 0 taken 1017 times.
✗ Branch 1 not taken.
1017x std::lock_guard lock(state_->mutex_);
297
1/2
✓ Branch 0 taken 1017 times.
✗ Branch 1 not taken.
1017x state_->impl_ptrs_.emplace(raw, std::move(impl));
298 1017x state_->impl_list_.push_back(raw);
299
300 1017x return raw;
301 1017x }
302
303 inline void
304 1017x kqueue_tcp_acceptor_service::destroy(io_object::implementation* impl)
305 {
306 1017x auto* kq_impl = static_cast<kqueue_tcp_acceptor*>(impl);
307 1017x kq_impl->close_socket();
308 1017x std::lock_guard lock(state_->mutex_);
309 1017x state_->impl_list_.remove(kq_impl);
310
1/2
✓ Branch 0 taken 1017 times.
✗ Branch 1 not taken.
1017x state_->impl_ptrs_.erase(kq_impl);
311 1017x }
312
313 inline void
314 2031x kqueue_tcp_acceptor_service::close(io_object::handle& h)
315 {
316 2031x static_cast<kqueue_tcp_acceptor*>(h.get())->close_socket();
317 2031x }
318
319 inline std::error_code
320 1014x kqueue_tcp_acceptor_service::open_acceptor_socket(
321 tcp_acceptor::implementation& impl, int family, int type, int protocol)
322 {
323 1014x auto* kq_impl = static_cast<kqueue_tcp_acceptor*>(&impl);
324 1014x kq_impl->close_socket();
325
326 1014x int fd = ::socket(family, type, protocol);
327
1/2
✓ Branch 0 taken 1014 times.
✗ Branch 1 not taken.
1014x if (fd < 0)
328 return make_err(errno);
329
330 // Set non-blocking and close-on-exec
331 1014x int flags = ::fcntl(fd, F_GETFL, 0);
332
1/2
✓ Branch 0 taken 1014 times.
✗ Branch 1 not taken.
1014x if (flags == -1)
333 {
334 int errn = errno;
335 ::close(fd);
336 return make_err(errn);
337 }
338
1/2
✓ Branch 0 taken 1014 times.
✗ Branch 1 not taken.
1014x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
339 {
340 int errn = errno;
341 ::close(fd);
342 return make_err(errn);
343 }
344
1/2
✓ Branch 0 taken 1014 times.
✗ Branch 1 not taken.
1014x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
345 {
346 int errn = errno;
347 ::close(fd);
348 return make_err(errn);
349 }
350
351
2/2
✓ Branch 0 taken 1006 times.
✓ Branch 1 taken 8 times.
1014x if (family == AF_INET6)
352 {
353 8x int val = 0; // dual-stack default
354 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
355 8x }
356
357 // SO_NOSIGPIPE on macOS (where MSG_NOSIGNAL doesn't exist)
358 #ifdef SO_NOSIGPIPE
359 1014x int nosig = 1;
360 1014x ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
361 #endif
362
363 1014x kq_impl->fd_ = fd;
364
365 // Set up descriptor state but do NOT register with kqueue yet
366 1014x kq_impl->desc_state_.fd = fd;
367 {
368 1014x std::lock_guard lock(kq_impl->desc_state_.mutex);
369 1014x kq_impl->desc_state_.read_op = nullptr;
370 1014x }
371
372 1014x return {};
373 1014x }
374
375 inline std::error_code
376 1013x kqueue_tcp_acceptor_service::bind_acceptor(
377 tcp_acceptor::implementation& impl, endpoint ep)
378 {
379 1013x return static_cast<kqueue_tcp_acceptor*>(&impl)->do_bind(ep);
380 }
381
382 inline std::error_code
383 1008x kqueue_tcp_acceptor_service::listen_acceptor(
384 tcp_acceptor::implementation& impl, int backlog)
385 {
386 1008x return static_cast<kqueue_tcp_acceptor*>(&impl)->do_listen(backlog);
387 }
388
389 inline void
390 9x kqueue_tcp_acceptor_service::post(scheduler_op* op)
391 {
392 9x state_->sched_.post(op);
393 9x }
394
395 inline void
396 5316x kqueue_tcp_acceptor_service::work_started() noexcept
397 {
398 5316x state_->sched_.work_started();
399 5316x }
400
401 inline void
402 9x kqueue_tcp_acceptor_service::work_finished() noexcept
403 {
404 9x state_->sched_.work_finished();
405 9x }
406
407 inline kqueue_tcp_service*
408 5307x kqueue_tcp_acceptor_service::tcp_service() const noexcept
409 {
410 5307x return tcp_svc_;
411 }
412
413 } // namespace boost::corosio::detail
414
415 #endif // BOOST_COROSIO_HAS_KQUEUE
416
417 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_TCP_ACCEPTOR_SERVICE_HPP
418