src/corosio/src/tcp_server.cpp

67.1% Lines (55/82) 85.7% Functions (12/14) 50.0% Branches (76/152)
src/corosio/src/tcp_server.cpp
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 struct tcp_server::impl
20 {
21 std::mutex join_mutex;
22 std::condition_variable join_cv;
23 capy::execution_context& ctx;
24 std::vector<tcp_acceptor> ports;
25 std::stop_source stop;
26
27
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
18 explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
28 };
29
30 tcp_server::impl*
31 9 tcp_server::make_impl(capy::execution_context& ctx)
32 {
33 9 return new impl(ctx);
34 }
35
36 9 tcp_server::~tcp_server()
37 {
38
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 delete impl_;
39 9 }
40
41 tcp_server::tcp_server(tcp_server&& o) noexcept
42 : impl_(std::exchange(o.impl_, nullptr))
43 , ex_(o.ex_)
44 , waiters_(std::exchange(o.waiters_, nullptr))
45 , idle_head_(std::exchange(o.idle_head_, nullptr))
46 , active_head_(std::exchange(o.active_head_, nullptr))
47 , active_tail_(std::exchange(o.active_tail_, nullptr))
48 , active_accepts_(std::exchange(o.active_accepts_, 0))
49 , storage_(std::move(o.storage_))
50 , running_(std::exchange(o.running_, false))
51 {
52 }
53
54 tcp_server&
55 tcp_server::operator=(tcp_server&& o) noexcept
56 {
57 delete impl_;
58 impl_ = std::exchange(o.impl_, nullptr);
59 ex_ = o.ex_;
60 waiters_ = std::exchange(o.waiters_, nullptr);
61 idle_head_ = std::exchange(o.idle_head_, nullptr);
62 active_head_ = std::exchange(o.active_head_, nullptr);
63 active_tail_ = std::exchange(o.active_tail_, nullptr);
64 active_accepts_ = std::exchange(o.active_accepts_, 0);
65 storage_ = std::move(o.storage_);
66 running_ = std::exchange(o.running_, false);
67 return *this;
68 }
69
70 // Accept loop: wait for idle worker, accept connection, dispatch
71 capy::task<void>
72
11/30
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
✓ Branch 8 taken 8 times.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✗ Branch 13 not taken.
✓ Branch 14 taken 8 times.
✗ Branch 15 not taken.
✗ Branch 16 not taken.
✗ Branch 17 not taken.
✓ Branch 18 taken 8 times.
✓ Branch 19 taken 8 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✓ Branch 22 taken 8 times.
✗ Branch 23 not taken.
✓ Branch 24 taken 30 times.
✓ Branch 25 taken 8 times.
✗ Branch 26 not taken.
✗ Branch 27 not taken.
✗ Branch 28 not taken.
✗ Branch 29 not taken.
100 tcp_server::do_accept(tcp_acceptor& acc)
73 8 {
74 // Analyzer can't trace value through coroutine await_transform
75 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
76
3/8
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
16 auto env = co_await capy::this_coro::environment;
77
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 9 times.
17 while (!env->stop_token.stop_requested())
78 {
79 // Wait for an idle worker before blocking on accept
80
6/14
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 9 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 18 times.
✓ Branch 13 taken 27 times.
9 auto& w = co_await pop();
81
12/16
✓ Branch 0 taken 9 times.
✓ Branch 1 taken 18 times.
✓ Branch 2 taken 27 times.
✓ Branch 3 taken 18 times.
✓ Branch 4 taken 27 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 9 times.
✓ Branch 7 taken 18 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 9 times.
✓ Branch 13 taken 9 times.
✓ Branch 14 taken 12 times.
✓ Branch 15 taken 21 times.
36 auto [ec] = co_await acc.accept(w.socket());
82
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 3 times.
21 if (ec)
83 {
84
8/14
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 6 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 6 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 6 times.
✓ Branch 11 taken 6 times.
✗ Branch 12 not taken.
✓ Branch 13 taken 6 times.
24 co_await push(w);
85 6 continue;
86 }
87
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 w.run(launcher{*this, w});
88 39 }
89 116 }
90
91 std::error_code
92 9 tcp_server::bind(endpoint ep)
93 {
94 9 impl_->ports.emplace_back(impl_->ctx);
95 9 auto ec = impl_->ports.back().listen(ep);
96
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 time.
9 if (ec)
97 1 impl_->ports.pop_back();
98 9 return ec;
99 }
100
101 void
102 10 tcp_server::start()
103 {
104 // Idempotent - only start if not already running
105
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 9 times.
10 if (running_)
106 1 return;
107
108 // Previous session must be fully stopped before restart
109
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 1 time.
9 if (active_accepts_ != 0)
110 1 detail::throw_logic_error(
111 "tcp_server::start: previous session not joined");
112
113 8 running_ = true;
114
115 8 impl_->stop = {}; // Fresh stop source
116 8 auto st = impl_->stop.get_token();
117
118 8 active_accepts_ = impl_->ports.size();
119
120 // Launch with completion handler that decrements counter
121
2/2
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 8 times.
16 for (auto& t : impl_->ports)
122
2/4
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 8 times.
24 capy::run_async(ex_, st, [this]() {
123 8 std::lock_guard lock(impl_->join_mutex);
124
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 if (--active_accepts_ == 0)
125 8 impl_->join_cv.notify_all();
126
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
16 })(do_accept(t));
127 9 }
128
129 void
130 10 tcp_server::stop()
131 {
132 // Idempotent - only stop if running
133
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 8 times.
10 if (!running_)
134 2 return;
135 8 running_ = false;
136
137 // Stop accept loops
138 8 impl_->stop.request_stop();
139
140 // Launch cancellation coroutine on server executor
141
3/6
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 capy::run_async(ex_, std::stop_token{})(do_stop());
142 10 }
143
144 void
145 4 tcp_server::join()
146 {
147 4 std::unique_lock lock(impl_->join_mutex);
148
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
149 4 }
150
151 capy::task<>
152
9/28
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 8 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 8 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
✓ Branch 12 taken 8 times.
✗ Branch 13 not taken.
✗ Branch 14 not taken.
✗ Branch 15 not taken.
✓ Branch 16 taken 8 times.
✓ Branch 17 taken 8 times.
✗ Branch 18 not taken.
✗ Branch 19 not taken.
✓ Branch 20 taken 8 times.
✗ Branch 21 not taken.
✗ Branch 22 not taken.
✓ Branch 23 taken 8 times.
✗ Branch 24 not taken.
✗ Branch 25 not taken.
✗ Branch 26 not taken.
✗ Branch 27 not taken.
40 tcp_server::do_stop()
153 8 {
154 // Running on server executor - safe to iterate active list
155 // Just cancel, don't modify list - workers return themselves when done
156
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8 times.
8 for (auto* w = active_head_; w; w = w->next_)
157 w->stop_.request_stop();
158
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8 co_return;
159 }
160
161 } // namespace boost::corosio
162