include/boost/corosio/native/detail/io_uring/io_uring_stream_file.hpp

95.5% Lines (107/112) 100.0% List of functions (17/17)
io_uring_stream_file.hpp
f(x) Functions (17)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_stream_file::io_uring_stream_file(boost::corosio::detail::io_uring_scheduler&) :79 37x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::~io_uring_stream_file() :83 37x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::native_handle() const :108 115x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::cancel() :113 1x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::size() const :119 3x 75.0% 62.0% boost::corosio::detail::io_uring_stream_file::resize(unsigned long) :127 2x 87.5% 82.0% boost::corosio::detail::io_uring_stream_file::sync_data() :137 1x 60.0% 67.0% boost::corosio::detail::io_uring_stream_file::sync_all() :148 1x 75.0% 67.0% boost::corosio::detail::io_uring_stream_file::release() :154 1x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::assign(int) :161 1x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::seek(long, boost::corosio::file_base::seek_basis) :167 7x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::open_file(std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :183 29x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::close_file() :226 168x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :238 9x 100.0% 93.0% boost::corosio::detail::io_uring_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :263 9x 100.0% 93.0% boost::corosio::detail::io_uring_stream_file_service::io_uring_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::io_uring_scheduler&) :301 551x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :309 29x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <boost/corosio/detail/file_service.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_file_service_base.hpp>
21 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
22 #include <boost/corosio/native/detail/make_err.hpp>
23 #include <boost/corosio/stream_file.hpp>
24
25 #include <cstdint>
26 #include <filesystem>
27 #include <limits>
28 #include <memory>
29 #include <mutex>
30 #include <system_error>
31 #include <unordered_map>
32
33 #include <fcntl.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 class io_uring_stream_file_service;
41
42 /** Native io_uring stream-file implementation.
43
44 Async `read_some` / `write_some` submit `IORING_OP_READV` /
45 `IORING_OP_WRITEV` with `offset == -1` (kernel f_pos). All
46 metadata operations (open, size, resize, sync, seek, close)
47 are synchronous syscalls.
48
49 @par Thread Safety
50 Concurrent `read_some` / `write_some` calls on the same file
51 interleave at the kernel level (matches POSIX `read(2)` /
52 `write(2)` semantics on a shared positional fd).
53
54 @note On `O_APPEND` open this backend relies on the kernel's
55 `f_pos` rather than tracking the offset in user space. Writes
56 still go to EOF atomically per `O_APPEND` semantics, but
57 `seek(0, seek_cur)` immediately after an append-mode open
58 returns `0` (the current f_pos), not the file size — observably
59 different from the POSIX backend, which seeds an internal offset
60 to size-at-open. Both behaviours are valid; documented for
61 cross-backend symmetry.
62 */
63 class BOOST_COROSIO_DECL io_uring_stream_file final
64 : public stream_file::implementation
65 , public std::enable_shared_from_this<io_uring_stream_file>
66 , public intrusive_list<io_uring_stream_file>::node
67 {
68 friend class io_uring_stream_file_service;
69
70 int fd_ = -1;
71 io_uring_scheduler* sched_ = nullptr;
72
73 // Per-fd op slots — embedded to eliminate per-call heap allocation.
74 // Single-pending invariant per slot.
75 uring_file_read_op rd_;
76 uring_file_write_op wr_;
77
78 public:
79 37x explicit io_uring_stream_file(io_uring_scheduler& sched) noexcept
80 37x : sched_(&sched)
81 37x {}
82
83 37x ~io_uring_stream_file() override
84 37x {
85 37x close_file();
86 37x }
87
88 // -- io_stream::implementation --
89
90 std::coroutine_handle<> read_some(
91 std::coroutine_handle<>,
92 capy::executor_ref,
93 buffer_param,
94 std::stop_token,
95 std::error_code*,
96 std::size_t*) override;
97
98 std::coroutine_handle<> write_some(
99 std::coroutine_handle<>,
100 capy::executor_ref,
101 buffer_param,
102 std::stop_token,
103 std::error_code*,
104 std::size_t*) override;
105
106 // -- stream_file::implementation --
107
108 115x native_handle_type native_handle() const noexcept override
109 {
110 115x return fd_;
111 }
112
113 1x void cancel() noexcept override
114 {
115 1x if (fd_ >= 0)
116 1x sched_->submit_cancel_by_fd(fd_);
117 1x }
118
119 3x std::uint64_t size() const override
120 {
121 struct stat st;
122 3x if (::fstat(fd_, &st) < 0)
123 throw_system_error(make_err(errno), "stream_file::size");
124 3x return static_cast<std::uint64_t>(st.st_size);
125 }
126
127 2x void resize(std::uint64_t new_size) override
128 {
129 2x if (new_size > static_cast<std::uint64_t>(
130 2x (std::numeric_limits<off_t>::max)()))
131 1x throw_system_error(
132 2x make_err(EOVERFLOW), "stream_file::resize");
133 1x if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
134 throw_system_error(make_err(errno), "stream_file::resize");
135 1x }
136
137 1x void sync_data() override
138 {
139 #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
140 1x if (::fdatasync(fd_) < 0)
141 #else
142 if (::fsync(fd_) < 0)
143 #endif
144 throw_system_error(
145 make_err(errno), "stream_file::sync_data");
146 1x }
147
148 1x void sync_all() override
149 {
150 1x if (::fsync(fd_) < 0)
151 throw_system_error(make_err(errno), "stream_file::sync_all");
152 1x }
153
154 1x native_handle_type release() override
155 {
156 1x int fd = fd_;
157 1x fd_ = -1;
158 1x return fd;
159 }
160
161 1x void assign(native_handle_type handle) override
162 {
163 1x close_file();
164 1x fd_ = handle;
165 1x }
166
167 7x std::uint64_t seek(
168 std::int64_t offset, file_base::seek_basis origin) override
169 {
170 7x int whence = SEEK_SET;
171 7x if (origin == file_base::seek_cur) whence = SEEK_CUR;
172 5x else if (origin == file_base::seek_end) whence = SEEK_END;
173
174 7x off_t r = ::lseek(fd_, static_cast<off_t>(offset), whence);
175 7x if (r == static_cast<off_t>(-1))
176 3x throw_system_error(make_err(errno), "stream_file::seek");
177 4x return static_cast<std::uint64_t>(r);
178 }
179
180 // -- Internal --
181
182 /// Open the file. Synchronous; sets `fd_`. Caller is the service.
183 29x std::error_code open_file(
184 std::filesystem::path const& path, file_base::flags mode)
185 {
186 29x close_file();
187
188 29x int oflags = 0;
189 29x unsigned access = static_cast<unsigned>(mode) & 3u;
190 29x if (access == static_cast<unsigned>(file_base::read_write))
191 2x oflags |= O_RDWR;
192 27x else if (access == static_cast<unsigned>(file_base::write_only))
193 10x oflags |= O_WRONLY;
194 else
195 17x oflags |= O_RDONLY;
196
197 29x if ((mode & file_base::create) != file_base::flags(0))
198 9x oflags |= O_CREAT;
199 29x if ((mode & file_base::exclusive) != file_base::flags(0))
200 1x oflags |= O_EXCL;
201 29x if ((mode & file_base::truncate) != file_base::flags(0))
202 8x oflags |= O_TRUNC;
203 29x if ((mode & file_base::append) != file_base::flags(0))
204 1x oflags |= O_APPEND;
205 29x if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
206 1x oflags |= O_SYNC;
207
208 29x oflags |= O_CLOEXEC;
209
210 29x int fd = ::open(path.c_str(), oflags, 0666);
211 29x if (fd < 0)
212 2x return make_err(errno);
213
214 27x fd_ = fd;
215
216 #ifdef POSIX_FADV_SEQUENTIAL
217 // Hint the page cache about the access pattern; matches the
218 // POSIX backend.
219 27x ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
220 #endif
221
222 27x return {};
223 }
224
225 /// Cancel any in-flight ops and close the fd. Idempotent.
226 168x void close_file() noexcept
227 {
228 168x if (fd_ >= 0)
229 {
230 27x sched_->cancel_and_flush(fd_);
231 27x ::close(fd_);
232 27x fd_ = -1;
233 }
234 168x }
235 };
236
237 inline std::coroutine_handle<>
238 9x io_uring_stream_file::read_some(
239 std::coroutine_handle<> h,
240 capy::executor_ref ex,
241 buffer_param buffers,
242 std::stop_token token,
243 std::error_code* ec,
244 std::size_t* bytes)
245 {
246 9x rd_.prepare(h, ex, ec, bytes, fd_, /*file_offset=*/-1, sched_,
247 18x shared_from_this(), buffers, token);
248 9x sched_->work_started();
249
250 17x if (rd_.empty_buffer ||
251 8x rd_.cancelled.load(std::memory_order_acquire))
252 {
253 2x io_uring_scheduler::lock_type lock(sched_->dispatch_mutex());
254 2x sched_->push_completed_locked(&rd_);
255 2x return std::noop_coroutine();
256 2x }
257
258 7x io_uring_submit_op(*sched_, &rd_);
259 7x return std::noop_coroutine();
260 }
261
262 inline std::coroutine_handle<>
263 9x io_uring_stream_file::write_some(
264 std::coroutine_handle<> h,
265 capy::executor_ref ex,
266 buffer_param buffers,
267 std::stop_token token,
268 std::error_code* ec,
269 std::size_t* bytes)
270 {
271 9x wr_.prepare(h, ex, ec, bytes, fd_, /*file_offset=*/-1, sched_,
272 18x shared_from_this(), buffers, token);
273 9x sched_->work_started();
274
275 17x if (wr_.empty_buffer ||
276 8x wr_.cancelled.load(std::memory_order_acquire))
277 {
278 1x io_uring_scheduler::lock_type lock(sched_->dispatch_mutex());
279 1x sched_->push_completed_locked(&wr_);
280 1x return std::noop_coroutine();
281 1x }
282
283 8x io_uring_submit_op(*sched_, &wr_);
284 8x return std::noop_coroutine();
285 }
286
287 /** Native io_uring stream-file service.
288
289 Owns all `io_uring_stream_file` impls. Replaces
290 `posix_stream_file_service` for the io_uring backend; registered
291 under the abstract `file_service` key by `io_uring_t::construct`.
292 */
293 class BOOST_COROSIO_DECL io_uring_stream_file_service final
294 : public io_uring_file_service_base<
295 io_uring_stream_file_service, file_service, io_uring_stream_file>
296 {
297 using base_service = io_uring_file_service_base<
298 io_uring_stream_file_service, file_service, io_uring_stream_file>;
299
300 public:
301 551x explicit io_uring_stream_file_service(
302 capy::execution_context& /*ctx*/, io_uring_scheduler& sched)
303 551x : base_service(sched)
304 551x {}
305
306 // construct / destroy / close / shutdown / scheduler() are inherited
307 // from io_uring_file_service_base.
308
309 29x std::error_code open_file(
310 stream_file::implementation& impl,
311 std::filesystem::path const& path,
312 file_base::flags mode) override
313 {
314 29x return static_cast<io_uring_stream_file&>(impl).open_file(
315 29x path, mode);
316 }
317 };
318
319 } // namespace boost::corosio::detail
320
321 #endif // BOOST_COROSIO_HAS_IO_URING
322
323 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
324