Coverage Report

Created: 2026-03-05 11:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicUsize, Ordering};
17
use core::task::{Context, Poll};
18
use std::fs::{Metadata, Permissions};
19
use std::io::{IoSlice, Seek};
20
use std::path::{Path, PathBuf};
21
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use rlimit::increase_nofile_limit;
24
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
25
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
26
/// issues.
27
pub use tokio::fs::DirEntry;
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
29
use tokio::sync::{Semaphore, SemaphorePermit};
30
use tracing::{error, info, trace, warn};
31
32
use crate::spawn_blocking;
33
34
/// Default read buffer size when reading to/from disk.
35
pub const DEFAULT_READ_BUFF_SIZE: usize = 0x4000;
36
37
#[derive(Debug)]
38
pub struct FileSlot {
39
    // We hold the permit because once it is dropped it goes back into the queue.
40
    _permit: SemaphorePermit<'static>,
41
    inner: tokio::fs::File,
42
}
43
44
impl FileSlot {
45
    /// Advise the kernel to drop page cache for this file's contents.
46
    /// Only available on Linux;
47
    #[cfg(target_os = "linux")]
48
137
    pub fn advise_dontneed(&self) {
49
        use std::os::unix::io::AsRawFd;
50
137
        let fd = self.inner.as_raw_fd();
51
137
        let ret = unsafe { libc::posix_fadvise(fd, 0, 0, libc::POSIX_FADV_DONTNEED) };
52
137
        if ret != 0 {
  Branch (52:12): [True: 0, False: 137]
  Branch (52:12): [Folded - Ignored]
53
0
            tracing::debug!(
54
                fd,
55
                ret,
56
0
                "posix_fadvise(DONTNEED) returned non-zero (best-effort, ignoring)",
57
            );
58
137
        }
59
137
    }
60
61
    #[cfg(not(target_os = "linux"))]
62
    pub const fn advise_dontneed(&self) {
63
        // No-op: posix_fadvise is not available on Mac or Windows.
64
    }
65
}
66
67
// Shared access to the wrapped tokio file; the permit stays with the slot.
impl AsRef<tokio::fs::File> for FileSlot {
    fn as_ref(&self) -> &tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
72
73
// Exclusive access to the wrapped tokio file; the permit stays with the slot.
impl AsMut<tokio::fs::File> for FileSlot {
    fn as_mut(&mut self) -> &mut tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
78
79
impl AsyncRead for FileSlot {
80
2.62k
    fn poll_read(
81
2.62k
        mut self: Pin<&mut Self>,
82
2.62k
        cx: &mut Context<'_>,
83
2.62k
        buf: &mut ReadBuf<'_>,
84
2.62k
    ) -> Poll<Result<(), tokio::io::Error>> {
85
2.62k
        Pin::new(&mut self.inner).poll_read(cx, buf)
86
2.62k
    }
87
}
88
89
impl AsyncSeek for FileSlot {
90
19
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
91
19
        Pin::new(&mut self.inner).start_seek(position)
92
19
    }
93
94
57
    fn poll_complete(
95
57
        mut self: Pin<&mut Self>,
96
57
        cx: &mut Context<'_>,
97
57
    ) -> Poll<Result<u64, tokio::io::Error>> {
98
57
        Pin::new(&mut self.inner).poll_complete(cx)
99
57
    }
100
}
101
102
impl AsyncWrite for FileSlot {
103
24
    fn poll_write(
104
24
        mut self: Pin<&mut Self>,
105
24
        cx: &mut Context<'_>,
106
24
        buf: &[u8],
107
24
    ) -> Poll<Result<usize, tokio::io::Error>> {
108
24
        Pin::new(&mut self.inner).poll_write(cx, buf)
109
24
    }
110
111
0
    fn poll_flush(
112
0
        mut self: Pin<&mut Self>,
113
0
        cx: &mut Context<'_>,
114
0
    ) -> Poll<Result<(), tokio::io::Error>> {
115
0
        Pin::new(&mut self.inner).poll_flush(cx)
116
0
    }
117
118
0
    fn poll_shutdown(
119
0
        mut self: Pin<&mut Self>,
120
0
        cx: &mut Context<'_>,
121
0
    ) -> Poll<Result<(), tokio::io::Error>> {
122
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
123
0
    }
124
125
76
    fn poll_write_vectored(
126
76
        mut self: Pin<&mut Self>,
127
76
        cx: &mut Context<'_>,
128
76
        bufs: &[IoSlice<'_>],
129
76
    ) -> Poll<Result<usize, tokio::io::Error>> {
130
76
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
131
76
    }
132
133
76
    fn is_write_vectored(&self) -> bool {
134
76
        self.inner.is_write_vectored()
135
76
    }
136
}
137
138
// Note: If the default changes make sure you update the documentation in
139
// `config/cas_server.rs`.
140
pub const DEFAULT_OPEN_FILE_LIMIT: usize = 24 * 1024; // 24k.
141
static OPEN_FILE_LIMIT: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_LIMIT);
142
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_LIMIT);
143
144
/// Try to acquire a permit from the open file semaphore.
145
#[inline]
146
25.3k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
147
25.3k
    trace!(
148
25.3k
        available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
149
25.3k
        "getting FS permit"
150
    );
