Coverage Report

Created: 2026-07-03 15:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicUsize, Ordering};
17
use core::task::{Context, Poll};
18
use std::fs::{Metadata, Permissions};
19
use std::io::{self, IoSlice, Seek};
20
#[cfg(target_os = "linux")]
21
use std::os::unix::io::AsRawFd;
22
use std::path::{Path, PathBuf};
23
24
use nativelink_error::{Code, Error, ResultExt, make_err};
25
use rlimit::increase_nofile_limit;
26
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
27
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
28
/// issues.
29
pub use tokio::fs::DirEntry;
30
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
31
use tokio::sync::{Semaphore, SemaphorePermit};
32
use tracing::{error, info, trace, warn};
33
34
use crate::spawn_blocking;
35
36
/// Default read buffer size when reading to/from disk (`0x4000` = 16 KiB).
pub const DEFAULT_READ_BUFF_SIZE: usize = 0x4000;
38
39
#[derive(Debug)]
40
pub struct FileSlot {
41
    // We hold the permit because once it is dropped it goes back into the queue.
42
    _permit: SemaphorePermit<'static>,
43
    inner: tokio::fs::File,
44
}
45
46
impl FileSlot {
47
    /// Advise the kernel to drop page cache for this file's contents.
48
    /// Only available on Linux;
49
    #[cfg(target_os = "linux")]
50
221
    pub fn advise_dontneed(&self) {
51
221
        let fd = self.inner.as_raw_fd();
52
221
        let ret = unsafe { libc::posix_fadvise(fd, 0, 0, libc::POSIX_FADV_DONTNEED) };
53
221
        if ret != 0 {
54
0
            tracing::debug!(
55
                fd,
56
                ret,
57
                "posix_fadvise(DONTNEED) returned non-zero (best-effort, ignoring)",
58
            );
59
221
        }
60
221
    }
61
62
    #[cfg(not(target_os = "linux"))]
63
    pub const fn advise_dontneed(&self) {
64
        // No-op: posix_fadvise is not available on Mac or Windows.
65
    }
66
}
67
68
/// Enable [`IP_FREEBIND`](`libc::IP_FREEBIND`) before binding a socket.
69
#[cfg(target_os = "linux")]
70
1
pub fn set_freebind<F: AsRawFd>(socket: &F) -> Result<(), io::Error> {
71
1
    let enable = 1;
72
1
    let optlen = libc::socklen_t::try_from(size_of::<libc::c_int>())
73
1
        .expect("size of c_int always fits in socklen_t");
74
75
    // SAFETY: we pass in a valid fd, initialized optval, and matching optlen.
76
1
    let ret = unsafe {
77
1
        libc::setsockopt(
78
1
            socket.as_raw_fd(),
79
            libc::IPPROTO_IP,
80
            libc::IP_FREEBIND,
81
1
            core::ptr::addr_of!(enable).cast(),
82
1
            optlen,
83
        )
84
    };
85
1
    if ret == 0 {
86
1
        Ok(())
87
    } else {
88
0
        Err(io::Error::last_os_error())
89
    }
90
1
}
91
92
#[cfg(not(target_os = "linux"))]
93
pub const fn set_freebind<F>(_socket: &F) -> Result<(), io::Error> {
94
    Ok(())
95
}
96
97
/// Borrow the wrapped [`tokio::fs::File`].
impl AsRef<tokio::fs::File> for FileSlot {
    fn as_ref(&self) -> &tokio::fs::File {
        &self.inner
    }
}
102
103
/// Mutably borrow the wrapped [`tokio::fs::File`].
impl AsMut<tokio::fs::File> for FileSlot {
    fn as_mut(&mut self) -> &mut tokio::fs::File {
        &mut self.inner
    }
}
108
109
impl AsyncRead for FileSlot {
110
2.85k
    fn poll_read(
111
2.85k
        mut self: Pin<&mut Self>,
112
2.85k
        cx: &mut Context<'_>,
113
2.85k
        buf: &mut ReadBuf<'_>,
114
2.85k
    ) -> Poll<Result<(), tokio::io::Error>> {
115
2.85k
        Pin::new(&mut self.inner).poll_read(cx, buf)
116
2.85k
    }
117
}
118
119
impl AsyncSeek for FileSlot {
120
35
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
121
35
        Pin::new(&mut self.inner).start_seek(position)
122
35
    }
123
124
104
    fn poll_complete(
125
104
        mut self: Pin<&mut Self>,
126
104
        cx: &mut Context<'_>,
127
104
    ) -> Poll<Result<u64, tokio::io::Error>> {
128
104
        Pin::new(&mut self.inner).poll_complete(cx)
129
104
    }
