include/boost/corosio/detail/thread_pool.hpp

89.6% Lines (43/48) 100.0% List of functions (7/7) 86.7% Branches (26/30)
thread_pool.hpp
f(x) Functions (7)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/intrusive.hpp>
15 #include <boost/capy/ex/execution_context.hpp>
16 #include <boost/capy/test/thread_name.hpp>
17
18 #include <condition_variable>
19 #include <cstdio>
20 #include <mutex>
21 #include <stdexcept>
22 #include <thread>
23 #include <vector>
24
25 namespace boost::corosio::detail {
26
27 /** Base class for thread pool work items.
28
29 Derive from this to create work that can be posted to a
30 @ref thread_pool. Uses static function pointer dispatch,
31 consistent with the IOCP `op` pattern.
32
33 @par Example
34 @code
35 struct my_work : pool_work_item
36 {
37 int* result;
38 static void execute( pool_work_item* w ) noexcept
39 {
40 auto* self = static_cast<my_work*>( w );
41 *self->result = 42;
42 }
43 };
44
45 my_work w;
46 w.func_ = &my_work::execute;
47 w.result = &r;
48 pool.post( &w );
49 @endcode
50 */
51 struct pool_work_item : intrusive_queue<pool_work_item>::node
52 {
53 /// Static dispatch function signature.
54 using func_type = void (*)(pool_work_item*) noexcept;
55
56 /// Completion handler invoked by the worker thread.
57 func_type func_ = nullptr;
58 };
59
60 /** Shared thread pool for dispatching blocking operations.
61
62 Provides a fixed pool of reusable worker threads for operations
63 that cannot be integrated with async I/O (e.g. blocking DNS
64 calls). Registered as an `execution_context::service` so it
65 is a singleton per io_context.
66
67 Threads are created eagerly in the constructor. The default
68 thread count is 1.
69
70 @par Thread Safety
71 All public member functions are thread-safe.
72
73 @par Shutdown
74 Sets a shutdown flag, notifies all threads, and joins them.
75 In-flight blocking calls complete naturally before the thread
76 exits.
77 */
78 class thread_pool final : public capy::execution_context::service
79 {
80 std::mutex mutex_;
81 std::condition_variable cv_;
82 intrusive_queue<pool_work_item> work_queue_;
83 std::vector<std::thread> threads_;
84 bool shutdown_ = false;
85
86 void worker_loop(unsigned index);
87
88 public:
89 using key_type = thread_pool;
90
91 /** Construct the thread pool service.
92
93 Eagerly creates all worker threads.
94
95 @par Exception Safety
96 Strong guarantee. If thread creation fails, all
97 already-created threads are shut down and joined
98 before the exception propagates.
99
100 @param ctx Reference to the owning execution_context.
101 @param num_threads Number of worker threads. Must be
102 at least 1.
103
104 @throws std::logic_error If `num_threads` is 0.
105 */
106 444x explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
107 444x {
108 (void)ctx;
109
2/2
✓ Branch 7 → 8 taken 1 time.
✓ Branch 7 → 11 taken 443 times.
444x if (!num_threads)
110
1/1
✓ Branch 9 → 10 taken 1 time.
1x throw std::logic_error("thread_pool requires at least 1 thread");
111
1/1
✓ Branch 11 → 12 taken 443 times.
443x threads_.reserve(num_threads);
112 try
113 {
114
2/2
✓ Branch 15 → 13 taken 446 times.
✓ Branch 15 → 16 taken 443 times.
889x for (unsigned i = 0; i < num_threads; ++i)
115
1/1
✓ Branch 13 → 14 taken 446 times.
892x threads_.emplace_back([this, i] { worker_loop(i + 1); });
116 }
117 catch (...)
118 {
119 shutdown();
120 throw;
121 }
122 447x }
123
124 885x ~thread_pool() override = default;
125
126 thread_pool(thread_pool const&) = delete;
127 thread_pool& operator=(thread_pool const&) = delete;
128
129 /** Enqueue a work item for execution on the thread pool.
130
131 Zero-allocation: the caller owns the work item's storage.
132
133 @param w The work item to execute. Must remain valid until
134 its `func_` has been called.
135
136 @return `true` if the item was enqueued, `false` if the
137 pool has already shut down.
138 */
139 bool post(pool_work_item* w) noexcept;
140
141 /** Shut down the thread pool.
142
143 Signals all threads to exit after draining any
144 remaining queued work, then joins them.
145 */
146 void shutdown() override;
147 };
148
149 inline void
150 446x thread_pool::worker_loop(unsigned index)
151 {
152 // Name format chosen to fit Linux's 15-char pthread limit:
153 // "tpool-svc-" (10) + up to 4 digit index leaves "tpool-svc-9999".
154 char name[16];
155
1/1
✓ Branch 2 → 3 taken 446 times.
446x std::snprintf(name, sizeof(name), "tpool-svc-%u", index);
156 446x capy::set_current_thread_name(name);
157
158 for (;;)
159 {
160 pool_work_item* w;
161 {
162
1/1
✓ Branch 4 → 5 taken 471 times.
471x std::unique_lock<std::mutex> lock(mutex_);
163
1/1
✓ Branch 5 → 6 taken 471 times.
471x cv_.wait(
164
4/4
✓ Branch 2 → 3 taken 466 times.
✓ Branch 2 → 5 taken 460 times.
✓ Branch 4 → 5 taken 11 times.
✓ Branch 4 → 6 taken 455 times.
926x lock, [this] { return shutdown_ || !work_queue_.empty(); });
165
166 471x w = work_queue_.pop();
167
2/2
✓ Branch 7 → 8 taken 446 times.
✓ Branch 7 → 11 taken 25 times.
471x if (!w)
168 {
169
1/2
✓ Branch 8 → 9 taken 446 times.
✗ Branch 8 → 10 not taken.
446x if (shutdown_)
170 892x return;
171 continue;
172 }
173 471x }
174 25x w->func_(w);
175 25x }
176 }
177
178 inline bool
179 26x thread_pool::post(pool_work_item* w) noexcept
180 {
181 {
182 26x std::lock_guard<std::mutex> lock(mutex_);
183
2/2
✓ Branch 3 → 4 taken 1 time.
✓ Branch 3 → 5 taken 25 times.
26x if (shutdown_)
184 1x return false;
185 25x work_queue_.push(w);
186 26x }
187 25x cv_.notify_one();
188 25x return true;
189 }
190
191 inline void
192 447x thread_pool::shutdown()
193 {
194 {
195
1/1
✓ Branch 2 → 3 taken 447 times.
447x std::lock_guard<std::mutex> lock(mutex_);
196 447x shutdown_ = true;
197 447x }
198 447x cv_.notify_all();
199
200
2/2
✓ Branch 13 → 7 taken 446 times.
✓ Branch 13 → 14 taken 447 times.
893x for (auto& t : threads_)
201 {
202
1/2
✓ Branch 9 → 10 taken 446 times.
✗ Branch 9 → 11 not taken.
446x if (t.joinable())
203
1/1
✓ Branch 10 → 11 taken 446 times.
446x t.join();
204 }
205 447x threads_.clear();
206
207 {
208
1/1
✓ Branch 15 → 16 taken 447 times.
447x std::lock_guard<std::mutex> lock(mutex_);
209
1/2
✗ Branch 18 → 17 not taken.
✓ Branch 18 → 19 taken 447 times.
447x while (work_queue_.pop())
210 ;
211 447x }
212 447x }
213
214 } // namespace boost::corosio::detail
215
216 #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
217