include/boost/corosio/native/detail/io_uring/io_uring_stream_file.hpp

60.8% Lines (87/143) 65.2% List of functions (15/23)
io_uring_stream_file.hpp
f(x) Functions (23)
Function Calls Lines Blocks
boost::corosio::detail::io_uring_stream_file::io_uring_stream_file(boost::corosio::detail::io_uring_scheduler&) :78 5x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::~io_uring_stream_file() :82 5x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::native_handle() const :107 12x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::cancel() :112 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::size() const :118 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::resize(unsigned long) :126 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::sync_data() :136 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::sync_all() :147 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::release() :153 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::assign(int) :160 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::seek(long, boost::corosio::file_base::seek_basis) :166 0 0.0% 0.0% boost::corosio::detail::io_uring_stream_file::open_file(std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :182 4x 80.8% 83.0% boost::corosio::detail::io_uring_stream_file::close_file() :225 23x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :237 2x 66.7% 63.0% boost::corosio::detail::io_uring_stream_file::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :262 1x 66.7% 63.0% boost::corosio::detail::io_uring_stream_file_service::io_uring_stream_file_service(boost::capy::execution_context&, boost::corosio::detail::io_uring_scheduler&) :296 389x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::~io_uring_stream_file_service() :301 778x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::construct() :308 5x 100.0% 71.0% boost::corosio::detail::io_uring_stream_file_service::destroy(boost::corosio::io_object::implementation*) :320 5x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::close(boost::corosio::io_object::handle&) :330 9x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::open_file(boost::corosio::stream_file::implementation&, std::filesystem::__cxx11::path const&, boost::corosio::file_base::flags) :336 4x 100.0% 100.0% boost::corosio::detail::io_uring_stream_file_service::shutdown() :345 389x 71.4% 78.0% boost::corosio::detail::io_uring_stream_file_service::destroy_impl(boost::corosio::detail::io_uring_stream_file&) :357 5x 100.0% 67.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_IO_URING
16
17 #include <boost/corosio/detail/file_service.hpp>
18 #include <boost/corosio/detail/intrusive.hpp>
19 #include <boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp>
20 #include <boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp>
21 #include <boost/corosio/native/detail/make_err.hpp>
22 #include <boost/corosio/stream_file.hpp>
23
24 #include <cstdint>
25 #include <filesystem>
26 #include <limits>
27 #include <memory>
28 #include <mutex>
29 #include <system_error>
30 #include <unordered_map>
31
32 #include <fcntl.h>
33 #include <sys/stat.h>
34 #include <sys/types.h>
35 #include <unistd.h>
36
37 namespace boost::corosio::detail {
38
39 class io_uring_stream_file_service;
40
41 /** Native io_uring stream-file implementation.
42
43 Async `read_some` / `write_some` submit `IORING_OP_READV` /
44 `IORING_OP_WRITEV` with `offset == -1` (kernel f_pos). All
45 metadata operations (open, size, resize, sync, seek, close)
46 are synchronous syscalls.
47
48 @par Thread Safety
49 Concurrent `read_some` / `write_some` calls on the same file
50 interleave at the kernel level (matches POSIX `read(2)` /
51 `write(2)` semantics on a shared positional fd).
52
53 @note On `O_APPEND` open this backend relies on the kernel's
54 `f_pos` rather than tracking the offset in user space. Writes
55 still go to EOF atomically per `O_APPEND` semantics, but
56 `seek(0, seek_cur)` immediately after an append-mode open
57 returns `0` (the current f_pos), not the file size — observably
58 different from the POSIX backend, which seeds an internal offset
59 to size-at-open. Both behaviours are valid; documented for
60 cross-backend symmetry.
61 */
62 class BOOST_COROSIO_DECL io_uring_stream_file final
63 : public stream_file::implementation
64 , public std::enable_shared_from_this<io_uring_stream_file>
65 , public intrusive_list<io_uring_stream_file>::node
66 {
67 friend class io_uring_stream_file_service;
68
69 int fd_ = -1;
70 io_uring_scheduler* sched_ = nullptr;
71
72 // Per-fd op slots — embedded to eliminate per-call heap allocation.
73 // Single-pending invariant per slot.
74 uring_file_read_op rd_;
75 uring_file_write_op wr_;
76
77 public:
78 5x explicit io_uring_stream_file(io_uring_scheduler& sched) noexcept
79 5x : sched_(&sched)
80 5x {}
81
82 5x ~io_uring_stream_file() override
83 5x {
84 5x close_file();
85 5x }
86
87 // -- io_stream::implementation --
88
89 std::coroutine_handle<> read_some(
90 std::coroutine_handle<>,
91 capy::executor_ref,
92 buffer_param,
93 std::stop_token,
94 std::error_code*,
95 std::size_t*) override;
96
97 std::coroutine_handle<> write_some(
98 std::coroutine_handle<>,
99 capy::executor_ref,
100 buffer_param,
101 std::stop_token,
102 std::error_code*,
103 std::size_t*) override;
104
105 // -- stream_file::implementation --
106
107 12x native_handle_type native_handle() const noexcept override
108 {
109 12x return fd_;
110 }
111
112 void cancel() noexcept override
113 {
114 if (fd_ >= 0)
115 sched_->submit_cancel_by_fd(fd_);
116 }
117
118 std::uint64_t size() const override
119 {
120 struct stat st;
121 if (::fstat(fd_, &st) < 0)
122 throw_system_error(make_err(errno), "stream_file::size");
123 return static_cast<std::uint64_t>(st.st_size);
124 }
125
126 void resize(std::uint64_t new_size) override
127 {
128 if (new_size > static_cast<std::uint64_t>(
129 (std::numeric_limits<off_t>::max)()))
130 throw_system_error(
131 make_err(EOVERFLOW), "stream_file::resize");
132 if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
133 throw_system_error(make_err(errno), "stream_file::resize");
134 }
135
136 void sync_data() override
137 {
138 #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
139 if (::fdatasync(fd_) < 0)
140 #else
141 if (::fsync(fd_) < 0)
142 #endif
143 throw_system_error(
144 make_err(errno), "stream_file::sync_data");
145 }
146
147 void sync_all() override
148 {
149 if (::fsync(fd_) < 0)
150 throw_system_error(make_err(errno), "stream_file::sync_all");
151 }
152
153 native_handle_type release() override
154 {
155 int fd = fd_;
156 fd_ = -1;
157 return fd;
158 }
159
160 void assign(native_handle_type handle) override
161 {
162 close_file();
163 fd_ = handle;
164 }
165
166 std::uint64_t seek(
167 std::int64_t offset, file_base::seek_basis origin) override
168 {
169 int whence = SEEK_SET;
170 if (origin == file_base::seek_cur) whence = SEEK_CUR;
171 else if (origin == file_base::seek_end) whence = SEEK_END;
172
173 off_t r = ::lseek(fd_, static_cast<off_t>(offset), whence);
174 if (r == static_cast<off_t>(-1))
175 throw_system_error(make_err(errno), "stream_file::seek");
176 return static_cast<std::uint64_t>(r);
177 }
178
179 // -- Internal --
180
181 /// Open the file. Synchronous; sets `fd_`. Caller is the service.
182 4x std::error_code open_file(
183 std::filesystem::path const& path, file_base::flags mode)
184 {
185 4x close_file();
186
187 4x int oflags = 0;
188 4x unsigned access = static_cast<unsigned>(mode) & 3u;
189 4x if (access == static_cast<unsigned>(file_base::read_write))
190 oflags |= O_RDWR;
191 4x else if (access == static_cast<unsigned>(file_base::write_only))
192 1x oflags |= O_WRONLY;
193 else
194 3x oflags |= O_RDONLY;
195
196 4x if ((mode & file_base::create) != file_base::flags(0))
197 1x oflags |= O_CREAT;
198 4x if ((mode & file_base::exclusive) != file_base::flags(0))
199 oflags |= O_EXCL;
200 4x if ((mode & file_base::truncate) != file_base::flags(0))
201 1x oflags |= O_TRUNC;
202 4x if ((mode & file_base::append) != file_base::flags(0))
203 oflags |= O_APPEND;
204 4x if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
205 oflags |= O_SYNC;
206
207 4x oflags |= O_CLOEXEC;
208
209 4x int fd = ::open(path.c_str(), oflags, 0666);
210 4x if (fd < 0)
211 return make_err(errno);
212
213 4x fd_ = fd;
214
215 #ifdef POSIX_FADV_SEQUENTIAL
216 // Hint the page cache about the access pattern; matches the
217 // POSIX backend.
218 4x ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
219 #endif
220
221 4x return {};
222 }
223
224 /// Cancel any in-flight ops and close the fd. Idempotent.
225 23x void close_file() noexcept
226 {
227 23x if (fd_ >= 0)
228 {
229 4x sched_->cancel_and_flush(fd_);
230 4x ::close(fd_);
231 4x fd_ = -1;
232 }
233 23x }
234 };
235
236 inline std::coroutine_handle<>
237 2x io_uring_stream_file::read_some(
238 std::coroutine_handle<> h,
239 capy::executor_ref ex,
240 buffer_param buffers,
241 std::stop_token token,
242 std::error_code* ec,
243 std::size_t* bytes)
244 {
245 2x rd_.prepare(h, ex, ec, bytes, fd_, /*file_offset=*/-1, sched_,
246 4x shared_from_this(), buffers, token);
247 2x sched_->work_started();
248
249 4x if (rd_.empty_buffer ||
250 2x rd_.cancelled.load(std::memory_order_acquire))
251 {
252 io_uring_scheduler::lock_type lock(sched_->dispatch_mutex());
253 sched_->push_completed_locked(&rd_);
254 return std::noop_coroutine();
255 }
256
257 2x io_uring_submit_op(*sched_, &rd_);
258 2x return std::noop_coroutine();
259 }
260
261 inline std::coroutine_handle<>
262 1x io_uring_stream_file::write_some(
263 std::coroutine_handle<> h,
264 capy::executor_ref ex,
265 buffer_param buffers,
266 std::stop_token token,
267 std::error_code* ec,
268 std::size_t* bytes)
269 {
270 1x wr_.prepare(h, ex, ec, bytes, fd_, /*file_offset=*/-1, sched_,
271 2x shared_from_this(), buffers, token);
272 1x sched_->work_started();
273
274 2x if (wr_.empty_buffer ||
275 1x wr_.cancelled.load(std::memory_order_acquire))
276 {
277 io_uring_scheduler::lock_type lock(sched_->dispatch_mutex());
278 sched_->push_completed_locked(&wr_);
279 return std::noop_coroutine();
280 }
281
282 1x io_uring_submit_op(*sched_, &wr_);
283 1x return std::noop_coroutine();
284 }
285
286 /** Native io_uring stream-file service.
287
288 Owns all `io_uring_stream_file` impls. Replaces
289 `posix_stream_file_service` for the io_uring backend; registered
290 under the abstract `file_service` key by `io_uring_t::construct`.
291 */
292 class BOOST_COROSIO_DECL io_uring_stream_file_service final
293 : public file_service
294 {
295 public:
296 389x explicit io_uring_stream_file_service(
297 capy::execution_context& /*ctx*/, io_uring_scheduler& sched)
298 389x : sched_(&sched)
299 389x {}
300
301 778x ~io_uring_stream_file_service() override = default;
302
303 io_uring_stream_file_service(
304 io_uring_stream_file_service const&) = delete;
305 io_uring_stream_file_service& operator=(
306 io_uring_stream_file_service const&) = delete;
307
308 5x io_object::implementation* construct() override
309 {
310 5x auto ptr = std::make_shared<io_uring_stream_file>(*sched_);
311 5x auto* impl = ptr.get();
312 {
313 5x std::lock_guard<std::mutex> lock(mutex_);
314 5x file_list_.push_back(impl);
315 5x file_ptrs_[impl] = std::move(ptr);
316 5x }
317 5x return impl;
318 5x }
319
320 5x void destroy(io_object::implementation* p) override
321 {
322 // close_file() already does cancel_and_flush(fd_) before
323 // ::close — calling cancel() too would queue a redundant
324 // cancel-by-fd SQE that finds nothing.
325 5x auto& impl = static_cast<io_uring_stream_file&>(*p);
326 5x impl.close_file();
327 5x destroy_impl(impl);
328 5x }
329
330 9x void close(io_object::handle& h) override
331 {
332 9x if (h.get())
333 9x static_cast<io_uring_stream_file&>(*h.get()).close_file();
334 9x }
335
336 4x std::error_code open_file(
337 stream_file::implementation& impl,
338 std::filesystem::path const& path,
339 file_base::flags mode) override
340 {
341 4x return static_cast<io_uring_stream_file&>(impl).open_file(
342 4x path, mode);
343 }
344
345 389x void shutdown() override
346 {
347 389x std::lock_guard<std::mutex> lock(mutex_);
348 389x for (auto* impl = file_list_.pop_front(); impl != nullptr;
349 impl = file_list_.pop_front())
350 {
351 impl->close_file();
352 }
353 389x file_ptrs_.clear();
354 389x }
355
356 private:
357 5x void destroy_impl(io_uring_stream_file& impl)
358 {
359 5x std::lock_guard<std::mutex> lock(mutex_);
360 5x file_list_.remove(&impl);
361 5x file_ptrs_.erase(&impl);
362 5x }
363
364 io_uring_scheduler* sched_;
365 std::mutex mutex_;
366 intrusive_list<io_uring_stream_file> file_list_;
367 std::unordered_map<
368 io_uring_stream_file*,
369 std::shared_ptr<io_uring_stream_file>> file_ptrs_;
370 };
371
372 } // namespace boost::corosio::detail
373
374 #endif // BOOST_COROSIO_HAS_IO_URING
375
376 #endif // BOOST_COROSIO_NATIVE_DETAIL_IO_URING_IO_URING_STREAM_FILE_HPP
377