151
25.3k
    OPEN_FILE_SEMAPHORE
152
25.3k
        .acquire()
153
25.3k
        .await
154
25.3k
        .map_err(|e| make_err!(
Code::Internal0
, "Open file semaphore closed {:?}", e))
155
25.3k
}
156
/// Acquire a permit from the open file semaphore and call a raw function.
157
#[inline]
158
807
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
159
807
where
160
807
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
161
807
    T: Send + 'static,
162
807
{
163
807
    let permit = get_permit().await
?0
;
164
807
    spawn_blocking!("fs_call_with_permit", move || 
f806
(
permit806
))
165
807
        .await
166
804
        .unwrap_or_else(|e| Err(
make_err!0
(
Code::Internal0
, "background task failed: {e:?}")))
167
804
}
168
169
/// Sets the soft nofile limit to `desired_open_file_limit` and adjusts
170
/// `OPEN_FILE_SEMAPHORE` accordingly.
171
///
172
/// # Panics
173
///
174
/// If any type conversion fails. This can't happen if `usize` is smaller than
175
/// `u64`.
176
0
pub fn set_open_file_limit(desired_open_file_limit: usize) {
177
0
    let new_open_file_limit = {
178
0
        match increase_nofile_limit(
179
0
            u64::try_from(desired_open_file_limit)
180
0
                .expect("desired_open_file_limit is too large to convert to u64."),
181
0
        ) {
182
0
            Ok(open_file_limit) => {
183
0
                info!("set_open_file_limit() assigns new open file limit {open_file_limit}.",);
184
0
                usize::try_from(open_file_limit)
185
0
                    .expect("open_file_limit is too large to convert to usize.")
186
            }
187
0
            Err(e) => {
188
0
                error!(
189
0
                    "set_open_file_limit() failed to assign open file limit. Maybe system does not have ulimits, continuing anyway. - {e:?}",
190
                );
191
0
                DEFAULT_OPEN_FILE_LIMIT
192
            }
193
        }
194
    };
195
    // TODO(jaroeichler): Can we give a better estimate?
196
0
    if new_open_file_limit < DEFAULT_OPEN_FILE_LIMIT {
  Branch (196:8): [True: 0, False: 0]
  Branch (196:8): [Folded - Ignored]
197
0
        warn!(
198
0
            "The new open file limit ({new_open_file_limit}) is below the recommended value of {DEFAULT_OPEN_FILE_LIMIT}. Consider raising max_open_files.",
199
        );
200
0
    }
201
202
    // Use only 80% of the open file limit for permits from OPEN_FILE_SEMAPHORE
203
    // to give extra room for other file descriptors like sockets, pipes, and
204
    // other things.
205
0
    let reduced_open_file_limit = new_open_file_limit.saturating_sub(new_open_file_limit / 5);
206
0
    let previous_open_file_limit = OPEN_FILE_LIMIT.load(Ordering::Acquire);
207
    // No permit should be acquired yet, so this warning should not occur.
208
0
    if (OPEN_FILE_SEMAPHORE.available_permits() + reduced_open_file_limit)
  Branch (208:8): [True: 0, False: 0]
  Branch (208:8): [Folded - Ignored]
209
0
        < previous_open_file_limit
210
    {
211
0
        warn!(
212
0
            "There are not enough available permits to remove {previous_open_file_limit} - {reduced_open_file_limit} permits.",
213
        );
214
0
    }
215
0
    if previous_open_file_limit <= reduced_open_file_limit {
  Branch (215:8): [True: 0, False: 0]
  Branch (215:8): [Folded - Ignored]
216
0
        OPEN_FILE_LIMIT.fetch_add(
217
0
            reduced_open_file_limit - previous_open_file_limit,
218
0
            Ordering::Release,
219
0
        );
220
0
        OPEN_FILE_SEMAPHORE.add_permits(reduced_open_file_limit - previous_open_file_limit);
221
0
    } else {
222
0
        OPEN_FILE_LIMIT.fetch_sub(
223
0
            previous_open_file_limit - reduced_open_file_limit,
224
0
            Ordering::Release,
225
0
        );
226
0
        OPEN_FILE_SEMAPHORE.forget_permits(previous_open_file_limit - reduced_open_file_limit);
227
0
    }
228
0
}
229
230
18
pub fn get_open_files_for_test() -> usize {
231
18
    OPEN_FILE_LIMIT.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
232
18
}
233
234
65
pub async fn open_file(
235
65
    path: impl AsRef<Path>,
236
65
    start: u64,
237
65
    limit: u64,
238
65
) -> Result<Take<FileSlot>, Error> {
239
65
    let path = path.as_ref().to_owned();
240
65
    let (
permit63
,
os_file63
) = call_with_permit(move |permit| {
241
63
        let mut os_file =
242
65
            std::fs::File::open(&path).err_tip(|| format!(
"Could not open {}"2
,
path.display()2
))
?2
;
243
63
        if start > 0 {
  Branch (243:12): [True: 0, False: 0]
  Branch (243:12): [Folded - Ignored]
  Branch (243:12): [True: 0, False: 1]
  Branch (243:12): [True: 0, False: 2]
  Branch (243:12): [True: 0, False: 1]
  Branch (243:12): [True: 0, False: 4]
  Branch (243:12): [True: 0, False: 6]
  Branch (243:12): [True: 0, False: 0]
  Branch (243:12): [True: 0, False: 1]
  Branch (243:12): [Folded - Ignored]
  Branch (243:12): [True: 0, False: 0]
  Branch (243:12): [True: 0, False: 41]
  Branch (243:12): [True: 0, False: 3]
  Branch (243:12): [True: 0, False: 4]
244
0
            os_file
245
0
                .seek(SeekFrom::Start(start))
246
0
                .err_tip(|| format!("Could not seek to {start} in {}", path.display()))?;
247
63
        }
248
63
        Ok((permit, os_file))
249
65
    })
250
65
    .await
?2
;
251
63
    Ok(FileSlot {
252
63
        _permit: permit,
253
63
        inner: tokio::fs::File::from_std(os_file),
254
63
    }
255
63
    .take(limit))
256
65
}
257
258
89
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
259
89
    let path = path.as_ref().to_owned();
