Coverage Report

Created: 2026-05-01 12:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicUsize, Ordering};
17
use core::task::{Context, Poll};
18
use std::fs::{Metadata, Permissions};
19
use std::io::{IoSlice, Seek};
20
use std::path::{Path, PathBuf};
21
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use rlimit::increase_nofile_limit;
24
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
25
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
26
/// issues.
27
pub use tokio::fs::DirEntry;
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
29
use tokio::sync::{Semaphore, SemaphorePermit};
30
use tracing::{error, info, trace, warn};
31
32
use crate::spawn_blocking;
33
34
/// Default read buffer size when reading to/from disk.
35
pub const DEFAULT_READ_BUFF_SIZE: usize = 0x4000;
36
37
#[derive(Debug)]
38
pub struct FileSlot {
39
    // We hold the permit because once it is dropped it goes back into the queue.
40
    _permit: SemaphorePermit<'static>,
41
    inner: tokio::fs::File,
42
}
43
44
impl FileSlot {
45
    /// Advise the kernel to drop page cache for this file's contents.
46
    /// Only available on Linux;
47
    #[cfg(target_os = "linux")]
48
148
    pub fn advise_dontneed(&self) {
49
        use std::os::unix::io::AsRawFd;
50
148
        let fd = self.inner.as_raw_fd();
51
148
        let ret = unsafe { libc::posix_fadvise(fd, 0, 0, libc::POSIX_FADV_DONTNEED) };
52
148
        if ret != 0 {
53
0
            tracing::debug!(
54
                fd,
55
                ret,
56
                "posix_fadvise(DONTNEED) returned non-zero (best-effort, ignoring)",
57
            );
58
148
        }
59
148
    }
60
61
    #[cfg(not(target_os = "linux"))]
62
    pub const fn advise_dontneed(&self) {
63
        // No-op: posix_fadvise is not available on Mac or Windows.
64
    }
65
}
66
67
impl AsRef<tokio::fs::File> for FileSlot {
68
95
    fn as_ref(&self) -> &tokio::fs::File {
69
95
        &self.inner
70
95
    }
71
}
72
73
impl AsMut<tokio::fs::File> for FileSlot {
74
0
    fn as_mut(&mut self) -> &mut tokio::fs::File {
75
0
        &mut self.inner
76
0
    }
77
}
78
79
impl AsyncRead for FileSlot {
80
2.73k
    fn poll_read(
81
2.73k
        mut self: Pin<&mut Self>,
82
2.73k
        cx: &mut Context<'_>,
83
2.73k
        buf: &mut ReadBuf<'_>,
84
2.73k
    ) -> Poll<Result<(), tokio::io::Error>> {
85
2.73k
        Pin::new(&mut self.inner).poll_read(cx, buf)
86
2.73k
    }
87
}
88
89
impl AsyncSeek for FileSlot {
90
22
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
91
22
        Pin::new(&mut self.inner).start_seek(position)
92
22
    }
93
94
66
    fn poll_complete(
95
66
        mut self: Pin<&mut Self>,
96
66
        cx: &mut Context<'_>,
97
66
    ) -> Poll<Result<u64, tokio::io::Error>> {
98
66
        Pin::new(&mut self.inner).poll_complete(cx)
99
66
    }
100
}
101
102
impl AsyncWrite for FileSlot {
103
24
    fn poll_write(
104
24
        mut self: Pin<&mut Self>,
105
24
        cx: &mut Context<'_>,
106
24
        buf: &[u8],
107
24
    ) -> Poll<Result<usize, tokio::io::Error>> {
108
24
        Pin::new(&mut self.inner).poll_write(cx, buf)
109
24
    }
110
111
0
    fn poll_flush(
112
0
        mut self: Pin<&mut Self>,
113
0
        cx: &mut Context<'_>,
114
0
    ) -> Poll<Result<(), tokio::io::Error>> {
115
0
        Pin::new(&mut self.inner).poll_flush(cx)
116
0
    }
117
118
0
    fn poll_shutdown(
119
0
        mut self: Pin<&mut Self>,
120
0
        cx: &mut Context<'_>,
121
0
    ) -> Poll<Result<(), tokio::io::Error>> {
122
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
123
0
    }
124
125
82
    fn poll_write_vectored(
126
82
        mut self: Pin<&mut Self>,
127
82
        cx: &mut Context<'_>,
128
82
        bufs: &[IoSlice<'_>],
129
82
    ) -> Poll<Result<usize, tokio::io::Error>> {
130
82
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
131
82
    }
132
133
82
    fn is_write_vectored(&self) -> bool {
134
82
        self.inner.is_write_vectored()
135
82
    }
136
}
137
138
// Note: If the default changes make sure you update the documentation in
139
// `config/cas_server.rs`.
140
pub const DEFAULT_OPEN_FILE_LIMIT: usize = 24 * 1024; // 24k.
141
static OPEN_FILE_LIMIT: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_LIMIT);
142
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_LIMIT);
143
144
/// Try to acquire a permit from the open file semaphore.
145
#[inline]
146
25.4k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
147
25.4k
    trace!(
148
25.4k
        available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
149
        "getting FS permit"
150
    );
