include/boost/corosio/detail/thread_pool.hpp

89.1% Lines (41/46) 100.0% List of functions (7/7) 86.2% Branches (25/29)
f(x) Functions (7)
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/intrusive.hpp>
15 #include <boost/capy/ex/execution_context.hpp>
16
17 #include <condition_variable>
18 #include <mutex>
19 #include <stdexcept>
20 #include <thread>
21 #include <vector>
22
23 namespace boost::corosio::detail {
24
25 /** Base class for thread pool work items.
26
27 Derive from this to create work that can be posted to a
28 @ref thread_pool. Uses static function pointer dispatch,
29 consistent with the IOCP `op` pattern.
30
31 @par Example
32 @code
33 struct my_work : pool_work_item
34 {
35 int* result;
36 static void execute( pool_work_item* w ) noexcept
37 {
38 auto* self = static_cast<my_work*>( w );
39 *self->result = 42;
40 }
41 };
42
43 my_work w;
44 w.func_ = &my_work::execute;
45 w.result = &r;
46 pool.post( &w );
47 @endcode
48 */
49 struct pool_work_item : intrusive_queue<pool_work_item>::node
50 {
51 /// Static dispatch function signature.
52 using func_type = void (*)(pool_work_item*) noexcept;
53
54 /// Completion handler invoked by the worker thread.
55 func_type func_ = nullptr;
56 };
57
58 /** Shared thread pool for dispatching blocking operations.
59
60 Provides a fixed pool of reusable worker threads for operations
61 that cannot be integrated with async I/O (e.g. blocking DNS
62 calls). Registered as an `execution_context::service` so it
63 is a singleton per io_context.
64
65 Threads are created eagerly in the constructor. The default
66 thread count is 1.
67
68 @par Thread Safety
69 All public member functions are thread-safe.
70
71 @par Shutdown
72 Sets a shutdown flag, notifies all threads, and joins them.
73 In-flight blocking calls complete naturally before the thread
74 exits.
75 */
76 class thread_pool final : public capy::execution_context::service
77 {
78 std::mutex mutex_;
79 std::condition_variable cv_;
80 intrusive_queue<pool_work_item> work_queue_;
81 std::vector<std::thread> threads_;
82 bool shutdown_ = false;
83
84 void worker_loop();
85
86 public:
87 using key_type = thread_pool;
88
89 /** Construct the thread pool service.
90
91 Eagerly creates all worker threads.
92
93 @par Exception Safety
94 Strong guarantee. If thread creation fails, all
95 already-created threads are shut down and joined
96 before the exception propagates.
97
98 @param ctx Reference to the owning execution_context.
99 @param num_threads Number of worker threads. Must be
100 at least 1.
101
102 @throws std::logic_error If `num_threads` is 0.
103 */
104 371x explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
105 371x {
106 (void)ctx;
107
2/2
✓ Branch 7 → 8 taken 1 time.
✓ Branch 7 → 11 taken 370 times.
371x if (!num_threads)
108
1/1
✓ Branch 9 → 10 taken 1 time.
1x throw std::logic_error("thread_pool requires at least 1 thread");
109
1/1
✓ Branch 11 → 12 taken 370 times.
370x threads_.reserve(num_threads);
110 try
111 {
112
2/2
✓ Branch 15 → 13 taken 373 times.
✓ Branch 15 → 16 taken 370 times.
743x for (unsigned i = 0; i < num_threads; ++i)
113
1/1
✓ Branch 13 → 14 taken 373 times.
746x threads_.emplace_back([this] { worker_loop(); });
114 }
115 catch (...)
116 {
117 shutdown();
118 throw;
119 }
120 374x }
121
122 739x ~thread_pool() override = default;
123
124 thread_pool(thread_pool const&) = delete;
125 thread_pool& operator=(thread_pool const&) = delete;
126
127 /** Enqueue a work item for execution on the thread pool.
128
129 Zero-allocation: the caller owns the work item's storage.
130
131 @param w The work item to execute. Must remain valid until
132 its `func_` has been called.
133
134 @return `true` if the item was enqueued, `false` if the
135 pool has already shut down.
136 */
137 bool post(pool_work_item* w) noexcept;
138
139 /** Shut down the thread pool.
140
141 Signals all threads to exit after draining any
142 remaining queued work, then joins them.
143 */
144 void shutdown() override;
145 };
146
147 inline void
148 398x thread_pool::worker_loop()
149 {
150 for (;;)
151 {
152 pool_work_item* w;
153 {
154
1/1
✓ Branch 2 → 3 taken 398 times.
398x std::unique_lock<std::mutex> lock(mutex_);
155
1/1
✓ Branch 3 → 4 taken 398 times.
398x cv_.wait(
156
4/4
✓ Branch 2 → 3 taken 395 times.
✓ Branch 2 → 5 taken 385 times.
✓ Branch 4 → 5 taken 13 times.
✓ Branch 4 → 6 taken 382 times.
780x lock, [this] { return shutdown_ || !work_queue_.empty(); });
157
158 398x w = work_queue_.pop();
159
2/2
✓ Branch 5 → 6 taken 373 times.
✓ Branch 5 → 9 taken 25 times.
398x if (!w)
160 {
161
1/2
✓ Branch 6 → 7 taken 373 times.
✗ Branch 6 → 8 not taken.
373x if (shutdown_)
162 746x return;
163 continue;
164 }
165 398x }
166 25x w->func_(w);
167 25x }
168 }
169
170 inline bool
171 26x thread_pool::post(pool_work_item* w) noexcept
172 {
173 {
174 26x std::lock_guard<std::mutex> lock(mutex_);
175
2/2
✓ Branch 3 → 4 taken 1 time.
✓ Branch 3 → 5 taken 25 times.
26x if (shutdown_)
176 1x return false;
177 25x work_queue_.push(w);
178 26x }
179 25x cv_.notify_one();
180 25x return true;
181 }
182
183 inline void
184 374x thread_pool::shutdown()
185 {
186 {
187
1/1
✓ Branch 2 → 3 taken 374 times.
374x std::lock_guard<std::mutex> lock(mutex_);
188 374x shutdown_ = true;
189 374x }
190 374x cv_.notify_all();
191
192
2/2
✓ Branch 13 → 7 taken 373 times.
✓ Branch 13 → 14 taken 374 times.
747x for (auto& t : threads_)
193 {
194
1/2
✓ Branch 9 → 10 taken 373 times.
✗ Branch 9 → 11 not taken.
373x if (t.joinable())
195
1/1
✓ Branch 10 → 11 taken 373 times.
373x t.join();
196 }
197 374x threads_.clear();
198
199 {
200
1/1
✓ Branch 15 → 16 taken 374 times.
374x std::lock_guard<std::mutex> lock(mutex_);
201
1/2
✗ Branch 18 → 17 not taken.
✓ Branch 18 → 19 taken 374 times.
374x while (work_queue_.pop())
202 ;
203 374x }
204 374x }
205
206 } // namespace boost::corosio::detail
207
208 #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
209