src/corosio/src/tcp_server.cpp

100.0% Lines (71/71) 100.0% List of functions (16/16)
tcp_server.cpp
f(x) Functions (16)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 177x tcp_server::worker_base::worker_base() = default;
20 177x tcp_server::worker_base::~worker_base() = default;
21
22 struct tcp_server::impl
23 {
24 std::mutex join_mutex;
25 std::condition_variable join_cv;
26 capy::execution_context& ctx;
27 std::vector<tcp_acceptor> ports;
28 std::stop_source stop;
29
30 51x explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
31 };
32
33 tcp_server::impl*
34 51x tcp_server::make_impl(capy::execution_context& ctx)
35 {
36 51x return new impl(ctx);
37 }
38
39 54x tcp_server::~tcp_server()
40 {
41 54x delete impl_;
42 54x }
43
44 3x tcp_server::tcp_server(tcp_server&& o) noexcept
45 3x : impl_(std::exchange(o.impl_, nullptr))
46 3x , ex_(o.ex_)
47 3x , waiters_(std::exchange(o.waiters_, nullptr))
48 3x , idle_head_(std::exchange(o.idle_head_, nullptr))
49 3x , active_head_(std::exchange(o.active_head_, nullptr))
50 3x , active_tail_(std::exchange(o.active_tail_, nullptr))
51 3x , active_accepts_(std::exchange(o.active_accepts_, 0))
52 3x , storage_(std::move(o.storage_))
53 3x , running_(std::exchange(o.running_, false))
54 {
55 3x }
56
57 tcp_server&
58 3x tcp_server::operator=(tcp_server&& o) noexcept
59 {
60 3x delete impl_;
61 3x impl_ = std::exchange(o.impl_, nullptr);
62 3x ex_ = o.ex_;
63 3x waiters_ = std::exchange(o.waiters_, nullptr);
64 3x idle_head_ = std::exchange(o.idle_head_, nullptr);
65 3x active_head_ = std::exchange(o.active_head_, nullptr);
66 3x active_tail_ = std::exchange(o.active_tail_, nullptr);
67 3x active_accepts_ = std::exchange(o.active_accepts_, 0);
68 3x storage_ = std::move(o.storage_);
69 3x running_ = std::exchange(o.running_, false);
70 3x return *this;
71 }
72
73 // Accept loop: wait for idle worker, accept connection, dispatch
74 capy::task<void>
75 36x tcp_server::do_accept(tcp_acceptor& acc)
76 {
77 // Analyzer can't trace value through coroutine await_transform
78 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
79 auto env = co_await capy::this_coro::environment;
80 while (!env->stop_token.stop_requested())
81 {
82 // Wait for an idle worker before blocking on accept
83 auto& w = co_await pop();
84 auto [ec] = co_await acc.accept(w.socket());
85 if (ec)
86 {
87 co_await push(w);
88 continue;
89 }
90 w.run(launcher{*this, w});
91 }
92 72x }
93
94 std::error_code
95 51x tcp_server::bind(endpoint ep)
96 {
97 try
98 {
99 51x impl_->ports.emplace_back(impl_->ctx, ep);
100 48x return {};
101 }
102 3x catch (std::system_error const& e)
103 {
104 3x return e.code();
105 3x }
106 }
107
108 endpoint
109 36x tcp_server::local_endpoint(std::size_t index) const noexcept
110 {
111 36x if (index >= impl_->ports.size())
112 3x return endpoint{};
113 33x return impl_->ports[index].local_endpoint();
114 }
115
116 void
117 42x tcp_server::start()
118 {
119 // Idempotent - only start if not already running
120 42x if (running_)
121 3x return;
122
123 // Previous session must be fully stopped before restart
124 39x if (active_accepts_ != 0)
125 3x detail::throw_logic_error(
126 "tcp_server::start: previous session not joined");
127
128 36x running_ = true;
129
130 36x impl_->stop = {}; // Fresh stop source
131 36x auto st = impl_->stop.get_token();
132
133 36x active_accepts_ = impl_->ports.size();
134
135 // Launch with completion handler that decrements counter
136 72x for (auto& t : impl_->ports)
137 72x capy::run_async(ex_, st, [this]() {
138 36x std::lock_guard lock(impl_->join_mutex);
139 36x if (--active_accepts_ == 0)
140 36x impl_->join_cv.notify_all();
141 72x })(do_accept(t));
142 36x }
143
144 void
145 42x tcp_server::stop()
146 {
147 // Idempotent - only stop if running
148 42x if (!running_)
149 6x return;
150 36x running_ = false;
151
152 // Stop accept loops
153 36x impl_->stop.request_stop();
154
155 // Launch cancellation coroutine on server executor
156 36x capy::run_async(ex_, std::stop_token{})(do_stop());
157 }
158
159 void
160 24x tcp_server::join()
161 {
162 24x std::unique_lock lock(impl_->join_mutex);
163 48x impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
164 24x }
165
166 capy::task<>
167 36x tcp_server::do_stop()
168 {
169 // Running on server executor - safe to iterate active list
170 // Just cancel, don't modify list - workers return themselves when done
171 for (auto* w = active_head_; w; w = w->next_)
172 w->stop_.request_stop();
173 co_return;
174 72x }
175
176 } // namespace boost::corosio
177