151
25.4k
    OPEN_FILE_SEMAPHORE
152
25.4k
        .acquire()
153
25.4k
        .await
154
25.4k
        .map_err(|e| 
make_err!0
(
Code::Internal0
, "Open file semaphore closed {:?}", e))
155
25.4k
}
156
/// Acquire a permit from the open file semaphore and call a raw function.
157
#[inline]
158
854
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
159
854
where
160
854
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
161
854
    T: Send + 'static,
162
854
{
163
854
    let permit = get_permit().await
?0
;
164
854
    spawn_blocking!("fs_call_with_permit", move || 
f853
(
permit853
))
165
854
        .await
166
851
        .unwrap_or_else(|e| 
{0
167
0
            Err(Error::from_std_err(Code::Internal, &e).append("background task failed"))
168
0
        })
169
851
}
170
171
/// Sets the soft nofile limit to `desired_open_file_limit` and adjusts
172
/// `OPEN_FILE_SEMAPHORE` accordingly.
173
///
174
/// # Panics
175
///
176
/// If any type conversion fails. This can't happen if `usize` is smaller than
177
/// `u64`.
178
0
pub fn set_open_file_limit(desired_open_file_limit: usize) {
179
    // Tokio semaphores have a max of 2^61 - 1 permits. On some platforms
180
    // (e.g. macOS) the kernel reports an "unlimited" file limit that exceeds
181
    // this, causing a panic when we try to add permits. Cap at a generous but
182
    // safe value.
183
    const MAX_SAFE_LIMIT: usize = 1 << 30; // ~1 billion
184
185
0
    let new_open_file_limit = {
186
0
        match increase_nofile_limit(
187
0
            u64::try_from(desired_open_file_limit)
188
0
                .expect("desired_open_file_limit is too large to convert to u64."),
189
0
        ) {
190
0
            Ok(open_file_limit) => {
191
0
                info!("set_open_file_limit() assigns new open file limit {open_file_limit}.",);
192
0
                usize::try_from(open_file_limit)
193
0
                    .expect("open_file_limit is too large to convert to usize.")
194
0
                    .min(MAX_SAFE_LIMIT)
195
            }
196
0
            Err(e) => {
197
0
                error!(
198
                    "set_open_file_limit() failed to assign open file limit. Maybe system does not have ulimits, continuing anyway. - {e:?}",
199
                );
200
0
                DEFAULT_OPEN_FILE_LIMIT
201
            }
202
        }
203
    };
204
    // TODO(jaroeichler): Can we give a better estimate?
205
0
    if new_open_file_limit < DEFAULT_OPEN_FILE_LIMIT {
206
0
        warn!(
207
            "The new open file limit ({new_open_file_limit}) is below the recommended value of {DEFAULT_OPEN_FILE_LIMIT}. Consider raising max_open_files.",
208
        );
209
0
    }
210
211
    // Use only 80% of the open file limit for permits from OPEN_FILE_SEMAPHORE
212
    // to give extra room for other file descriptors like sockets, pipes, and
213
    // other things.
214
0
    let reduced_open_file_limit = new_open_file_limit.saturating_sub(new_open_file_limit / 5);
215
0
    let previous_open_file_limit = OPEN_FILE_LIMIT.load(Ordering::Acquire);
216
    // No permit should be acquired yet, so this warning should not occur.
217
0
    if (OPEN_FILE_SEMAPHORE.available_permits() + reduced_open_file_limit)
218
0
        < previous_open_file_limit
219
    {
220
0
        warn!(
221
            "There are not enough available permits to remove {previous_open_file_limit} - {reduced_open_file_limit} permits.",
222
        );
223
0
    }
224
0
    if previous_open_file_limit <= reduced_open_file_limit {
225
0
        OPEN_FILE_LIMIT.fetch_add(
226
0
            reduced_open_file_limit - previous_open_file_limit,
227
0
            Ordering::Release,
228
0
        );
229
0
        OPEN_FILE_SEMAPHORE.add_permits(reduced_open_file_limit - previous_open_file_limit);
230
0
    } else {
231
0
        OPEN_FILE_LIMIT.fetch_sub(
232
0
            previous_open_file_limit - reduced_open_file_limit,
233
0
            Ordering::Release,
234
0
        );
235
0
        OPEN_FILE_SEMAPHORE.forget_permits(previous_open_file_limit - reduced_open_file_limit);
236
0
    }
237
0
}
238
239
15
pub fn get_open_files_for_test() -> usize {
240
15
    OPEN_FILE_LIMIT.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
241
15
}
242
243
70
pub async fn open_file(
244
70
    path: impl AsRef<Path>,
245
70
    start: u64,
246
70
    limit: u64,
247
70
) -> Result<Take<FileSlot>, Error> {
248
70
    let path = path.as_ref().to_owned();
249
70
    let (
permit68
,
os_file68
) = call_with_permit(move |permit| {
250
68
        let mut os_file =
251
70
            std::fs::File::open(&path).err_tip(|| 
format!2
("Could not open {}",
path.display()2
))
?2
;
252
68
        if start > 0 {
253
0
            os_file
254
0
                .seek(SeekFrom::Start(start))
255
0
                .err_tip(|| format!("Could not seek to {start} in {}", path.display()))?;
256
68
        }
257
68
        Ok((permit, os_file))
258
70
    })
259
70
    .await
?2
;
260
68
    Ok(FileSlot {
261
68
        _permit: permit,
262
68
        inner: tokio::fs::File::from_std(os_file),
263
68
    }
264
68
    .take(limit))
265
70
}
266
267
95
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
268
95
    let path = path.as_ref().to_owned();
