Coverage Report

Created: 2026-06-03 11:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicUsize, Ordering};
17
use core::task::{Context, Poll};
18
use std::fs::{Metadata, Permissions};
19
use std::io::{IoSlice, Seek};
20
use std::path::{Path, PathBuf};
21
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use rlimit::increase_nofile_limit;
24
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
25
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
26
/// issues.
27
pub use tokio::fs::DirEntry;
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
29
use tokio::sync::{Semaphore, SemaphorePermit};
30
use tracing::{error, info, trace, warn};
31
32
use crate::spawn_blocking;
33
34
/// Default read buffer size when reading to/from disk.
35
pub const DEFAULT_READ_BUFF_SIZE: usize = 0x4000;
36
37
#[derive(Debug)]
38
pub struct FileSlot {
39
    // We hold the permit because once it is dropped it goes back into the queue.
40
    _permit: SemaphorePermit<'static>,
41
    inner: tokio::fs::File,
42
}
43
44
impl FileSlot {
45
    /// Advise the kernel to drop page cache for this file's contents.
46
    /// Only available on Linux;
47
    #[cfg(target_os = "linux")]
48
214
    pub fn advise_dontneed(&self) {
49
        use std::os::unix::io::AsRawFd;
50
214
        let fd = self.inner.as_raw_fd();
51
214
        let ret = unsafe { libc::posix_fadvise(fd, 0, 0, libc::POSIX_FADV_DONTNEED) };
52
214
        if ret != 0 {
53
0
            tracing::debug!(
54
                fd,
55
                ret,
56
                "posix_fadvise(DONTNEED) returned non-zero (best-effort, ignoring)",
57
            );
58
214
        }
59
214
    }
60
61
    #[cfg(not(target_os = "linux"))]
62
    pub const fn advise_dontneed(&self) {
63
        // No-op: posix_fadvise is not available on Mac or Windows.
64
    }
65
}
66
67
impl AsRef<tokio::fs::File> for FileSlot {
68
142
    fn as_ref(&self) -> &tokio::fs::File {
69
142
        &self.inner
70
142
    }
71
}
72
73
impl AsMut<tokio::fs::File> for FileSlot {
74
2
    fn as_mut(&mut self) -> &mut tokio::fs::File {
75
2
        &mut self.inner
76
2
    }
77
}
78
79
impl AsyncRead for FileSlot {
80
2.81k
    fn poll_read(
81
2.81k
        mut self: Pin<&mut Self>,
82
2.81k
        cx: &mut Context<'_>,
83
2.81k
        buf: &mut ReadBuf<'_>,
84
2.81k
    ) -> Poll<Result<(), tokio::io::Error>> {
85
2.81k
        Pin::new(&mut self.inner).poll_read(cx, buf)
86
2.81k
    }
87
}
88
89
impl AsyncSeek for FileSlot {
90
35
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
91
35
        Pin::new(&mut self.inner).start_seek(position)
92
35
    }
93
94
105
    fn poll_complete(
95
105
        mut self: Pin<&mut Self>,
96
105
        cx: &mut Context<'_>,
97
105
    ) -> Poll<Result<u64, tokio::io::Error>> {
98
105
        Pin::new(&mut self.inner).poll_complete(cx)
99
105
    }
100
}
101
102
impl AsyncWrite for FileSlot {
103
25
    fn poll_write(
104
25
        mut self: Pin<&mut Self>,
105
25
        cx: &mut Context<'_>,
106
25
        buf: &[u8],
107
25
    ) -> Poll<Result<usize, tokio::io::Error>> {
108
25
        Pin::new(&mut self.inner).poll_write(cx, buf)
109
25
    }
110
111
0
    fn poll_flush(
112
0
        mut self: Pin<&mut Self>,
113
0
        cx: &mut Context<'_>,
114
0
    ) -> Poll<Result<(), tokio::io::Error>> {
115
0
        Pin::new(&mut self.inner).poll_flush(cx)
116
0
    }
117
118
0
    fn poll_shutdown(
119
0
        mut self: Pin<&mut Self>,
120
0
        cx: &mut Context<'_>,
121
0
    ) -> Poll<Result<(), tokio::io::Error>> {
122
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
123
0
    }
124
125
122
    fn poll_write_vectored(
126
122
        mut self: Pin<&mut Self>,
127
122
        cx: &mut Context<'_>,
128
122
        bufs: &[IoSlice<'_>],
129
122
    ) -> Poll<Result<usize, tokio::io::Error>> {
130
122
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
131
122
    }
132
133
122
    fn is_write_vectored(&self) -> bool {
134
122
        self.inner.is_write_vectored()
135
122
    }
136
}
137
138
// Note: If the default changes make sure you update the documentation in
139
// `config/cas_server.rs`.
140
pub const DEFAULT_OPEN_FILE_LIMIT: usize = 24 * 1024; // 24k.
141
static OPEN_FILE_LIMIT: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_LIMIT);
142
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_LIMIT);
143
144
/// Try to acquire a permit from the open file semaphore.
145
#[inline]
146
26.0k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
147
26.0k
    trace!(
148
26.0k
        available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
149
        "getting FS permit"
150
    );