130
}
131
132
impl AsyncWrite for FileSlot {
133
33
    fn poll_write(
134
33
        mut self: Pin<&mut Self>,
135
33
        cx: &mut Context<'_>,
136
33
        buf: &[u8],
137
33
    ) -> Poll<Result<usize, tokio::io::Error>> {
138
33
        Pin::new(&mut self.inner).poll_write(cx, buf)
139
33
    }
140
141
290
    fn poll_flush(
142
290
        mut self: Pin<&mut Self>,
143
290
        cx: &mut Context<'_>,
144
290
    ) -> Poll<Result<(), tokio::io::Error>> {
145
290
        Pin::new(&mut self.inner).poll_flush(cx)
146
290
    }
147
148
0
    fn poll_shutdown(
149
0
        mut self: Pin<&mut Self>,
150
0
        cx: &mut Context<'_>,
151
0
    ) -> Poll<Result<(), tokio::io::Error>> {
152
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
153
0
    }
154
155
124
    fn poll_write_vectored(
156
124
        mut self: Pin<&mut Self>,
157
124
        cx: &mut Context<'_>,
158
124
        bufs: &[IoSlice<'_>],
159
124
    ) -> Poll<Result<usize, tokio::io::Error>> {
160
124
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
161
124
    }
162
163
124
    fn is_write_vectored(&self) -> bool {
164
124
        self.inner.is_write_vectored()
165
124
    }
166
}
167
168
/// Default number of open-file permits.
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
pub const DEFAULT_OPEN_FILE_LIMIT: usize = 24 * 1024; // 24k.
171
static OPEN_FILE_LIMIT: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_LIMIT);
172
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_LIMIT);
173
174
/// Try to acquire a permit from the open file semaphore.
175
#[inline]
176
26.2k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
177
26.2k
    trace!(
178
26.1k
        available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
179
        "getting FS permit"
180
    );
181
26.2k
    OPEN_FILE_SEMAPHORE
182
26.2k
        .acquire()
183
26.2k
        .await
184
26.2k
        .map_err(|e| 
make_err!0
(
Code::Internal0
, "Open file semaphore closed {:?}", e))
185
26.2k
}
186
/// Acquire a permit from the open file semaphore and call a raw function.
187
#[inline]
188
1.64k
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
189
1.64k
where
190
1.64k
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
191
1.64k
    T: Send + 'static,