269
95
    let (permit, os_file) = call_with_permit(move |permit| {
270
        Ok((
271
95
            permit,
272
95
            std::fs::File::options()
273
95
                .read(true)
274
95
                .write(true)
275
95
                .create(true)
276
95
                .truncate(true)
277
95
                .open(&path)
278
95
                .err_tip(|| 
format!0
("Could not open {}",
path.display()0
))
?0
,
279
        ))
280
95
    })
281
95
    .await
?0
;
282
95
    Ok(FileSlot {
283
95
        _permit: permit,
284
95
        inner: tokio::fs::File::from_std(os_file),
285
95
    })
286
95
}
287
288
4
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
289
4
    let src = src.as_ref().to_owned();
290
4
    let dst = dst.as_ref().to_owned();
291
4
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
292
4
}
293
294
1
pub async fn set_permissions(src: impl AsRef<Path>, perm: Permissions) -> Result<(), Error> {
295
1
    let src = src.as_ref().to_owned();
296
1
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
297
1
        .await
298
1
}
299
300
44
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
301
44
    let path = path.as_ref().to_owned();
302
44
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
303
44
}
304
305
261
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
306
261
    let path = path.as_ref().to_owned();
307
261
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
308
261
}
309
310
#[cfg(target_family = "unix")]
311
2
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
312
    // TODO: add a test for #2051: deadlock with large number of files
313
2
    let _permit = get_permit().await
?0
;
314
2
    tokio::fs::symlink(src, dst).await.map_err(Into::into)
315
2
}
316
317
2
pub async fn read_link(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
318
2
    let path = path.as_ref().to_owned();
319
2
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
320
2
}
321
322
#[derive(Debug)]
323
pub struct ReadDir {
324
    // We hold the permit because once it is dropped it goes back into the queue.
325
    permit: SemaphorePermit<'static>,
326
    inner: tokio::fs::ReadDir,
327
}
328
329
impl ReadDir {
330
304
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
331
304
        (self.permit, self.inner)
332
304
    }
