Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::fs::Metadata; |
16 | | use std::io::{IoSlice, Seek}; |
17 | | use std::path::{Path, PathBuf}; |
18 | | use std::pin::Pin; |
19 | | use std::sync::atomic::{AtomicUsize, Ordering}; |
20 | | use std::task::{Context, Poll}; |
21 | | |
22 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
23 | | use rlimit::increase_nofile_limit; |
24 | | /// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding |
25 | | /// open files at any given time. This will greatly reduce the chance we'll hit open file limit |
26 | | /// issues. |
27 | | pub use tokio::fs::DirEntry; |
28 | | use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take}; |
29 | | use tokio::sync::{Semaphore, SemaphorePermit}; |
30 | | use tracing::{event, Level}; |
31 | | |
32 | | use crate::spawn_blocking; |
33 | | |
34 | | /// Default read buffer size when reading to/from disk. |
35 | | pub const DEFAULT_READ_BUFF_SIZE: usize = 16384; |
36 | | |
37 | | #[derive(Debug)] |
38 | | pub struct FileSlot { |
39 | | // We hold the permit because once it is dropped it goes back into the queue. |
40 | | _permit: SemaphorePermit<'static>, |
41 | | inner: tokio::fs::File, |
42 | | } |
43 | | |
44 | | impl AsRef<tokio::fs::File> for FileSlot { |
45 | 112 | fn as_ref(&self) -> &tokio::fs::File { |
46 | 112 | &self.inner |
47 | 112 | } |
48 | | } |
49 | | |
50 | | impl AsMut<tokio::fs::File> for FileSlot { |
51 | 2 | fn as_mut(&mut self) -> &mut tokio::fs::File { |
52 | 2 | &mut self.inner |
53 | 2 | } |
54 | | } |
55 | | |
56 | | impl AsyncRead for FileSlot { |
57 | 2.52k | fn poll_read( |
58 | 2.52k | mut self: Pin<&mut Self>, |
59 | 2.52k | cx: &mut Context<'_>, |
60 | 2.52k | buf: &mut ReadBuf<'_>, |
61 | 2.52k | ) -> Poll<Result<(), tokio::io::Error>> { |
62 | 2.52k | Pin::new(&mut self.inner).poll_read(cx, buf) |
63 | 2.52k | } |
64 | | } |
65 | | |
66 | | impl AsyncSeek for FileSlot { |
67 | 23 | fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> { |
68 | 23 | Pin::new(&mut self.inner).start_seek(position) |
69 | 23 | } |
70 | | |
71 | 69 | fn poll_complete( |
72 | 69 | mut self: Pin<&mut Self>, |
73 | 69 | cx: &mut Context<'_>, |
74 | 69 | ) -> Poll<Result<u64, tokio::io::Error>> { |
75 | 69 | Pin::new(&mut self.inner).poll_complete(cx) |
76 | 69 | } |
77 | | } |
78 | | |
79 | | impl AsyncWrite for FileSlot { |
80 | 2 | fn poll_write( |
81 | 2 | mut self: Pin<&mut Self>, |
82 | 2 | cx: &mut Context<'_>, |
83 | 2 | buf: &[u8], |
84 | 2 | ) -> Poll<Result<usize, tokio::io::Error>> { |
85 | 2 | Pin::new(&mut self.inner).poll_write(cx, buf) |
86 | 2 | } |
87 | | |
88 | 0 | fn poll_flush( |
89 | 0 | mut self: Pin<&mut Self>, |
90 | 0 | cx: &mut Context<'_>, |
91 | 0 | ) -> Poll<Result<(), tokio::io::Error>> { |
92 | 0 | Pin::new(&mut self.inner).poll_flush(cx) |
93 | 0 | } |
94 | | |
95 | 0 | fn poll_shutdown( |
96 | 0 | mut self: Pin<&mut Self>, |
97 | 0 | cx: &mut Context<'_>, |
98 | 0 | ) -> Poll<Result<(), tokio::io::Error>> { |
99 | 0 | Pin::new(&mut self.inner).poll_shutdown(cx) |
100 | 0 | } |
101 | | |
102 | 75 | fn poll_write_vectored( |
103 | 75 | mut self: Pin<&mut Self>, |
104 | 75 | cx: &mut Context<'_>, |
105 | 75 | bufs: &[IoSlice<'_>], |
106 | 75 | ) -> Poll<Result<usize, tokio::io::Error>> { |
107 | 75 | Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) |
108 | 75 | } |
109 | | |
110 | 75 | fn is_write_vectored(&self) -> bool { |
111 | 75 | self.inner.is_write_vectored() |
112 | 75 | } |
113 | | } |
114 | | |
115 | | // Note: If the default changes make sure you update the documentation in |
116 | | // `config/cas_server.rs`. |
117 | | pub const DEFAULT_OPEN_FILE_PERMITS: usize = 24 * 1024; // 24k. |
118 | | static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS); |
119 | | pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS); |
120 | | |
121 | | /// Try to acquire a permit from the open file semaphore. |
122 | | #[inline] |
123 | 25.3k | pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> { |
124 | 25.3k | OPEN_FILE_SEMAPHORE |
125 | 25.3k | .acquire() |
126 | 25.3k | .await |
127 | 25.3k | .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0 ) |
128 | 25.3k | } |
129 | | /// Acquire a permit from the open file semaphore and call a raw function. |
130 | | #[inline] |
131 | 735 | pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error> |
132 | 735 | where |
133 | 735 | F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static, |
134 | 735 | T: Send + 'static, |
135 | 735 | { |
136 | 735 | let permit = get_permit().await?0 ; |
137 | 735 | spawn_blocking!("fs_call_with_permit", move || f(permit)) |
138 | 735 | .await |
139 | 734 | .unwrap_or_else(|e| Err(make_err!(Code::Internal, "background task failed: {e:?}"))0 ) |
140 | 734 | } |
141 | | |
142 | 0 | pub fn set_open_file_limit(limit: usize) { |
143 | 0 | let new_limit = { |
144 | | // We increase the limit by 20% to give extra |
145 | | // room for other file descriptors like sockets, |
146 | | // pipes, and other things. |
147 | 0 | let fs_ulimit = |
148 | 0 | u64::try_from(limit.saturating_add(limit / 5)).expect("set_open_file_limit too large"); |
149 | 0 | match increase_nofile_limit(fs_ulimit) { |
150 | 0 | Ok(new_fs_ulimit) => { |
151 | 0 | event!( |
152 | 0 | Level::INFO, |
153 | 0 | "set_open_file_limit({limit})::ulimit success. New fs.ulimit: {fs_ulimit} (20% increase of {limit}).", |
154 | | ); |
155 | 0 | usize::try_from(new_fs_ulimit).expect("new_fs_ulimit too large") |
156 | | } |
157 | 0 | Err(e) => { |
158 | 0 | event!( |
159 | 0 | Level::ERROR, |
160 | 0 | "set_open_file_limit({limit})::ulimit failed. Maybe system does not have ulimits, continuing anyway. - {e:?}", |
161 | | ); |
162 | 0 | limit |
163 | | } |
164 | | } |
165 | | }; |
166 | 0 | if new_limit < DEFAULT_OPEN_FILE_PERMITS { Branch (166:8): [True: 0, False: 0]
Branch (166:8): [Folded - Ignored]
|
167 | 0 | event!( |
168 | 0 | Level::WARN, |
169 | 0 | "set_open_file_limit({limit}) succeeded, but this is below the default limit of {DEFAULT_OPEN_FILE_PERMITS}. Will continue, but we recommend increasing the limit to at least the default.", |
170 | | ); |
171 | 0 | } |
172 | 0 | if new_limit < limit { Branch (172:8): [True: 0, False: 0]
Branch (172:8): [Folded - Ignored]
|
173 | 0 | event!( |
174 | 0 | Level::WARN, |
175 | 0 | "set_open_file_limit({limit}) succeeded, but new open file limit is {new_limit}. Will continue, but likely a config or system options (ie: ulimit) needs updated.", |
176 | | ); |
177 | 0 | } |
178 | | |
179 | 0 | let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire); |
180 | 0 | if limit < current_total { Branch (180:8): [True: 0, False: 0]
Branch (180:8): [Folded - Ignored]
|
181 | 0 | event!( |
182 | 0 | Level::ERROR, |
183 | 0 | "set_open_file_limit({}) must be greater than {}", |
184 | | limit, |
185 | | current_total |
186 | | ); |
187 | 0 | return; |
188 | 0 | } |
189 | 0 | TOTAL_FILE_SEMAPHORES.fetch_add(limit - current_total, Ordering::Release); |
190 | 0 | OPEN_FILE_SEMAPHORE.add_permits(limit - current_total); |
191 | 0 | } |
192 | | |
193 | 13 | pub fn get_open_files_for_test() -> usize { |
194 | 13 | TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits() |
195 | 13 | } |
196 | | |
197 | 60 | pub async fn open_file( |
198 | 60 | path: impl AsRef<Path>, |
199 | 60 | start: u64, |
200 | 60 | limit: u64, |
201 | 60 | ) -> Result<Take<FileSlot>, Error> { |
202 | 60 | let path = path.as_ref().to_owned(); |
203 | 60 | let (permit, os_file58 ) = call_with_permit(move |permit| { |
204 | 58 | let mut os_file = |
205 | 60 | std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}")2 )?2 ; |
206 | 58 | if start > 0 { Branch (206:12): [True: 0, False: 0]
Branch (206:12): [Folded - Ignored]
Branch (206:12): [True: 0, False: 1]
Branch (206:12): [True: 0, False: 2]
Branch (206:12): [True: 0, False: 4]
Branch (206:12): [True: 0, False: 6]
Branch (206:12): [True: 0, False: 0]
Branch (206:12): [Folded - Ignored]
Branch (206:12): [True: 0, False: 0]
Branch (206:12): [True: 0, False: 38]
Branch (206:12): [True: 0, False: 3]
Branch (206:12): [True: 0, False: 4]
|
207 | 0 | os_file |
208 | 0 | .seek(std::io::SeekFrom::Start(start)) |
209 | 0 | .err_tip(|| format!("Could not seek to {start} in {path:?}"))?; |
210 | 58 | } |
211 | 58 | Ok((permit, os_file)) |
212 | 60 | }) |
213 | 60 | .await?2 ; |
214 | 58 | Ok(FileSlot { |
215 | 58 | _permit: permit, |
216 | 58 | inner: tokio::fs::File::from_std(os_file), |
217 | 58 | } |
218 | 58 | .take(limit)) |
219 | 60 | } |
220 | | |
221 | 112 | pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> { |
222 | 112 | let path = path.as_ref().to_owned(); |
223 | 112 | let (permit, os_file) = call_with_permit(move |permit| { |
224 | 112 | Ok(( |
225 | 112 | permit, |
226 | 112 | std::fs::File::options() |
227 | 112 | .read(true) |
228 | 112 | .write(true) |
229 | 112 | .create(true) |
230 | 112 | .truncate(true) |
231 | 112 | .open(&path) |
232 | 112 | .err_tip(|| format!("Could not open {path:?}")0 )?0 , |
233 | | )) |
234 | 112 | }) |
235 | 112 | .await?0 ; |
236 | 112 | Ok(FileSlot { |
237 | 112 | _permit: permit, |
238 | 112 | inner: tokio::fs::File::from_std(os_file), |
239 | 112 | }) |
240 | 112 | } |
241 | | |
242 | 4 | pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> { |
243 | 4 | let src = src.as_ref().to_owned(); |
244 | 4 | let dst = dst.as_ref().to_owned(); |
245 | 4 | call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await |
246 | 4 | } |
247 | | |
248 | 1 | pub async fn set_permissions( |
249 | 1 | src: impl AsRef<Path>, |
250 | 1 | perm: std::fs::Permissions, |
251 | 1 | ) -> Result<(), Error> { |
252 | 1 | let src = src.as_ref().to_owned(); |
253 | 1 | call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into)) |
254 | 1 | .await |
255 | 1 | } |
256 | | |
257 | 38 | pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> { |
258 | 38 | let path = path.as_ref().to_owned(); |
259 | 38 | call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await |
260 | 38 | } |
261 | | |
262 | 197 | pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> { |
263 | 197 | let path = path.as_ref().to_owned(); |
264 | 197 | call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await |
265 | 197 | } |
266 | | |
267 | | #[cfg(target_family = "unix")] |
268 | 1 | pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> { |
269 | 1 | let src = src.as_ref().to_owned(); |
270 | 1 | let dst = dst.as_ref().to_owned(); |
271 | 1 | call_with_permit(move |_| { |
272 | 1 | tokio::runtime::Handle::current() |
273 | 1 | .block_on(tokio::fs::symlink(src, dst)) |
274 | 1 | .map_err(Into::<Error>::into) |
275 | 1 | }) |
276 | 1 | .await |
277 | 1 | } |
278 | | |
279 | 2 | pub async fn read_link(path: impl AsRef<Path>) -> Result<std::path::PathBuf, Error> { |
280 | 2 | let path = path.as_ref().to_owned(); |
281 | 2 | call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await |
282 | 2 | } |
283 | | |
284 | | pub struct ReadDir { |
285 | | // We hold the permit because once it is dropped it goes back into the queue. |
286 | | permit: SemaphorePermit<'static>, |
287 | | inner: tokio::fs::ReadDir, |
288 | | } |
289 | | |
290 | | impl ReadDir { |
291 | 220 | pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) { |
292 | 220 | (self.permit, self.inner) |
293 | 220 | } |
294 | | } |
295 | | |
296 | | impl AsRef<tokio::fs::ReadDir> for ReadDir { |
297 | 0 | fn as_ref(&self) -> &tokio::fs::ReadDir { |
298 | 0 | &self.inner |
299 | 0 | } |
300 | | } |
301 | | |
302 | | impl AsMut<tokio::fs::ReadDir> for ReadDir { |
303 | 0 | fn as_mut(&mut self) -> &mut tokio::fs::ReadDir { |
304 | 0 | &mut self.inner |
305 | 0 | } |
306 | | } |
307 | | |
308 | 222 | pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> { |
309 | 222 | let path = path.as_ref().to_owned(); |
310 | 222 | let (permit, inner) = call_with_permit(move |permit| { |
311 | 222 | Ok(( |
312 | 222 | permit, |
313 | 222 | tokio::runtime::Handle::current() |
314 | 222 | .block_on(tokio::fs::read_dir(path)) |
315 | 222 | .map_err(Into::<Error>::into)?0 , |
316 | | )) |
317 | 222 | }) |
318 | 222 | .await?0 ; |
319 | 222 | Ok(ReadDir { permit, inner }) |
320 | 222 | } |
321 | | |
322 | 27 | pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> { |
323 | 27 | let from = from.as_ref().to_owned(); |
324 | 27 | let to = to.as_ref().to_owned(); |
325 | 27 | call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await |
326 | 27 | } |
327 | | |
328 | 24 | pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> { |
329 | 24 | let path = path.as_ref().to_owned(); |
330 | 24 | call_with_permit(move |_| std::fs::remove_file(path).map_err(Into::<Error>::into)).await |
331 | 23 | } |
332 | | |
333 | 2 | pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> { |
334 | 2 | let path = path.as_ref().to_owned(); |
335 | 2 | call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await |
336 | 2 | } |
337 | | |
338 | 16 | pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> { |
339 | 16 | let path = path.as_ref().to_owned(); |
340 | 16 | call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await |
341 | 16 | } |
342 | | |
343 | 4 | pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> { |
344 | 4 | let path = path.as_ref().to_owned(); |
345 | 4 | call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await |
346 | 4 | } |
347 | | |
348 | 8 | pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> { |
349 | 8 | let path = path.as_ref().to_owned(); |
350 | 8 | call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await |
351 | 8 | } |
352 | | |
353 | 17 | pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> { |
354 | 17 | let path = path.as_ref().to_owned(); |
355 | 17 | call_with_permit(move |_| std::fs::remove_dir_all(path).map_err(Into::<Error>::into)).await |
356 | 17 | } |