parallel_operation.hpp
1 /*
2  * Copyright (c) 2021-2025, NVIDIA CORPORATION.
3  *
4  * 根据 Apache 许可证 2.0 版本("许可证")获得许可;
5  * 除非遵守许可证,否则不得使用此文件。
6  * 您可以在以下位置获取许可证的副本:
7  *
8  * https://apache.ac.cn/licenses/LICENSE-2.0
9  *
10  * 除非适用法律要求或书面同意,根据许可证分发的软件
11  * 按“现状”分发,不附带任何明示或暗示的保证或条件。
12  * 有关许可证下特定语言权限和限制,请参阅许可证。
13  * 查看许可证。
14  */
15  */
16 #pragma once
17 
18 #include <atomic>
19 #include <cassert>
20 #include <future>
21 #include <memory>
22 #include <numeric>
23 #include <system_error>
24 #include <type_traits>
25 #include <utility>
26 #include <vector>
27 
28 #include <kvikio/defaults.hpp>
29 #include <kvikio/error.hpp>
30 #include <kvikio/nvtx.hpp>
31 #include <kvikio/utils.hpp>
32 
33 namespace kvikio {
34 
35 namespace detail {
36 
49 /// 将一个 lambda 或可调用对象包装在一个可拷贝的 lambda 中,
50 /// 即使原始对象不可拷贝(例如,包含 unique_ptr)。
51 template <typename F>
52 auto make_copyable_lambda(F op)
53 {
54  // 通过从 op 移动,在堆上创建可调用对象。使用共享指针管理其生命周期。
55  auto sp = std::make_shared<F>(std::move(op));
56 
57  // 使用可拷贝的闭包作为仅可移动可调用对象的代理。
58  return
59  [sp](auto&&... args) -> decltype(auto) { return (*sp)(std::forward<decltype(args)>(args)...); };
60 }
61 
70 /// @brief 返回用于 NVTX 范围的下一个颜色和调用索引。
71 /// @return `pair` 包含下一个颜色和调用索引。
72 inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
73 {
74  static std::atomic_uint64_t call_counter{1ull};
75  auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
76  auto& nvtx_color = NvtxManager::get_color_by_index(call_idx);
77  return {nvtx_color, call_idx};
78 }
79 
83 /// @brief 向默认线程池提交一个任务。
84 /// @tparam F 任务的可调用类型。
85 /// @tparam T 缓冲区的类型。
86 /// @param op 要执行的操作。
87 /// @param buf 缓冲区。
88 /// @param size 要操作的大小(以字节为单位)。
89 /// @param file_offset 文件偏移量(以字节为单位)。
90 /// @param devPtr_offset GPU 设备指针偏移量(以字节为单位)。
91 /// @param nvtx_payload NVTX 负载。
92 /// @param nvtx_color NVTX 颜色。
93 /// @return 表示任务结果的 future。
94 template <typename F, typename T>
95 std::future<std::size_t> submit_task(F op,
96  T buf,
97  std::size_t size,
98  std::size_t file_offset,
99  std::size_t devPtr_offset,
100  std::uint64_t nvtx_payload = 0ull,
101  nvtx_color_type nvtx_color = NvtxManager::default_color())
102 {
103  static_assert(std::is_invocable_r_v<std::size_t,
104  decltype(op),
105  decltype(buf),
106  decltype(size),
107  decltype(file_offset),
108  decltype(devPtr_offset)>);
109 
110  return defaults::thread_pool().submit_task([=] {
111  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
112  return op(buf, size, file_offset, devPtr_offset);
113  });
114 }
115 
123 /// @brief 向默认线程池提交一个仅可移动任务。
124 /// @tparam F 任务的仅可移动可调用类型。
125 /// @param op_move_only 要执行的仅可移动操作。
126 /// @param nvtx_payload NVTX 负载。
127 /// @param nvtx_color NVTX 颜色。
128 /// @return 表示任务结果的 future。
129 template <typename F>
130 std::future<std::size_t> submit_move_only_task(
131  F op_move_only,
132  std::uint64_t nvtx_payload = 0ull,
133  nvtx_color_type nvtx_color = NvtxManager::default_color())
134 {
135  static_assert(std::is_invocable_r_v<std::size_t, F>);
136  auto op_copyable = make_copyable_lambda(std::move(op_move_only));
137  return defaults::thread_pool().submit_task([=] {
138  KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);
139  return op_copyable();
140  });
141 }
142 
143 } // namespace detail
144 
140 /**
141  * @brief 并行应用读或写操作。
142  *
143  * 这个函数将一个大型的 I/O 操作分割成多个较小的任务,并在后台线程池中异步执行它们。
144  * 用户需要通过等待返回的 future 对象来等待操作完成。
145  *
146  * @tparam F 操作的可调用类型。期望签名是 `size_t op(T buf, size_t size, size_t file_offset, size_t devPtr_offset)`。
147  * @tparam T 缓冲区的类型。
148  * @param op 要执行的操作。
149  * @param buf 缓冲区。可以是 `void*` 用于 CPU 内存,或 `cuda::device::pointer` 用于 GPU 内存。
150  * @param size 要操作的总大小(以字节为单位)。
151  * @param file_offset 文件偏移量(以字节为单位)。
152  * @param task_size 单个任务的最大大小(以字节为单位)。函数将把总大小分割成不超过此大小的任务。
153  * @param devPtr_offset GPU 设备指针偏移量(以字节为单位)。对于 CPU 内存操作,这应为 0。
154  * @param call_idx 当前 kvikIO 调用索引,用于 NVTX。默认值为 0,表示将自动生成索引。
155  * @param nvtx_color 用于 NVTX 的颜色。
156  * @return future 表示所有任务完成时读取/写入的总字节数。
157  */
158 template <typename F, typename T>
159 std::future<std::size_t> parallel_io(F op,
160  T buf,
161  std::size_t size,
162  std::size_t file_offset,
163  std::size_t task_size,
164  std::size_t devPtr_offset,
165  std::uint64_t call_idx = 0,
166  nvtx_color_type nvtx_color = NvtxManager::default_color())
167 {
168  KVIKIO_EXPECT(task_size > 0, "`task_size` 必须为正数", std::invalid_argument);
169  static_assert(std::is_invocable_r_v<std::size_t,
170  decltype(op),
171  decltype(buf),
172  decltype(size),
173  decltype(file_offset),
174  decltype(devPtr_offset)>);
175 
176  // 单任务守卫
177  if (task_size >= size || page_size >= size) {
178  return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
179  }
180 
181  std::vector<std::future<std::size_t>> tasks;
182  tasks.reserve(size / task_size);
183 
184  // 1) 提交除最后一个任务外的所有任务。这些任务的大小都为 `task_size`。
185  while (size > task_size) {