src/ex/detail/strand_queue.hpp

100.0% Lines (21/21) 100.0% List of functions (5/5)
strand_queue.hpp
f(x) Functions (5)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco ([email protected])
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/capy
9 //
10
11 #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12 #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
13
14 #include <boost/capy/continuation.hpp>
15 #include <boost/capy/detail/config.hpp>
16 #include <boost/capy/ex/frame_allocator.hpp>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 /** Single-threaded intrusive FIFO of pending continuations.
23
24 Links continuations directly through `continuation::next`, so
25 push() carries no per-item allocation.
26
27 @par Thread Safety
28 Not thread-safe. Caller must externally synchronize push() and
29 take_all(). dispatch_batch() does not touch queue state and may
30 run unlocked once the batch has been taken.
31 */
32 class strand_queue
33 {
34 continuation* head_ = nullptr;
35 continuation* tail_ = nullptr;
36
37 public:
38 11442x strand_queue() = default;
39 strand_queue(strand_queue const&) = delete;
40 strand_queue& operator=(strand_queue const&) = delete;
41
42 /** Returns true if there are no pending continuations. */
43 bool
44 19503x empty() const noexcept
45 {
46 19503x return head_ == nullptr;
47 }
48
49 /** Push a continuation to the queue.
50
51 @param c The continuation to enqueue; see `continuation`
52 for lifetime and aliasing requirements.
53 */
54 void
55 30340x push(continuation& c) noexcept
56 {
57 30340x c.next = nullptr;
58 30340x if(tail_)
59 10837x tail_->next = &c;
60 else
61 19503x head_ = &c;
62 30340x tail_ = &c;
63 30340x }
64
65 /** Batch of taken items for thread-safe dispatch. */
66 struct taken_batch
67 {
68 continuation* head = nullptr;
69 continuation* tail = nullptr;
70 };
71
72 /** Take all pending items atomically.
73
74 Removes all items from the queue and returns them as a
75 batch. The queue is left empty.
76
77 @return The batch of taken items.
78 */
79 taken_batch
80 19503x take_all() noexcept
81 {
82 19503x taken_batch batch{head_, tail_};
83 19503x head_ = tail_ = nullptr;
84 19503x return batch;
85 }
86
87 /** Resume each continuation in a taken batch.
88
89 Advances past each node before resuming, since the
90 resumed coroutine may destroy the awaitable (and thus
91 the continuation) before control returns here.
92
93 @param batch The batch to dispatch.
94
95 @note Thread-safe with respect to push() because the queue
96 itself is not touched.
97 */
98 static
99 void
100 19503x dispatch_batch(taken_batch& batch)
101 {
102 49843x while(batch.head)
103 {
104 30340x continuation* c = batch.head;
105 30340x batch.head = c->next;
106 30340x safe_resume(c->h);
107 }
108 19503x batch.tail = nullptr;
109 19503x }
110 };
111
112 } // namespace detail
113 } // namespace capy
114 } // namespace boost
115
116 #endif
117