151
26.0k
    OPEN_FILE_SEMAPHORE
152
26.0k
        .acquire()
153
26.0k
        .await
154
26.0k
        .map_err(|e| 
make_err!0
(
Code::Internal0
, "Open file semaphore closed {:?}", e))
155
26.0k
}
156
/// Acquire a permit from the open file semaphore and call a raw function.
157
#[inline]
158
1.50k
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
159
1.50k
where
160
1.50k
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
161
1.50k
    T: Send + 'static,
162
1.50k
{
163
1.50k
    let permit = get_permit().await
?0
;
164
1.50k
    spawn_blocking!("fs_call_with_permit", move || 
f1.50k
(
permit1.50k
))
165
1.50k
        .await
166
1.50k
        .unwrap_or_else(|e| 
{0
167
0
            Err(Error::from_std_err(Code::Internal, &e).append("background task failed"))
168
0
        })
169
1.50k
}
170
171
/// Sets the soft nofile limit to `desired_open_file_limit` and adjusts
172
/// `OPEN_FILE_SEMAPHORE` accordingly.
173
///
174
/// # Panics
175
///
176
/// If any type conversion fails. This can't happen if `usize` is smaller than
177
/// `u64`.
178
0
pub fn set_open_file_limit(desired_open_file_limit: usize) {
179
    // Tokio semaphores have a max of 2^61 - 1 permits. On some platforms
180
    // (e.g. macOS) the kernel reports an "unlimited" file limit that exceeds
181
    // this, causing a panic when we try to add permits. Cap at a generous but
182
    // safe value.
183
    const MAX_SAFE_LIMIT: usize = 1 << 30; // ~1 billion
184
185
0
    let new_open_file_limit = {
186
0
        match increase_nofile_limit(
187
0
            u64::try_from(desired_open_file_limit)
188
0
                .expect("desired_open_file_limit is too large to convert to u64."),
189
0
        ) {
190
0
            Ok(open_file_limit) => {
191
0
                info!("set_open_file_limit() assigns new open file limit {open_file_limit}.",);
192
0
                usize::try_from(open_file_limit)
193
0
                    .expect("open_file_limit is too large to convert to usize.")
194
0
                    .min(MAX_SAFE_LIMIT)
195
            }
196
0
            Err(e) => {
197
0
                error!(
198
                    "set_open_file_limit() failed to assign open file limit. Maybe system does not have ulimits, continuing anyway. - {e:?}",
199
                );
200
0
                DEFAULT_OPEN_FILE_LIMIT
201
            }
202
        }
203
    };
204
    // TODO(jaroeichler): Can we give a better estimate?
205
0
    if new_open_file_limit < DEFAULT_OPEN_FILE_LIMIT {
206
0
        warn!(
207
            "The new open file limit ({new_open_file_limit}) is below the recommended value of {DEFAULT_OPEN_FILE_LIMIT}. Consider raising max_open_files.",
208
        );
209
0
    }
210
211
    // Use only 80% of the open file limit for permits from OPEN_FILE_SEMAPHORE
212
    // to give extra room for other file descriptors like sockets, pipes, and
213
    // other things.
214
0
    let reduced_open_file_limit = new_open_file_limit.saturating_sub(new_open_file_limit / 5);
215
0
    let previous_open_file_limit = OPEN_FILE_LIMIT.load(Ordering::Acquire);
216
    // No permit should be acquired yet, so this warning should not occur.
217
0
    if (OPEN_FILE_SEMAPHORE.available_permits() + reduced_open_file_limit)
218
0
        < previous_open_file_limit
219
    {
220
0
        warn!(
221
            "There are not enough available permits to remove {previous_open_file_limit} - {reduced_open_file_limit} permits.",
222
        );
223
0
    }
224
0
    if previous_open_file_limit <= reduced_open_file_limit {
225
0
        OPEN_FILE_LIMIT.fetch_add(
226
0
            reduced_open_file_limit - previous_open_file_limit,
227
0
            Ordering::Release,
228
0
        );
229
0
        OPEN_FILE_SEMAPHORE.add_permits(reduced_open_file_limit - previous_open_file_limit);
230
0
    } else {
231
0
        OPEN_FILE_LIMIT.fetch_sub(
232
0
            previous_open_file_limit - reduced_open_file_limit,
233
0
            Ordering::Release,
234
0
        );
235
0
        OPEN_FILE_SEMAPHORE.forget_permits(previous_open_file_limit - reduced_open_file_limit);
236
0
    }
237
0
}
238
239
17
pub fn get_open_files_for_test() -> usize {
240
17
    OPEN_FILE_LIMIT.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
241
17
}
242
243
90
pub async fn open_file(
244
90
    path: impl AsRef<Path>,
245
90
    start: u64,
246
90
    limit: u64,
247
90
) -> Result<Take<FileSlot>, Error> {
248
90
    let path = path.as_ref().to_owned();
249
90
    let (
permit88
,
os_file88
) = call_with_permit(move |permit| {
250
88
        let mut os_file =
251
90
            std::fs::File::open(&path).err_tip(|| 
format!2
("Could not open {}",
path.display()2
))
?2
;
252
88
        if start > 0 {
253
0
            os_file
254
0
                .seek(SeekFrom::Start(start))
255
0
                .err_tip(|| format!("Could not seek to {start} in {}", path.display()))?;
256
88
        }
257
88
        Ok((permit, os_file))
258
90
    })
259
90
    .await
?2
;
260
88
    Ok(FileSlot {
261
88
        _permit: permit,
262
88
        inner: tokio::fs::File::from_std(os_file),
263
88
    }
264
88
    .take(limit))
265
90
}
266
267
145
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
268
145
    let path = path.as_ref().to_owned();
