include/boost/corosio/native/detail/kqueue/kqueue_socket_service.hpp

80.3% Lines (431/537) 97.4% Functions (38/39) 53.1% Branches (170/320)
include/boost/corosio/native/detail/kqueue/kqueue_socket_service.hpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #ifndef BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SOCKET_SERVICE_HPP
12 #define BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SOCKET_SERVICE_HPP
13
14 #include <boost/corosio/detail/platform.hpp>
15
16 #if BOOST_COROSIO_HAS_KQUEUE
17
18 #include <boost/corosio/detail/config.hpp>
19 #include <boost/capy/ex/execution_context.hpp>
20 #include <boost/corosio/detail/socket_service.hpp>
21
22 #include <boost/corosio/native/detail/kqueue/kqueue_socket.hpp>
23 #include <boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp>
24
25 #include <boost/corosio/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/make_err.hpp>
28 #include <boost/corosio/detail/except.hpp>
29 #include <boost/capy/buffers.hpp>
30
31 #include <coroutine>
32 #include <memory>
33 #include <mutex>
34 #include <unordered_map>
35 #include <utility>
36
37 #include <errno.h>
38 #include <fcntl.h>
39 #include <netinet/in.h>
40 #include <netinet/tcp.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 /*
45 kqueue Socket Implementation
46 ============================
47
48 Each I/O operation follows the same pattern:
49 1. Try the syscall speculatively (readv/writev) before suspending
50 2. On success, return via symmetric transfer (the "pump" fast path)
51 3. On budget exhaustion, post to the scheduler queue for fairness
52 4. On EAGAIN, register_op() parks the op in the descriptor_state
53
54 The speculative path avoids scheduler queue, mutex, and reactor
55 round-trips entirely. An inline budget limits consecutive inline
56 completions to prevent starvation of other connections.
57
58 Cancellation
59 ------------
60 See op.hpp for the completion/cancellation race handling via the
61 descriptor_state mutex. cancel() must complete pending operations (post
62 them with cancelled flag) so coroutines waiting on them can resume.
63 close_socket() calls cancel() first to ensure this.
64
65 Impl Lifetime with shared_ptr
66 -----------------------------
67 Socket impls use enable_shared_from_this. The service owns impls via
68 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
69 removal. When a user calls close(), we call cancel() which posts pending
70 ops to the scheduler.
71
72 CRITICAL: The posted ops must keep the impl alive until they complete.
73 Otherwise the scheduler would process a freed op (use-after-free). The
74 cancel() method captures shared_from_this() into op.impl_ptr before
75 posting. When the op completes, impl_ptr is cleared, allowing the impl
76 to be destroyed if no other references exist.
77
78 Service Ownership
79 -----------------
80 kqueue_socket_service owns all socket impls. destroy_impl() removes the
81 shared_ptr from the map, but the impl may survive if ops still hold
82 impl_ptr refs. shutdown() closes all sockets and clears the map; any
83 in-flight ops will complete and release their refs.
84 */
85
86 /*
87 kqueue socket implementation
88 ============================
89
90 Each kqueue_socket owns a descriptor_state that is persistently
91 registered with kqueue (EVFILT_READ + EVFILT_WRITE, both EV_CLEAR for
92 edge-triggered semantics). The descriptor_state tracks three operation
93 slots (read_op, write_op, connect_op) and two ready flags
94 (read_ready, write_ready) under a per-descriptor mutex.
95
96 Speculative I/O and the pump
97 ----------------------------
98 read_some() and write_some() attempt the syscall (readv/writev)
99 speculatively before suspending the caller. If data is available the
100 result is returned via symmetric transfer — no scheduler queue, no
101 mutex, no reactor round-trip. An inline budget limits consecutive
102 inline completions to prevent starvation of other connections.
103
104 When the speculative attempt returns EAGAIN, register_op() parks the
105 operation in its descriptor_state slot under the per-descriptor mutex.
106 If a cached ready flag fires before parking, register_op() retries
107 the I/O once under the mutex. This eliminates the cached_initiator
108 coroutine frame that previously trampolined into do_read_io() /
109 do_write_io() after the caller suspended.
110
111 Ready-flag protocol
112 -------------------
113 When a kqueue event fires and no operation is pending for that
114 direction, the reactor sets the corresponding ready flag instead of
115 dropping the event. When register_op() finds the ready flag set, it
116 performs I/O immediately rather than parking. This prevents lost
117 wakeups under edge-triggered notification.
118 */
119
120 namespace boost::corosio::detail {
121
/** State for kqueue socket service.

    Holds everything the service shares across sockets: the scheduler
    reference, the registry of live socket impls, and the mutex that
    guards both containers. Owned via unique_ptr by
    kqueue_socket_service so the state outlives service shutdown (see
    the ownership notes at the top of this file).
*/
class kqueue_socket_state
{
public:
    explicit kqueue_socket_state(kqueue_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Scheduler shared by every socket created through this service.
    kqueue_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_.
    std::mutex mutex_;
    // Intrusive list of live impls; walked by shutdown() to force-close.
    intrusive_list<kqueue_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup/removal. Released
    // only when the state is destroyed, after scheduler shutdown has
    // drained all in-flight ops (see shutdown() comment below).
    std::unordered_map<kqueue_socket*, std::shared_ptr<kqueue_socket>>
        socket_ptrs_;
};
137
/** kqueue socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL kqueue_socket_service final : public socket_service
{
public:
    explicit kqueue_socket_service(capy::execution_context& ctx);
    ~kqueue_socket_service();

    kqueue_socket_service(kqueue_socket_service const&) = delete;
    kqueue_socket_service& operator=(kqueue_socket_service const&) = delete;

    /// Force-close all live sockets; impl refs are released later by ~state_.
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    /// Open the native socket and register it with the reactor.
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    /// Scheduler shared by all sockets of this service.
    kqueue_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Enqueue a completed op on the scheduler's completion queue.
    void post(kqueue_op* op);
    /// Outstanding-work accounting forwarded to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<kqueue_socket_state> state_;
};
170
171 // -- Implementation ---------------------------------------------------------
172
// Stop-token callback: forwards to the op's virtual cancel(), which
// routes through the owning socket when one is attached.
inline void
kqueue_op::canceller::operator()() const noexcept
{
    op->cancel();
}
178
179 inline void
180 kqueue_connect_op::cancel() noexcept
181 {
182 if (socket_impl_)
183 socket_impl_->cancel_single_op(*this);
184 else
185 request_cancel();
186 }
187
188 inline void
189 653 kqueue_read_op::cancel() noexcept
190 {
191
1/2
✓ Branch 0 taken 653 times.
✗ Branch 1 not taken.
653 if (socket_impl_)
192 653 socket_impl_->cancel_single_op(*this);
193 else
194 request_cancel();
195 653 }
196
197 inline void
198 1 kqueue_write_op::cancel() noexcept
199 {
200
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (socket_impl_)
201 1 socket_impl_->cancel_single_op(*this);
202 else
203 request_cancel();
204 1 }
205
206 inline void
207 191840 kqueue_op::operator()()
208 {
209 191840 stop_cb.reset();
210
211 191840 socket_impl_->desc_state_.scheduler_->reset_inline_budget();
212
213
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 191840 times.
191840 if (ec_out)
214 {
215
2/2
✓ Branch 0 taken 801 times.
✓ Branch 1 taken 191039 times.
191840 if (cancelled.load(std::memory_order_acquire))
216 801 *ec_out = capy::error::canceled;
217
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 191036 times.
191039 else if (errn != 0)
218 3 *ec_out = make_err(errn);
219
4/4
✓ Branch 0 taken 107982 times.
✓ Branch 1 taken 83054 times.
✓ Branch 2 taken 107979 times.
✓ Branch 3 taken 3 times.
191036 else if (is_read_operation() && bytes_transferred == 0)
220 3 *ec_out = capy::error::eof;
221 else
222 191033 *ec_out = {};
223 191840 }
224
225
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 191840 times.
191840 if (bytes_out)
226 191840 *bytes_out = bytes_transferred;
227
228 // Move to stack before resuming coroutine. The coroutine might close
229 // the socket, releasing the last wrapper ref. If impl_ptr were the
230 // last ref and we destroyed it while still in operator(), we'd have
231 // use-after-free. Moving to local ensures destruction happens at
232 // function exit, after all member accesses are complete.
233 191840 capy::executor_ref saved_ex(std::move(ex));
234 191840 std::coroutine_handle<> saved_h(std::move(h));
235 191840 auto prevent_premature_destruction = std::move(impl_ptr);
236
2/4
✓ Branch 0 taken 191840 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 191840 times.
✗ Branch 3 not taken.
191840 dispatch_coro(saved_ex, saved_h).resume();
237 191840 }
238
239 inline void
240 5437 kqueue_connect_op::operator()()
241 {
242 5437 stop_cb.reset();
243
244 5437 socket_impl_->desc_state_.scheduler_->reset_inline_budget();
245
246
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 5436 times.
5437 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
247
248 // Cache endpoints on successful connect
249
3/4
✓ Branch 0 taken 5436 times.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
5437 if (success && socket_impl_)
250 {
251 // Query local endpoint via getsockname (may fail, but remote is always known)
252 5436 endpoint local_ep;
253 5436 sockaddr_in local_addr{};
254 5436 socklen_t local_len = sizeof(local_addr);
255
2/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5436 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5436 times.
10872 if (::getsockname(
256 10872 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
257 5436 local_ep = from_sockaddr_in(local_addr);
258 // Always cache remote endpoint; local may be default if getsockname failed
259 5436 static_cast<kqueue_socket*>(socket_impl_)
260 5436 ->set_endpoints(local_ep, target_endpoint);
261 5436 }
262
263
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5437 times.
5437 if (ec_out)
264 {
265
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5437 times.
5437 if (cancelled.load(std::memory_order_acquire))
266 *ec_out = capy::error::canceled;
267
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 5436 times.
5437 else if (errn != 0)
268 1 *ec_out = make_err(errn);
269 else
270 5436 *ec_out = {};
271 5437 }
272
273
1/2
✓ Branch 0 taken 5437 times.
✗ Branch 1 not taken.
5437 if (bytes_out)
274 *bytes_out = bytes_transferred;
275
276 // Move to stack before resuming. See kqueue_op::operator()() for rationale.
277 5437 capy::executor_ref saved_ex(std::move(ex));
278 5437 std::coroutine_handle<> saved_h(std::move(h));
279 5437 auto prevent_premature_destruction = std::move(impl_ptr);
280
2/4
✓ Branch 0 taken 5437 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5437 times.
✗ Branch 3 not taken.
5437 dispatch_coro(saved_ex, saved_h).resume();
281 5437 }
282
// A socket impl holds only a back-reference to its service at
// construction; the native fd is created later by open_socket().
inline kqueue_socket::kqueue_socket(kqueue_socket_service& svc) noexcept
    : svc_(svc)
{
}

// Out-of-line default: close_socket() has already released the fd by
// the time an impl is destroyed.
inline kqueue_socket::~kqueue_socket() = default;
289
/** Begin an asynchronous connect to `ep`.

    Tries ::connect() immediately. Synchronous outcomes (success, or any
    error other than EINPROGRESS) complete inline via symmetric transfer
    when the scheduler's inline budget allows, otherwise through the
    completion queue for fairness. EINPROGRESS parks the op on the write
    direction via register_op().

    @param h     Coroutine to resume on completion.
    @param ex    Executor the coroutine must resume on.
    @param ep    Remote endpoint to connect to.
    @param token Stop token wired to the op for cancellation.
    @param ec    Receives the completion error code.
    @return Coroutine handle for symmetric transfer; noop when queued.
*/
inline std::coroutine_handle<>
kqueue_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    // Cache endpoints on sync success
    if (result == 0)
    {
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        int err = (result < 0) ? errno : 0;

        // Fast path: resume the caller inline, skipping the queue.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }

        // Budget exhausted — post through queue
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — async path: park on the write direction (kqueue
    // reports connect completion as writability).
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    // impl_ptr keeps this impl alive until the op completes.
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
354
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN path when speculative I/O failed.
//
// `desc_slot` is the descriptor_state slot for this direction
// (read_op / write_op / connect_op); `ready_flag` / `cancel_flag` are
// the matching cached-readiness and deferred-cancel flags. All three
// are only mutated under desc_state_.mutex.
inline void
kqueue_socket::register_op(
    kqueue_op& op,
    kqueue_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Balanced by work_finished() below if the op completes here, or by
    // the reactor when the parked op eventually completes.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    // An edge event fired between the speculative attempt and taking the
    // lock: retry the I/O once instead of parking. This prevents a lost
    // wakeup under EV_CLEAR edge-triggered notification.
    if (ready_flag)
    {
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0; // still not ready; clear errn before parking
    }

    // A cancel raced in before the op was parked; latch it onto the op.
    if (cancel_flag)
    {
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // NOTE(review): post() is invoked while desc_state_.mutex is
        // held — assumes post() never re-acquires this mutex; confirm.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park; the reactor completes the op on the next kqueue event.
        desc_slot = &op;
    }
}
393
/** Read into the supplied buffers, speculatively first.

    Attempts ::readv() before suspending. On a definite result the
    caller is resumed inline (budget permitting) or through the queue;
    on EAGAIN the op is parked with register_op(). A zero-byte read on
    a non-empty buffer is reported as EOF.

    @param h         Coroutine to resume on completion.
    @param ex        Executor the coroutine must resume on.
    @param param     Buffer sequence to fill.
    @param token     Stop token wired to the op for cancellation.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes read.
    @return Coroutine handle for symmetric transfer; noop when queued.
*/
inline std::coroutine_handle<>
kqueue_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[kqueue_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, kqueue_read_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes through the queue
    // (marked so completion does not misreport EOF).
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read: try I/O before suspending. On success, return via
    // symmetric transfer without touching the scheduler queue — this creates
    // a tight pump loop for back-to-back reads on a hot socket.
    // Budget limits consecutive inline completions to prevent starvation
    // of other connections competing for scheduler time.
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }

        // Budget exhausted — fall through to queue
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    // impl_ptr keeps this impl alive until the parked op completes.
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
485
/** Write from the supplied buffers, speculatively first.

    Mirrors read_some(): attempts ::writev() before suspending, resumes
    the caller inline (budget permitting) or through the queue on a
    definite result, and parks the op via register_op() on EAGAIN.

    @param h         Coroutine to resume on completion.
    @param ex        Executor the coroutine must resume on.
    @param param     Buffer sequence to send.
    @param token     Stop token wired to the op for cancellation.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes written.
    @return Coroutine handle for symmetric transfer; noop when queued.
*/
inline std::coroutine_handle<>
kqueue_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[kqueue_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, kqueue_write_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes through the queue.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write: try I/O before suspending. On success, return via
    // symmetric transfer without touching the scheduler queue — this creates
    // a tight pump loop for back-to-back writes on a hot socket.
    // Budget limits consecutive inline completions to prevent starvation.
    ssize_t n;
    do
    {
        n = ::writev(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }

        // Budget exhausted — fall through to queue
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    // impl_ptr keeps this impl alive until the parked op completes.
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
570
571 inline std::error_code
572 3 kqueue_socket::shutdown(tcp_socket::shutdown_type what) noexcept
573 {
574 int how;
575
3/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✓ Branch 3 taken 1 time.
3 switch (what)
576 {
577 case tcp_socket::shutdown_receive:
578 1 how = SHUT_RD;
579 1 break;
580 case tcp_socket::shutdown_send:
581 1 how = SHUT_WR;
582 1 break;
583 case tcp_socket::shutdown_both:
584 1 how = SHUT_RDWR;
585 1 break;
586 default:
587 return make_err(EINVAL);
588 }
589
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
590 return make_err(errno);
591 3 return {};
592 3 }
593
594 inline std::error_code
595 5 kqueue_socket::set_no_delay(bool value) noexcept
596 {
597 5 int flag = value ? 1 : 0;
598
2/4
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
599 return make_err(errno);
600 5 return {};
601 5 }
602
603 inline bool
604 5 kqueue_socket::no_delay(std::error_code& ec) const noexcept
605 {
606 5 int flag = 0;
607 5 socklen_t len = sizeof(flag);
608
2/4
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
609 {
610 ec = make_err(errno);
611 return false;
612 }
613 5 ec = {};
614 5 return flag != 0;
615 5 }
616
617 inline std::error_code
618 4 kqueue_socket::set_keep_alive(bool value) noexcept
619 {
620 4 int flag = value ? 1 : 0;
621
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
622 return make_err(errno);
623 4 return {};
624 4 }
625
626 inline bool
627 4 kqueue_socket::keep_alive(std::error_code& ec) const noexcept
628 {
629 4 int flag = 0;
630 4 socklen_t len = sizeof(flag);
631
2/4
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
632 {
633 ec = make_err(errno);
634 return false;
635 }
636 4 ec = {};
637 4 return flag != 0;
638 4 }
639
640 inline std::error_code
641 1 kqueue_socket::set_receive_buffer_size(int size) noexcept
642 {
643
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
644 return make_err(errno);
645 1 return {};
646 1 }
647
648 inline int
649 3 kqueue_socket::receive_buffer_size(std::error_code& ec) const noexcept
650 {
651 3 int size = 0;
652 3 socklen_t len = sizeof(size);
653
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
654 {
655 ec = make_err(errno);
656 return 0;
657 }
658 3 ec = {};
659 3 return size;
660 3 }
661
662 inline std::error_code
663 1 kqueue_socket::set_send_buffer_size(int size) noexcept
664 {
665
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
666 return make_err(errno);
667 1 return {};
668 1 }
669
670 inline int
671 3 kqueue_socket::send_buffer_size(std::error_code& ec) const noexcept
672 {
673 3 int size = 0;
674 3 socklen_t len = sizeof(size);
675
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
676 {
677 ec = make_err(errno);
678 return 0;
679 }
680 3 ec = {};
681 3 return size;
682 3 }
683
684 inline std::error_code
685 1718 kqueue_socket::set_linger(bool enabled, int timeout) noexcept
686 {
687
2/2
✓ Branch 0 taken 1717 times.
✓ Branch 1 taken 1 time.
1718 if (timeout < 0)
688 1 return make_err(EINVAL);
689 struct ::linger lg;
690 1717 lg.l_onoff = enabled ? 1 : 0;
691 1717 lg.l_linger = timeout;
692
2/4
✓ Branch 0 taken 1717 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1717 times.
1717 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
693 return make_err(errno);
694 1717 user_set_linger_ = true;
695 1717 return {};
696 1718 }
697
698 inline tcp_socket::linger_options
699 3 kqueue_socket::linger(std::error_code& ec) const noexcept
700 {
701 3 struct ::linger lg{};
702 3 socklen_t len = sizeof(lg);
703
2/4
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
704 {
705 ec = make_err(errno);
706 return {};
707 }
708 3 ec = {};
709 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
710 3 }
711
/** Cancel all pending operations on this socket.

    For each direction the op is either claimed out of its
    descriptor_state slot (then posted with the cancelled flag so the
    waiting coroutine resumes) or, if not currently parked, a
    cancel-pending flag is left for register_op() to observe. Posted
    ops carry a shared_ptr to this impl so it outlives them.
*/
inline void
kqueue_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    kqueue_op* conn_claimed = nullptr;
    kqueue_op* rd_claimed = nullptr;
    kqueue_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // Claim each parked op, or defer the cancel to register_op().
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock; impl_ptr keeps `this` alive
    // until each op's completion runs. work_finished() balances the
    // work_started() done when the op was parked.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
761
/** Cancel one specific operation on this socket.

    Same claim-or-defer protocol as cancel(), limited to the slot that
    owns `op`. Ops not belonging to this socket's conn_/rd_/wr_ are
    ignored.
*/
inline void
kqueue_socket::cancel_single_op(kqueue_op& op) noexcept
{
    // If the impl is already being destroyed there is nothing to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Identify which descriptor slot this op occupies.
    kqueue_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        kqueue_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the parked op, or leave a cancel-pending flag for
            // register_op() to pick up before the op is parked.
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // impl_ptr keeps this impl alive until the op completes;
            // work_finished() balances the park-time work_started().
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
801
/** Close the native socket, cancelling everything first.

    Claims and posts all parked ops (so waiting coroutines resume with
    the cancelled flag), clears every ready/cancel-pending flag, closes
    the fd, and resets descriptor state and cached endpoints. If the
    descriptor_state is still enqueued with the reactor, a shared_ptr
    is stashed in impl_ref_ so the impl survives until dequeued.
*/
inline void
kqueue_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        kqueue_op* conn_claimed = nullptr;
        kqueue_op* rd_claimed = nullptr;
        kqueue_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Unconditionally claim all slots and wipe cached flags;
            // nothing may remain parked on a closed descriptor.
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops outside the lock; impl_ptr keeps `this`
        // alive until each op completes, work_finished() balances the
        // park-time work_started().
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // Descriptor still sitting in a reactor queue: pin the impl
        // until the reactor drops it.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;
    user_set_linger_ = false;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
863
864 598 inline kqueue_socket_service::kqueue_socket_service(
865 capy::execution_context& ctx)
866 299 : state_(
867
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 std::make_unique<kqueue_socket_state>(
868
1/2
✓ Branch 0 taken 299 times.
✗ Branch 1 not taken.
299 ctx.use_service<kqueue_scheduler>()))
869 598 {
870 598 }
871
872 897 inline kqueue_socket_service::~kqueue_socket_service() {}
873
874 inline void
875 299 kqueue_socket_service::shutdown()
876 {
877 299 std::lock_guard lock(state_->mutex_);
878
879
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 299 times.
299 while (auto* impl = state_->socket_list_.pop_front())
880 {
881 if (impl->user_set_linger_ && impl->fd_ >= 0)
882 {
883 struct ::linger lg;
884 lg.l_onoff = 0;
885 lg.l_linger = 0;
886 ::setsockopt(impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg));
887 }
888 impl->close_socket();
889 }
890
891 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
892 // drains completed_ops_, calling destroy() on each queued op. If we
893 // released our shared_ptrs now, a kqueue_op::destroy() could free the
894 // last ref to an impl whose embedded descriptor_state is still linked
895 // in the queue — use-after-free on the next pop(). Letting ~state_
896 // release the ptrs (during service destruction, after scheduler
897 // shutdown) keeps every impl alive until all ops have been drained.
898 299 }
899
900 inline io_object::implementation*
901 16410 kqueue_socket_service::construct()
902 {
903 16410 auto impl = std::make_shared<kqueue_socket>(*this);
904 16410 auto* raw = impl.get();
905
906 {
907
1/2
✓ Branch 0 taken 16410 times.
✗ Branch 1 not taken.
16410 std::lock_guard lock(state_->mutex_);
908 16410 state_->socket_list_.push_back(raw);
909
1/2
✓ Branch 0 taken 16410 times.
✗ Branch 1 not taken.
16410 state_->socket_ptrs_.emplace(raw, std::move(impl));
910 16410 }
911
912 16410 return raw;
913 16410 }
914
915 inline void
916 16410 kqueue_socket_service::destroy(io_object::implementation* impl)
917 {
918 16410 auto* kq_impl = static_cast<kqueue_socket*>(impl);
919
920 // Match asio: if the user set SO_LINGER, clear it before close so
921 // the destructor doesn't block and close() sends FIN instead of RST.
922 // RST doesn't reliably trigger EV_EOF on macOS kqueue.
923
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 16410 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
16410 if (kq_impl->user_set_linger_ && kq_impl->fd_ >= 0)
924 {
925 struct ::linger lg;
926 lg.l_onoff = 0;
927 lg.l_linger = 0;
928 ::setsockopt(kq_impl->fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg));
929 }
930
931 16410 kq_impl->close_socket();
932 16410 std::lock_guard lock(state_->mutex_);
933 16410 state_->socket_list_.remove(kq_impl);
934
1/2
✓ Branch 0 taken 16410 times.
✗ Branch 1 not taken.
16410 state_->socket_ptrs_.erase(kq_impl);
935 16410 }
936
937 inline std::error_code
938 5450 kqueue_socket_service::open_socket(tcp_socket::implementation& impl)
939 {
940 5450 auto* kq_impl = static_cast<kqueue_socket*>(&impl);
941 5450 kq_impl->close_socket();
942
943 // FreeBSD: Can use socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)
944 5450 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
945
1/2
✓ Branch 0 taken 5450 times.
✗ Branch 1 not taken.
5450 if (fd < 0)
946 return make_err(errno);
947
948 // Set non-blocking
949 5450 int flags = ::fcntl(fd, F_GETFL, 0);
950
1/2
✓ Branch 0 taken 5450 times.
✗ Branch 1 not taken.
5450 if (flags == -1)
951 {
952 int errn = errno;
953 ::close(fd);
954 return make_err(errn);
955 }
956
1/2
✓ Branch 0 taken 5450 times.
✗ Branch 1 not taken.
5450 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
957 {
958 int errn = errno;
959 ::close(fd);
960 return make_err(errn);
961 }
962
963 // Set close-on-exec
964
1/2
✓ Branch 0 taken 5450 times.
✗ Branch 1 not taken.
5450 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
965 {
966 int errn = errno;
967 ::close(fd);
968 return make_err(errn);
969 }
970
971 // Suppress SIGPIPE on this socket; writev() has no MSG_NOSIGNAL
972 // equivalent, so SO_NOSIGPIPE is required on macOS/FreeBSD.
973 5450 int one = 1;
974
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5450 times.
5450 if (::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) != 0)
975 {
976 int errn = errno;
977 ::close(fd);
978 return make_err(errn);
979 }
980
981 5450 kq_impl->fd_ = fd;
982
983 // Register fd with kqueue (edge-triggered mode via EV_CLEAR)
984 5450 kq_impl->desc_state_.fd = fd;
985 {
986 5450 std::lock_guard lock(kq_impl->desc_state_.mutex);
987 5450 kq_impl->desc_state_.read_op = nullptr;
988 5450 kq_impl->desc_state_.write_op = nullptr;
989 5450 kq_impl->desc_state_.connect_op = nullptr;
990 5450 }
991 5450 scheduler().register_descriptor(fd, &kq_impl->desc_state_);
992
993 5450 return {};
994 5450 }
995
996 inline void
997 27296 kqueue_socket_service::close(io_object::handle& h)
998 {
999 27296 static_cast<kqueue_socket*>(h.get())->close_socket();
1000 27296 }
1001
1002 inline void
1003 143324 kqueue_socket_service::post(kqueue_op* op)
1004 {
1005 143324 state_->sched_.post(op);
1006 143324 }
1007
1008 inline void
1009 54747 kqueue_socket_service::work_started() noexcept
1010 {
1011 54747 state_->sched_.work_started();
1012 54747 }
1013
1014 inline void
1015 794 kqueue_socket_service::work_finished() noexcept
1016 {
1017 794 state_->sched_.work_finished();
1018 794 }
1019
1020 } // namespace boost::corosio::detail
1021
1022 #endif // BOOST_COROSIO_HAS_KQUEUE
1023
1024 #endif // BOOST_COROSIO_NATIVE_DETAIL_KQUEUE_KQUEUE_SOCKET_SERVICE_HPP
1025