192
1.64k
{
193
1.64k
    let permit = get_permit().await
?0
;
194
1.64k
    spawn_blocking!("fs_call_with_permit", move || 
f1.63k
(
permit1.63k
))
195
1.64k
        .await
196
1.63k
        .unwrap_or_else(|e| 
{0
197
0
            Err(Error::from_std_err(Code::Internal, &e).append("background task failed"))
198
0
        })
199
1.63k
}
200
201
/// Sets the soft nofile limit to `desired_open_file_limit` and adjusts
202
/// `OPEN_FILE_SEMAPHORE` accordingly.
203
///
204
/// # Panics
205
///
206
/// If any type conversion fails. This can't happen if `usize` is smaller than
207
/// `u64`.
208
0
pub fn set_open_file_limit(desired_open_file_limit: usize) {
209
    // Tokio semaphores have a max of 2^61 - 1 permits. On some platforms
210
    // (e.g. macOS) the kernel reports an "unlimited" file limit that exceeds
211
    // this, causing a panic when we try to add permits. Cap at a generous but
212
    // safe value.
213
    const MAX_SAFE_LIMIT: usize = 1 << 30; // ~1 billion
214
215
0
    let new_open_file_limit = {
216
0
        match increase_nofile_limit(
217
0
            u64::try_from(desired_open_file_limit)
218
0
                .expect("desired_open_file_limit is too large to convert to u64."),
219
0
        ) {
220
0
            Ok(open_file_limit) => {
221
0
                info!("set_open_file_limit() assigns new open file limit {open_file_limit}.",);
222
0
                usize::try_from(open_file_limit)
223
0
                    .expect("open_file_limit is too large to convert to usize.")
224
0
                    .min(MAX_SAFE_LIMIT)
225
            }
226
0
            Err(e) => {
227
0
                error!(
228
                    "set_open_file_limit() failed to assign open file limit. Maybe system does not have ulimits, continuing anyway. - {e:?}",
229
                );
230
0
                DEFAULT_OPEN_FILE_LIMIT
231
            }
232
        }
233
    };
234
    // TODO(jaroeichler): Can we give a better estimate?
235
0
    if new_open_file_limit < DEFAULT_OPEN_FILE_LIMIT {
236
0
        warn!(
237
            "The new open file limit ({new_open_file_limit}) is below the recommended value of {DEFAULT_OPEN_FILE_LIMIT}. Consider raising max_open_files.",
238
        );
239
0
    }
240
241
    // Use only 80% of the open file limit for permits from OPEN_FILE_SEMAPHORE
242
    // to give extra room for other file descriptors like sockets, pipes, and
243
    // other things.
244
0
    let reduced_open_file_limit = new_open_file_limit.saturating_sub(new_open_file_limit / 5);
245
0
    let previous_open_file_limit = OPEN_FILE_LIMIT.load(Ordering::Acquire);
246
    // No permit should be acquired yet, so this warning should not occur.
247
0
    if (OPEN_FILE_SEMAPHORE.available_permits() + reduced_open_file_limit)
248
0
        < previous_open_file_limit
249
    {
250
0
        warn!(
251
            "There are not enough available permits to remove {previous_open_file_limit} - {reduced_open_file_limit} permits.",
252
        );
253
0
    }
254
0
    if previous_open_file_limit <= reduced_open_file_limit {
255
0
        OPEN_FILE_LIMIT.fetch_add(
256
0
            reduced_open_file_limit - previous_open_file_limit,
257
0
            Ordering::Release,
258
0
        );
259
0
        OPEN_FILE_SEMAPHORE.add_permits(reduced_open_file_limit - previous_open_file_limit);
260
0
    } else {
261
0
        OPEN_FILE_LIMIT.fetch_sub(
262
0
            previous_open_file_limit - reduced_open_file_limit,
263
0
            Ordering::Release,
264
0
        );
265
0
        OPEN_FILE_SEMAPHORE.forget_permits(previous_open_file_limit - reduced_open_file_limit);
266
0
    }
267
0
}
268
269
22
pub fn get_open_files_for_test() -> usize {
270
22
    OPEN_FILE_LIMIT.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
271
22
}
272
273
106
pub async fn open_file(
274
106
    path: impl AsRef<Path>,
275
106
    start: u64,
276
106
    limit: u64,
277
106
) -> Result<Take<FileSlot>, Error> {
278
106
    let path = path.as_ref().to_owned();
279
106
    let (
permit103
,
os_file103
) = call_with_permit(move |permit| {
280
103
        let mut os_file =
281
106
            std::fs::File::open(&path).err_tip(|| 
format!3
("Could not open {}",
path.display()3
))
?3
;
282
103
        if start > 0 {
283
0
            os_file
284
0
                .seek(SeekFrom::Start(start))
285
0
                .err_tip(|| format!("Could not seek to {start} in {}", path.display()))?;
286
103
        }
287
103
        Ok((permit, os_file))
288
106
    })
289
106
    .await
?3
;
290
103
    Ok(FileSlot {
291
103
        _permit: permit,
292
103
        inner: tokio::fs::File::from_std(os_file),
293
103
    }
294
103
    .take(limit))
295
106
}
296
297
156
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
298
156
    let path = path.as_ref().to_owned();
299
156
    let (permit, os_file) = call_with_permit(move |permit| {
300
        Ok((
301
156
            permit,
302
156
            std::fs::File::options()
303
156
                .read(true)
304
156
                .write(true)
305
156
                .create(true)
306
156
                .truncate(true)
307
156
                .open(&path)
308
156
                .err_tip(|| 
format!0
("Could not open {}",
path.display()0
))
?0
,
309
        ))
310
156
    })
311
156
    .await
?0
;
312
156
    Ok(FileSlot {
313
156
        _permit: permit,
314
156
        inner: tokio::fs::File::from_std(os_file),
315
156
    })
316
156
}
317
318
9
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
319
9
    let src = src.as_ref().to_owned();
320
9
    let dst = dst.as_ref().to_owned();
321
9
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
322
9
}
323
324
157
pub async fn set_permissions(src: impl AsRef<Path>, perm: Permissions) -> Result<(), Error> {
325
157
    let src = src.as_ref().to_owned();
326
157
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
327
157
        .await
328
157
}
329
330
55
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
331
55
    let path = path.as_ref().to_owned();
332
55
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
333
55
}
334
335
487
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
336
487
    let path = path.as_ref().to_owned();
337
487
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
338
487
}
339
340
#[cfg(target_family = "unix")]
341
2
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
342
    // TODO: add a test for #2051: deadlock with large number of files
343
2
    let _permit = get_permit().await
?0
;
344
2
    tokio::fs::symlink(src, dst).await.map_err(Into::into)
345
2
}
346
347
6
pub async fn read_link(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
348
6
    let path = path.as_ref().to_owned();
349
6
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
350
6
}
351
352
#[derive(Debug)]
353
pub struct ReadDir {
354
    // We hold the permit because once it is dropped it goes back into the queue.
355
    permit: SemaphorePermit<'static>,
356
    inner: tokio::fs::ReadDir,
357
}
358
359
impl ReadDir {
360
460
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
361
460
        (self.permit, self.inner)
362
460
    }