269
145
    let (permit, os_file) = call_with_permit(move |permit| {
270
        Ok((
271
145
            permit,
272
145
            std::fs::File::options()
273
145
                .read(true)
274
145
                .write(true)
275
145
                .create(true)
276
145
                .truncate(true)
277
145
                .open(&path)
278
145
                .err_tip(|| 
format!0
("Could not open {}",
path.display()0
))
?0
,
279
        ))
280
145
    })
281
145
    .await
?0
;
282
145
    Ok(FileSlot {
283
145
        _permit: permit,
284
145
        inner: tokio::fs::File::from_std(os_file),
285
145
    })
286
145
}
287
288
9
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
289
9
    let src = src.as_ref().to_owned();
290
9
    let dst = dst.as_ref().to_owned();
291
9
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
292
9
}
293
294
151
pub async fn set_permissions(src: impl AsRef<Path>, perm: Permissions) -> Result<(), Error> {
295
151
    let src = src.as_ref().to_owned();
296
151
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
297
151
        .await
298
151
}
299
300
55
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
301
55
    let path = path.as_ref().to_owned();
302
55
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
303
55
}
304
305
447
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
306
447
    let path = path.as_ref().to_owned();
307
447
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
308
447
}
309
310
#[cfg(target_family = "unix")]
311
2
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
312
    // TODO: add a test for #2051: deadlock with large number of files
313
2
    let _permit = get_permit().await
?0
;
314
2
    tokio::fs::symlink(src, dst).await.map_err(Into::into)
315
2
}
316
317
6
pub async fn read_link(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
318
6
    let path = path.as_ref().to_owned();
319
6
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
320
6
}
321
322
#[derive(Debug)]
323
pub struct ReadDir {
324
    // We hold the permit because once it is dropped it goes back into the queue.
325
    permit: SemaphorePermit<'static>,
326
    inner: tokio::fs::ReadDir,
327
}
328
329
impl ReadDir {
330
420
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
331
420
        (self.permit, self.inner)
332
420
    }
333
}
334
335
impl AsRef<tokio::fs::ReadDir> for ReadDir {
336
0
    fn as_ref(&self) -> &tokio::fs::ReadDir {
337
0
        &self.inner
338
0
    }
339
}
340
341
impl AsMut<tokio::fs::ReadDir> for ReadDir {
342
0
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
343
0
        &mut self.inner
344
0
    }