260
89
    let (permit, os_file) = call_with_permit(move |permit| {
261
        Ok((
262
89
            permit,
263
89
            std::fs::File::options()
264
89
                .read(true)
265
89
                .write(true)
266
89
                .create(true)
267
89
                .truncate(true)
268
89
                .open(&path)
269
89
                .err_tip(|| format!(
"Could not open {}"0
,
path.display()0
))
?0
,
270
        ))
271
89
    })
272
89
    .await
?0
;
273
89
    Ok(FileSlot {
274
89
        _permit: permit,
275
89
        inner: tokio::fs::File::from_std(os_file),
276
89
    })
277
89
}
278
279
4
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
280
4
    let src = src.as_ref().to_owned();
281
4
    let dst = dst.as_ref().to_owned();
282
4
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
283
4
}
284
285
1
pub async fn set_permissions(src: impl AsRef<Path>, perm: Permissions) -> Result<(), Error> {
286
1
    let src = src.as_ref().to_owned();
287
1
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
288
1
        .await
289
1
}
290
291
41
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
292
41
    let path = path.as_ref().to_owned();
293
41
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
294
41
}
295
296
245
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
297
245
    let path = path.as_ref().to_owned();
298
245
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
299
245
}
300
301
#[cfg(target_family = "unix")]
302
1
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
303
    // TODO: add a test for #2051: deadlock with large number of files
304
1
    let _permit = get_permit().await
?0
;
305
1
    tokio::fs::symlink(src, dst).await.map_err(Into::into)
306
1
}
307
308
2
pub async fn read_link(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
309
2
    let path = path.as_ref().to_owned();
310
2
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
311
2
}
312
313
#[derive(Debug)]
314
pub struct ReadDir {
315
    // We hold the permit because once it is dropped it goes back into the queue.
316
    permit: SemaphorePermit<'static>,
317
    inner: tokio::fs::ReadDir,
318
}
319
320
impl ReadDir {
321
289
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
322
289
        (self.permit, self.inner)
323
289
    }
