Coroutine 101 - A simple serializer
Jul 27, 2023
This is the first post in a series explaining C++20 coroutines by example.
Goal
The goal for this article is to create a simple non-recursive stream serializer that can be used like this:
#include <cassert>
#include <string>
#include <string_view>
#include <vector>
// stream::serializer is the coroutine type developed in the rest of this article
stream::serializer serialize_ints(std::vector<int> data)
{
    for (auto idx = 0u; idx < data.size(); idx++)
    {
        if (idx != 0)
            co_yield ',';
        co_yield std::to_string(data[idx]);
    }
}
int main(int argc, char *argv[])
{
    auto s = serialize_ints({1,2,3,4,5,6,7,8,9,10});
    std::string buffer;
    buffer.resize(10);
    using std::operator""sv;
    assert(s.read_some(buffer) == "1,2,3,4,5,"sv);
    assert(s.read_some(buffer) == "6,7,8,9,10"sv);
    return 0;
}
That is, we want a serializer coroutine that can co_yield strings and chars, and that can be consumed through a simple synchronous interface.
Serializer
The serializer interface itself is very simple:
struct serializer
{
    // The promise_type tells the compiler how to make this a coroutine.
    using promise_type = detail::serializer_promise;
    bool done() const;
    std::string_view read_some(std::span<char> buffer);
private:
    // The buffer passed into read_some
    std::span<char> buffer_;
    // The number of bytes written to the buffer so far
    std::size_t written_ = 0;
    // The bytes that couldn't be written because the buffer was full.
    std::string_view remaining_;
    friend detail::serializer_promise;
    // The unique_handle is essentially a unique_ptr for a std::coroutine_handle
    // (which doesn't free the coroutine frame on its own)
    detail::unique_handle<promise_type> impl_;
};
Next let’s quickly define the done & read_some functions:
bool serializer::done() const
{
    // we are done once no coroutine is attached or it has finished,
    // and no data remains from a previously full buffer
    return (!impl_ || impl_.done()) && remaining_.empty();
}
std::string_view serializer::read_some(std::span<char> buffer)
{
    // consume data left over from a previous co_yield.
    auto n = (std::min)(remaining_.size(), buffer.size());
    std::copy_n(remaining_.begin(), n, buffer.begin());
    written_ = n;
    remaining_.remove_prefix(n);
    buffer_ = buffer;
    // if the coroutine is still active & we have buffer space left, resume it
    if (!done() && (buffer_.size() != written_))
    {
        // tell the coroutine promise where to write the data
        impl_.promise().ser = this;
        // resume the coroutine to yield more data
        impl_.resume();
    }
    return {buffer.data(), written_};
}
The resume function will resume the serializer coroutine until it either suspends at a co_yield (because the buffer is full) or returns.
Awaitables
Before we get into the promise itself, we need to establish what awaitables are.
An awaitable is a type with three functions:
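// Promise and T are placeholders for the promise type and the result type
struct my_awaitable
{
    // true: don't suspend; false: suspend, then call await_suspend
    bool await_ready();
    // called after suspending, with the handle of the awaiting coroutine
    void await_suspend(std::coroutine_handle<Promise>);
    // produces the result of the co_await expression
    T await_resume();
};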
When a coroutine awaits an awaitable, it will call await_ready first.
If it returns true, the coroutine does not need to suspend and will call await_resume right away to get the result of the await (always void in this example).
If await_ready returns false, the coroutine suspends and await_suspend will be called with the coroutine's handle passed in.
Once the coroutine gets resumed (by calling resume() on the handle), it will call await_resume to get the result and return the value.
The main point is that await_ready can be used to avoid suspending the coroutine, which turns awaiting something into a no-op.
The standard provides two awaitables:
- std::suspend_never
- std::suspend_always
suspend_never never suspends (await_ready returns true), while suspend_always always suspends the coroutine (await_ready returns false).
Serializer Promise
With that cleared up, let’s look at the promise:
struct serializer_promise
{
    std::suspend_always initial_suspend() noexcept {return {};}
    std::suspend_always final_suspend () noexcept {return {};}
    serializer get_return_object();
    void return_void() {}
    void unhandled_exception() { throw; }
    struct conditional_suspend
    {
        bool written;
        bool await_ready()
        {
            // the common case: everything was written, so don't suspend
            if (written) [[likely]]
                return true;
            else
                return false;
        }
        void await_suspend(std::coroutine_handle<serializer_promise>) {}
        void await_resume() {}
    };
    conditional_suspend yield_value(const char & c);
    conditional_suspend yield_value(std::string_view c);
    serializer * ser;
};
The next five functions are mandatory for the promise of any coroutine:
initial_suspend
The initial_suspend function gets called when the coroutine is created (i.e. when serialize_ints is called), and its result is awaited.
In this case we want the coroutine to be lazy, i.e. do nothing until read_some resumes it. Therefore we return std::suspend_always.
final_suspend
Once the coroutine is past the co_return (or has run off the end of its body), it will call final_suspend and await the result. This can be used for continuations and cleanup.
In our case, the serializer object holds the coroutine handle and will clean it up, so we return std::suspend_always here as well.
get_return_object
The get_return_object function gets called to create the handle of the coroutine that is returned to the caller, in our case a serializer object.
The implementation is pretty straightforward:
serializer serializer_promise::get_return_object()
{
    serializer s;
    s.impl_ = detail::unique_handle<serializer_promise>::from_promise(*this);
    return s;
}
return_void
A promise defines either return_void, if the coroutine has no return value, or return_value, if it has one.
Since we check for completion through the handle, we don’t need to do anything here.
unhandled_exception
If the coroutine exits with an exception, it can be intercepted here.
We just rethrow it, so the caller of .resume() will receive it.
That is, it will be thrown from serializer::read_some.
conditional_suspend
The conditional_suspend is an awaitable that lets you dynamically choose between std::suspend_never and std::suspend_always.
We use this so that we only suspend when the buffer is full, and we use [[likely]] to tell the compiler to optimize for the common case, in which the data was written and no suspension is needed.
yield_value
The yield_value functions are needed to make co_yield usable within the coroutine function.
The function gets called with the yielded value, and its return value will be awaited. This way we can conditionally suspend if the buffer is full. In our coroutine, we can yield a string_view and a single char.
auto serializer_promise::yield_value(const char & c) -> conditional_suspend
{
    // we got room in the buffer, just write the data
    if (ser->written_ < ser->buffer_.size())
    {
        ser->buffer_[ser->written_] = c;
        ser->written_++;
        // don't suspend, we sent the data.
        return conditional_suspend{true};
    }
    else // the buffer is full. store it as remaining
    {
        // the yielded char stays alive until the co_yield expression completes,
        // so it is safe to point at it while the coroutine is suspended
        ser->remaining_ = {&c, 1u};
        return conditional_suspend{false};
    }
}
auto serializer_promise::yield_value(std::string_view c) -> conditional_suspend
{
    if (ser->written_ < ser->buffer_.size())
    {
        // write as many bytes to the buffer as we can.
        auto n = (std::min)(c.size(), (ser->buffer_.size() - ser->written_));
        std::copy_n(c.begin(), n, ser->buffer_.begin() + ser->written_);
        ser->written_ += n;
        c.remove_prefix(n);
        if (!c.empty())
            ser->remaining_ = c;
        // suspend if we couldn't write all of the data
        return conditional_suspend{c.empty()};
    }
    else
    {
        // no space remaining, suspend
        ser->remaining_ = c;
        return conditional_suspend{false};
    }
}
Conclusion
And with that we can write a stream serializer without confusing syntax.
In the next article, we’ll make the serializer recursive, i.e. allow one serializer to await another one.