345
}
346
347
422
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
348
422
    let path = path.as_ref().to_owned();
349
422
    let (permit, inner) = call_with_permit(move |permit| {
350
        Ok((
351
422
            permit,
352
422
            tokio::runtime::Handle::current()
353
422
                .block_on(tokio::fs::read_dir(path))
354
422
                .map_err(Into::<Error>::into)
?0
,
355
        ))
356
422
    })
357
422
    .await
?0
;
358
422
    Ok(ReadDir { permit, inner })
359
422
}
360
361
9
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
362
9
    let from = from.as_ref().to_owned();
363
9
    let to = to.as_ref().to_owned();
364
9
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
365
9
}
366
367
8
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
368
8
    let path = path.as_ref().to_owned();
369
8
    call_with_permit(move |_| 
std::fs::remove_file7
(
path7
).
map_err7
(Into::<Error>::into)).await
370
5
}
371
372
/// Removes an empty directory. Errors if the directory is not empty; use
373
/// [`remove_dir_all`] when the contents should be removed too.
374
0
pub async fn remove_dir(path: impl AsRef<Path>) -> Result<(), Error> {
375
0
    let path = path.as_ref().to_owned();
376
0
    call_with_permit(move |_| std::fs::remove_dir(path).map_err(Into::<Error>::into)).await
377
0
}
378
379
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
380
2
    let path = path.as_ref().to_owned();
381
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
382
2
}
383
384
33
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
385
33
    let path = path.as_ref().to_owned();
386
33
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
387
33
}
388
389
8
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
390
8
    let path = path.as_ref().to_owned();
391
8
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
392
8
}
393
394
14
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
395
14
    let path = path.as_ref().to_owned();
396
14
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
397
14
}
398
399
// We can't just use the stock remove_dir_all as it falls over if someone's set readonly
400
// permissions. This version walks the directories and fixes the permissions where needed
401
// before deleting everything.
402
#[cfg(not(target_family = "windows"))]
403
107
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
404
    // Because otherwise Windows builds complain about these things not being used
405
    use std::io::ErrorKind;
406
    use std::os::unix::fs::PermissionsExt;
407
408
    use tracing::debug;
409
    use walkdir::WalkDir;
410
411
108
    for entry in 
WalkDir::new107
(
&path107
) {
412
108
        let Ok(
entry30
) = &entry else {
413
78
            debug!("Can't get into {entry:?}, assuming already deleted");
414
78
            continue;
415
        };
416
30
        let metadata = entry.metadata()
?0
;
417
30
        if metadata.is_dir() {
418
30
            match std::fs::remove_dir_all(entry.path()) {
419
29
                Ok(()) => {}
420
1
                Err(e) if e.kind() == ErrorKind::PermissionDenied => {
421
1
                    std::fs::set_permissions(entry.path(), Permissions::from_mode(0o700)).err_tip(
422
0
                        || format!("Setting permissions for {}", entry.path().display()),
423
0
                    )?;
424
                }
425
0
                e @ Err(_) => e.err_tip(|| format!("Removing {}", entry.path().display()))?,
426
            }
427
0
        } else if metadata.is_file() {
428
0
            std::fs::set_permissions(entry.path(), Permissions::from_mode(0o600))
429
0
                .err_tip(|| format!("Setting permissions for {}", entry.path().display()))?;
430
0
        }
431
    }
432
433
    // should now be safe to delete after we fixed all the permissions in the walk loop
434
107
    match std::fs::remove_dir_all(&path) {
435
1
        Ok(()) => {}
436
106
        Err(e) if e.kind() == ErrorKind::NotFound => {}
437
0
        e @ Err(_) => e.err_tip(|| {
438
0
            format!(
439
                "Removing {} after permissions fixes",
440
0
                path.as_ref().display()
441
            )
442
0
        })?,
443
    }
444
107
    Ok(())
445
107
}
446
447
// We can't set the permissions easily in Windows, so just fallback to
448
// the stock Rust remove_dir_all
449
#[cfg(target_family = "windows")]
450
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
451
    std::fs::remove_dir_all(&path)?;
452
    Ok(())
453
}
454
455
107
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
456
107
    let path = path.as_ref().to_owned();
457
107
    call_with_permit(move |_| internal_remove_dir_all(path)).await
458
107
}