324
}
325
326
// Shared access to the wrapped tokio directory stream.
impl AsRef<tokio::fs::ReadDir> for ReadDir {
    fn as_ref(&self) -> &tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
331
332
// Exclusive access to the wrapped tokio directory stream (e.g. to advance it).
impl AsMut<tokio::fs::ReadDir> for ReadDir {
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
337
338
291
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
339
291
    let path = path.as_ref().to_owned();
340
291
    let (permit, inner) = call_with_permit(move |permit| {
341
        Ok((
342
291
            permit,
343
291
            tokio::runtime::Handle::current()
344
291
                .block_on(tokio::fs::read_dir(path))
345
291
                .map_err(Into::<Error>::into)
?0
,
346
        ))
347
291
    })
348
291
    .await
?0
;
349
291
    Ok(ReadDir { permit, inner })
350
291
}
351
352
9
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
353
9
    let from = from.as_ref().to_owned();
354
9
    let to = to.as_ref().to_owned();
355
9
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
356
9
}
357
358
8
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
359
8
    let path = path.as_ref().to_owned();
360
8
    call_with_permit(move |_| 
std::fs::remove_file7
(
path7
).
map_err7
(Into::<Error>::into)).await
361
5
}
362
363
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
364
2
    let path = path.as_ref().to_owned();
365
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
366
2
}
367
368
15
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
369
15
    let path = path.as_ref().to_owned();
370
15
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
371
15
}
372
373
4
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
374
4
    let path = path.as_ref().to_owned();
375
4
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
376
4
}
377
378
8
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
379
8
    let path = path.as_ref().to_owned();
380
8
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
381
8
}
382
383
// We can't just use the stock remove_dir_all as it falls over if someone's set readonly
// permissions. This version walks the directories and fixes the permissions where needed
// before deleting everything.
//
// Strategy:
// - For every directory the walk yields, first try a plain `std::fs::remove_dir_all`.
// - If that fails with `PermissionDenied`, chmod the directory to 0o700 and keep walking,
//   so its children get visited and fixed too.
// - Regular files are chmod'ed to 0o600.
// - After the walk, `path` itself is removed. `NotFound` at that point counts as success,
//   because the loop may already have deleted it.
//
// Errors: fails if an entry's metadata cannot be read, if a `set_permissions` call fails,
// or on any removal error other than the ones tolerated above.
#[cfg(not(target_family = "windows"))]
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
    // Because otherwise Windows builds complain about these things not being used
    use std::io::ErrorKind;
    use std::os::unix::fs::PermissionsExt;

    use tracing::debug;
    use walkdir::WalkDir;

    for entry in WalkDir::new(&path) {
        // Walk errors (e.g. descending into a directory that an earlier iteration already
        // removed) are logged and skipped rather than treated as failures. Inside the `else`
        // branch `entry` still names the outer walk `Result`, so the error itself is logged.
        let Ok(entry) = &entry else {
            debug!("Can't get into {entry:?}, assuming already deleted");
            continue;
        };
        let metadata = entry.metadata()?;
        if metadata.is_dir() {
            // Fast path: try to delete this whole subtree in one call.
            match std::fs::remove_dir_all(entry.path()) {
                Ok(()) => {}
                // Give the owner full access so the walk can presumably read this directory
                // and fix its children; the final `remove_dir_all` below then removes it.
                Err(e) if e.kind() == ErrorKind::PermissionDenied => {
                    std::fs::set_permissions(entry.path(), Permissions::from_mode(0o700)).err_tip(
                        || format!("Setting permissions for {}", entry.path().display()),
                    )?;
                }
                e @ Err(_) => e.err_tip(|| format!("Removing {}", entry.path().display()))?,
            }
        } else if metadata.is_file() {
            // Clear any read-only mode on regular files (owner read/write).
            std::fs::set_permissions(entry.path(), Permissions::from_mode(0o600))
                .err_tip(|| format!("Setting permissions for {}", entry.path().display()))?;
        }
        // Entries that are neither directories nor regular files are left untouched here
        // and handled by the final removal below.
    }

    // should now be safe to delete after we fixed all the permissions in the walk loop
    match std::fs::remove_dir_all(&path) {
        Ok(()) => {}
        // The loop above may already have removed `path` entirely.
        Err(e) if e.kind() == ErrorKind::NotFound => {}
        e @ Err(_) => e.err_tip(|| {
            format!(
                "Removing {} after permissions fixes",
                path.as_ref().display()
            )
        })?,
    }
    Ok(())
}
430
431
// Windows has no simple way to fix up permissions like the unix variant does,
// so fall back to the stock Rust `remove_dir_all`.
#[cfg(target_family = "windows")]
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
    std::fs::remove_dir_all(path.as_ref()).map_err(Error::from)
}
438
439
23
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
440
23
    let path = path.as_ref().to_owned();
441
23
    call_with_permit(move |_| internal_remove_dir_all(path)).await
442
23
}