333
}
334
335
impl AsRef<tokio::fs::ReadDir> for ReadDir {
336
0
    fn as_ref(&self) -> &tokio::fs::ReadDir {
337
0
        &self.inner
338
0
    }
339
}
340
341
impl AsMut<tokio::fs::ReadDir> for ReadDir {
342
0
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
343
0
        &mut self.inner
344
0
    }
345
}
346
347
306
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
348
306
    let path = path.as_ref().to_owned();
349
306
    let (permit, inner) = call_with_permit(move |permit| {
350
        Ok((
351
306
            permit,
352
306
            tokio::runtime::Handle::current()
353
306
                .block_on(tokio::fs::read_dir(path))
354
306
                .map_err(Into::<Error>::into)
?0
,
355
        ))
356
306
    })
357
306
    .await
?0
;
358
306
    Ok(ReadDir { permit, inner })
359
306
}
360
361
9
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
362
9
    let from = from.as_ref().to_owned();
363
9
    let to = to.as_ref().to_owned();
364
9
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
365
9
}
366
367
8
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
368
8
    let path = path.as_ref().to_owned();
369
8
    call_with_permit(move |_| 
std::fs::remove_file7
(
path7
).
map_err7
(Into::<Error>::into)).await
370
5
}
371
372
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
373
2
    let path = path.as_ref().to_owned();
374
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
375
2
}
376
377
15
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
378
15
    let path = path.as_ref().to_owned();
379
15
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
380
15
}
381
382
4
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
383
4
    let path = path.as_ref().to_owned();
384
4
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
385
4
}
386
387
9
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
388
9
    let path = path.as_ref().to_owned();
389
9
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
390
9
}
391
392
// We can't just use the stock remove_dir_all as it falls over if someone's set readonly
393
// permissions. This version walks the directories and fixes the permissions where needed
394
// before deleting everything.
395
#[cfg(not(target_family = "windows"))]
396
24
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
397
    // Because otherwise Windows builds complain about these things not being used
398
    use std::io::ErrorKind;
399
    use std::os::unix::fs::PermissionsExt;
400
401
    use tracing::debug;
402
    use walkdir::WalkDir;
403
404
25
    for entry in 
WalkDir::new24
(
&path24
) {
405
25
        let Ok(
entry24
) = &entry else {
406
1
            debug!("Can't get into {entry:?}, assuming already deleted");
407
1
            continue;
408
        };
409
24
        let metadata = entry.metadata()
?0
;
410
24
        if metadata.is_dir() {
411
24
            match std::fs::remove_dir_all(entry.path()) {
412
23
                Ok(()) => {}
413
1
                Err(e) if e.kind() == ErrorKind::PermissionDenied => {
414
1
                    std::fs::set_permissions(entry.path(), Permissions::from_mode(0o700)).err_tip(
415
0
                        || format!("Setting permissions for {}", entry.path().display()),
416
0
                    )?;
417
                }
418
0
                e @ Err(_) => e.err_tip(|| format!("Removing {}", entry.path().display()))?,
419
            }
420
0
        } else if metadata.is_file() {
421
0
            std::fs::set_permissions(entry.path(), Permissions::from_mode(0o600))
422
0
                .err_tip(|| format!("Setting permissions for {}", entry.path().display()))?;
423
0
        }
424
    }
425
426
    // should now be safe to delete after we fixed all the permissions in the walk loop
427
24
    match std::fs::remove_dir_all(&path) {
428
1
        Ok(()) => {}
429
23
        Err(e) if e.kind() == ErrorKind::NotFound => {}
430
0
        e @ Err(_) => e.err_tip(|| {
431
0
            format!(
432
                "Removing {} after permissions fixes",
433
0
                path.as_ref().display()
434
            )
435
0
        })?,
436
    }
437
24
    Ok(())
438
24
}
439
440
// We can't set the permissions easily in Windows, so just fallback to
441
// the stock Rust remove_dir_all
442
#[cfg(target_family = "windows")]
443
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
444
    std::fs::remove_dir_all(&path)?;
445
    Ok(())
446
}
447
448
24
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
449
24
    let path = path.as_ref().to_owned();
450
24
    call_with_permit(move |_| internal_remove_dir_all(path)).await
451
24
}