363
}
364
365
/// Borrow the wrapped [`tokio::fs::ReadDir`].
impl AsRef<tokio::fs::ReadDir> for ReadDir {
    fn as_ref(&self) -> &tokio::fs::ReadDir {
        &self.inner
    }
}
370
371
/// Mutably borrow the wrapped [`tokio::fs::ReadDir`].
impl AsMut<tokio::fs::ReadDir> for ReadDir {
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
        &mut self.inner
    }
}
376
377
462
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
378
462
    let path = path.as_ref().to_owned();
379
462
    let (permit, inner) = call_with_permit(move |permit| {
380
        Ok((
381
462
            permit,
382
462
            tokio::runtime::Handle::current()
383
462
                .block_on(tokio::fs::read_dir(path))
384
462
                .map_err(Into::<Error>::into)
?0
,
385
        ))
386
462
    })
387
462
    .await
?0
;
388
462
    Ok(ReadDir { permit, inner })
389
462
}
390
391
11
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
392
11
    let from = from.as_ref().to_owned();
393
11
    let to = to.as_ref().to_owned();
394
11
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
395
11
}
396
397
13
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
398
13
    let path = path.as_ref().to_owned();
399
13
    call_with_permit(move |_| 
std::fs::remove_file12
(
path12
).
map_err12
(Into::<Error>::into)).await
400
8
}
401
402
/// Removes an empty directory. Errors if the directory is not empty; use
403
/// [`remove_dir_all`] when the contents should be removed too.
404
0
pub async fn remove_dir(path: impl AsRef<Path>) -> Result<(), Error> {
405
0
    let path = path.as_ref().to_owned();
406
0
    call_with_permit(move |_| std::fs::remove_dir(path).map_err(Into::<Error>::into)).await
407
0
}
408
409
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
410
2
    let path = path.as_ref().to_owned();
411
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
412
2
}
413
414
38
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
415
38
    let path = path.as_ref().to_owned();
416
38
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
417
38
}
418
419
8
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
420
8
    let path = path.as_ref().to_owned();
421
8
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
422
8
}
423
424
14
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
425
14
    let path = path.as_ref().to_owned();
426
14
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
427
14
}
428
429
// We can't just use the stock remove_dir_all as it falls over if someone's set readonly
430
// permissions. This version walks the directories and fixes the permissions where needed
431
// before deleting everything.
432
#[cfg(not(target_family = "windows"))]
433
116
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
434
    // Because otherwise Windows builds complain about these things not being used
435
    use std::io::ErrorKind;
436
    use std::os::unix::fs::PermissionsExt;
437
438
    use tracing::debug;
439
    use walkdir::WalkDir;
440
441
117
    for entry in 
WalkDir::new116
(
&path116
) {
442
117
        let Ok(
entry31
) = &entry else {
443
86
            debug!(?entry, "Can't get entry, assuming already deleted");
444
86
            continue;
445
        };
446
31
        let metadata = entry.metadata()
?0
;
447
31
        if metadata.is_dir() {
448
31
            match std::fs::remove_dir_all(entry.path()) {
449
30
                Ok(()) => {}
450
1
                Err(e) if e.kind() == ErrorKind::PermissionDenied => {
451
1
                    std::fs::set_permissions(entry.path(), Permissions::from_mode(0o700)).err_tip(
452
0
                        || format!("Setting permissions for {}", entry.path().display()),
453
0
                    )?;
454
                }
455
0
                e @ Err(_) => e.err_tip(|| format!("Removing {}", entry.path().display()))?,
456
            }
457
0
        } else if metadata.is_file() {
458
0
            std::fs::set_permissions(entry.path(), Permissions::from_mode(0o600))
459
0
                .err_tip(|| format!("Setting permissions for {}", entry.path().display()))?;
460
0
        }
461
    }
462
463
    // should now be safe to delete after we fixed all the permissions in the walk loop
464
116
    match std::fs::remove_dir_all(&path) {
465
1
        Ok(()) => {}
466
115
        Err(e) if e.kind() == ErrorKind::NotFound => {}
467
0
        e @ Err(_) => e.err_tip(|| {
468
0
            format!(
469
                "Removing {} after permissions fixes",
470
0
                path.as_ref().display()
471
            )
472
0
        })?,
473
    }
474
116
    Ok(())
475
116
}
476
477
// We can't set the permissions easily in Windows, so just fallback to
// the stock Rust remove_dir_all
#[cfg(target_family = "windows")]
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
    std::fs::remove_dir_all(&path)?;
    Ok(())
}
484
485
116
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
486
116
    let path = path.as_ref().to_owned();
487
116
    call_with_permit(move |_| internal_remove_dir_all(path)).await
488
116
}