Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::pin::Pin;
16
use core::sync::atomic::{AtomicUsize, Ordering};
17
use core::task::{Context, Poll};
18
use std::fs::{Metadata, Permissions};
19
use std::io::{IoSlice, Seek};
20
use std::path::{Path, PathBuf};
21
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use rlimit::increase_nofile_limit;
24
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
25
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
26
/// issues.
27
pub use tokio::fs::DirEntry;
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
29
use tokio::sync::{Semaphore, SemaphorePermit};
30
use tracing::{error, info, trace, warn};
31
32
use crate::spawn_blocking;
33
34
/// Default buffer size (16 KiB) used when reading from or writing to disk.
pub const DEFAULT_READ_BUFF_SIZE: usize = 16 * 1024;
36
37
#[derive(Debug)]
38
pub struct FileSlot {
39
    // We hold the permit because once it is dropped it goes back into the queue.
40
    _permit: SemaphorePermit<'static>,
41
    inner: tokio::fs::File,
42
}
43
44
impl AsRef<tokio::fs::File> for FileSlot {
    /// Borrows the wrapped tokio file handle.
    fn as_ref(&self) -> &tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
49
50
impl AsMut<tokio::fs::File> for FileSlot {
    /// Mutably borrows the wrapped tokio file handle.
    fn as_mut(&mut self) -> &mut tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
55
56
impl AsyncRead for FileSlot {
57
2.57k
    fn poll_read(
58
2.57k
        mut self: Pin<&mut Self>,
59
2.57k
        cx: &mut Context<'_>,
60
2.57k
        buf: &mut ReadBuf<'_>,
61
2.57k
    ) -> Poll<Result<(), tokio::io::Error>> {
62
2.57k
        Pin::new(&mut self.inner).poll_read(cx, buf)
63
2.57k
    }
64
}
65
66
impl AsyncSeek for FileSlot {
67
19
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
68
19
        Pin::new(&mut self.inner).start_seek(position)
69
19
    }
70
71
57
    fn poll_complete(
72
57
        mut self: Pin<&mut Self>,
73
57
        cx: &mut Context<'_>,
74
57
    ) -> Poll<Result<u64, tokio::io::Error>> {
75
57
        Pin::new(&mut self.inner).poll_complete(cx)
76
57
    }
77
}
78
79
impl AsyncWrite for FileSlot {
80
24
    fn poll_write(
81
24
        mut self: Pin<&mut Self>,
82
24
        cx: &mut Context<'_>,
83
24
        buf: &[u8],
84
24
    ) -> Poll<Result<usize, tokio::io::Error>> {
85
24
        Pin::new(&mut self.inner).poll_write(cx, buf)
86
24
    }
87
88
0
    fn poll_flush(
89
0
        mut self: Pin<&mut Self>,
90
0
        cx: &mut Context<'_>,
91
0
    ) -> Poll<Result<(), tokio::io::Error>> {
92
0
        Pin::new(&mut self.inner).poll_flush(cx)
93
0
    }
94
95
0
    fn poll_shutdown(
96
0
        mut self: Pin<&mut Self>,
97
0
        cx: &mut Context<'_>,
98
0
    ) -> Poll<Result<(), tokio::io::Error>> {
99
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
100
0
    }
101
102
76
    fn poll_write_vectored(
103
76
        mut self: Pin<&mut Self>,
104
76
        cx: &mut Context<'_>,
105
76
        bufs: &[IoSlice<'_>],
106
76
    ) -> Poll<Result<usize, tokio::io::Error>> {
107
76
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
108
76
    }
109
110
76
    fn is_write_vectored(&self) -> bool {
111
76
        self.inner.is_write_vectored()
112
76
    }
113
}
114
115
// Note: If the default changes make sure you update the documentation in
116
// `config/cas_server.rs`.
117
pub const DEFAULT_OPEN_FILE_LIMIT: usize = 24 * 1024; // 24k.
118
static OPEN_FILE_LIMIT: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_LIMIT);
119
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_LIMIT);
120
121
/// Try to acquire a permit from the open file semaphore.
122
#[inline]
123
25.3k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
124
25.3k
    trace!(
125
25.3k
        available_permits = OPEN_FILE_SEMAPHORE.available_permits(),
126
25.3k
        "getting FS permit"
127
    );
128
25.3k
    OPEN_FILE_SEMAPHORE
129
25.3k
        .acquire()
130
25.3k
        .await
131
25.3k
        .map_err(|e| make_err!(
Code::Internal0
, "Open file semaphore closed {:?}", e))
132
25.3k
}
133
/// Acquire a permit from the open file semaphore and call a raw function.
134
#[inline]
135
807
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
136
807
where
137
807
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
138
807
    T: Send + 'static,
139
807
{
140
807
    let permit = get_permit().await
?0
;
141
807
    spawn_blocking!("fs_call_with_permit", move || 
f806
(
permit806
))
142
807
        .await
143
804
        .unwrap_or_else(|e| Err(
make_err!0
(
Code::Internal0
, "background task failed: {e:?}")))
144
804
}
145
146
/// Sets the soft nofile limit to `desired_open_file_limit` and adjusts
147
/// `OPEN_FILE_SEMAPHORE` accordingly.
148
///
149
/// # Panics
150
///
151
/// If any type conversion fails. This can't happen if `usize` is smaller than
152
/// `u64`.
153
0
pub fn set_open_file_limit(desired_open_file_limit: usize) {
154
0
    let new_open_file_limit = {
155
0
        match increase_nofile_limit(
156
0
            u64::try_from(desired_open_file_limit)
157
0
                .expect("desired_open_file_limit is too large to convert to u64."),
158
0
        ) {
159
0
            Ok(open_file_limit) => {
160
0
                info!("set_open_file_limit() assigns new open file limit {open_file_limit}.",);
161
0
                usize::try_from(open_file_limit)
162
0
                    .expect("open_file_limit is too large to convert to usize.")
163
            }
164
0
            Err(e) => {
165
0
                error!(
166
0
                    "set_open_file_limit() failed to assign open file limit. Maybe system does not have ulimits, continuing anyway. - {e:?}",
167
                );
168
0
                DEFAULT_OPEN_FILE_LIMIT
169
            }
170
        }
171
    };
172
    // TODO(jaroeichler): Can we give a better estimate?
173
0
    if new_open_file_limit < DEFAULT_OPEN_FILE_LIMIT {
  Branch (173:8): [True: 0, False: 0]
  Branch (173:8): [Folded - Ignored]
174
0
        warn!(
175
0
            "The new open file limit ({new_open_file_limit}) is below the recommended value of {DEFAULT_OPEN_FILE_LIMIT}. Consider raising max_open_files.",
176
        );
177
0
    }
178
179
    // Use only 80% of the open file limit for permits from OPEN_FILE_SEMAPHORE
180
    // to give extra room for other file descriptors like sockets, pipes, and
181
    // other things.
182
0
    let reduced_open_file_limit = new_open_file_limit.saturating_sub(new_open_file_limit / 5);
183
0
    let previous_open_file_limit = OPEN_FILE_LIMIT.load(Ordering::Acquire);
184
    // No permit should be acquired yet, so this warning should not occur.
185
0
    if (OPEN_FILE_SEMAPHORE.available_permits() + reduced_open_file_limit)
  Branch (185:8): [True: 0, False: 0]
  Branch (185:8): [Folded - Ignored]
186
0
        < previous_open_file_limit
187
    {
188
0
        warn!(
189
0
            "There are not enough available permits to remove {previous_open_file_limit} - {reduced_open_file_limit} permits.",
190
        );
191
0
    }
192
0
    if previous_open_file_limit <= reduced_open_file_limit {
  Branch (192:8): [True: 0, False: 0]
  Branch (192:8): [Folded - Ignored]
193
0
        OPEN_FILE_LIMIT.fetch_add(
194
0
            reduced_open_file_limit - previous_open_file_limit,
195
0
            Ordering::Release,
196
0
        );
197
0
        OPEN_FILE_SEMAPHORE.add_permits(reduced_open_file_limit - previous_open_file_limit);
198
0
    } else {
199
0
        OPEN_FILE_LIMIT.fetch_sub(
200
0
            previous_open_file_limit - reduced_open_file_limit,
201
0
            Ordering::Release,
202
0
        );
203
0
        OPEN_FILE_SEMAPHORE.forget_permits(previous_open_file_limit - reduced_open_file_limit);
204
0
    }
205
0
}
206
207
18
pub fn get_open_files_for_test() -> usize {
208
18
    OPEN_FILE_LIMIT.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
209
18
}
210
211
65
pub async fn open_file(
212
65
    path: impl AsRef<Path>,
213
65
    start: u64,
214
65
    limit: u64,
215
65
) -> Result<Take<FileSlot>, Error> {
216
65
    let path = path.as_ref().to_owned();
217
65
    let (
permit63
,
os_file63
) = call_with_permit(move |permit| {
218
63
        let mut os_file =
219
65
            std::fs::File::open(&path).err_tip(|| format!(
"Could not open {}"2
,
path.display()2
))
?2
;
220
63
        if start > 0 {
  Branch (220:12): [True: 0, False: 0]
  Branch (220:12): [Folded - Ignored]
  Branch (220:12): [True: 0, False: 1]
  Branch (220:12): [True: 0, False: 2]
  Branch (220:12): [True: 0, False: 1]
  Branch (220:12): [True: 0, False: 4]
  Branch (220:12): [True: 0, False: 6]
  Branch (220:12): [True: 0, False: 0]
  Branch (220:12): [True: 0, False: 1]
  Branch (220:12): [Folded - Ignored]
  Branch (220:12): [True: 0, False: 0]
  Branch (220:12): [True: 0, False: 41]
  Branch (220:12): [True: 0, False: 3]
  Branch (220:12): [True: 0, False: 4]
221
0
            os_file
222
0
                .seek(SeekFrom::Start(start))
223
0
                .err_tip(|| format!("Could not seek to {start} in {}", path.display()))?;
224
63
        }
225
63
        Ok((permit, os_file))
226
65
    })
227
65
    .await
?2
;
228
63
    Ok(FileSlot {
229
63
        _permit: permit,
230
63
        inner: tokio::fs::File::from_std(os_file),
231
63
    }
232
63
    .take(limit))
233
65
}
234
235
89
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
236
89
    let path = path.as_ref().to_owned();
237
89
    let (permit, os_file) = call_with_permit(move |permit| {
238
        Ok((
239
89
            permit,
240
89
            std::fs::File::options()
241
89
                .read(true)
242
89
                .write(true)
243
89
                .create(true)
244
89
                .truncate(true)
245
89
                .open(&path)
246
89
                .err_tip(|| format!(
"Could not open {}"0
,
path.display()0
))
?0
,
247
        ))
248
89
    })
249
89
    .await
?0
;
250
89
    Ok(FileSlot {
251
89
        _permit: permit,
252
89
        inner: tokio::fs::File::from_std(os_file),
253
89
    })
254
89
}
255
256
4
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
257
4
    let src = src.as_ref().to_owned();
258
4
    let dst = dst.as_ref().to_owned();
259
4
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
260
4
}
261
262
1
pub async fn set_permissions(src: impl AsRef<Path>, perm: Permissions) -> Result<(), Error> {
263
1
    let src = src.as_ref().to_owned();
264
1
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
265
1
        .await
266
1
}
267
268
41
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
269
41
    let path = path.as_ref().to_owned();
270
41
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
271
41
}
272
273
245
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
274
245
    let path = path.as_ref().to_owned();
275
245
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
276
245
}
277
278
#[cfg(target_family = "unix")]
279
1
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
280
    // TODO: add a test for #2051: deadlock with large number of files
281
1
    let _permit = get_permit().await
?0
;
282
1
    tokio::fs::symlink(src, dst).await.map_err(Into::into)
283
1
}
284
285
2
pub async fn read_link(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
286
2
    let path = path.as_ref().to_owned();
287
2
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
288
2
}
289
290
#[derive(Debug)]
291
pub struct ReadDir {
292
    // We hold the permit because once it is dropped it goes back into the queue.
293
    permit: SemaphorePermit<'static>,
294
    inner: tokio::fs::ReadDir,
295
}
296
297
impl ReadDir {
298
289
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
299
289
        (self.permit, self.inner)
300
289
    }
301
}
302
303
impl AsRef<tokio::fs::ReadDir> for ReadDir {
    /// Borrows the wrapped tokio directory stream.
    fn as_ref(&self) -> &tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
308
309
impl AsMut<tokio::fs::ReadDir> for ReadDir {
    /// Mutably borrows the wrapped tokio directory stream.
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
314
315
291
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
316
291
    let path = path.as_ref().to_owned();
317
291
    let (permit, inner) = call_with_permit(move |permit| {
318
        Ok((
319
291
            permit,
320
291
            tokio::runtime::Handle::current()
321
291
                .block_on(tokio::fs::read_dir(path))
322
291
                .map_err(Into::<Error>::into)
?0
,
323
        ))
324
291
    })
325
291
    .await
?0
;
326
291
    Ok(ReadDir { permit, inner })
327
291
}
328
329
9
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
330
9
    let from = from.as_ref().to_owned();
331
9
    let to = to.as_ref().to_owned();
332
9
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
333
9
}
334
335
8
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
336
8
    let path = path.as_ref().to_owned();
337
8
    call_with_permit(move |_| 
std::fs::remove_file7
(
path7
).
map_err7
(Into::<Error>::into)).await
338
5
}
339
340
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
341
2
    let path = path.as_ref().to_owned();
342
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
343
2
}
344
345
15
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
346
15
    let path = path.as_ref().to_owned();
347
15
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
348
15
}
349
350
4
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
351
4
    let path = path.as_ref().to_owned();
352
4
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
353
4
}
354
355
8
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
356
8
    let path = path.as_ref().to_owned();
357
8
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
358
8
}
359
360
// We can't just use the stock remove_dir_all as it falls over if someone's set readonly
361
// permissions. This version walks the directories and fixes the permissions where needed
362
// before deleting everything.
363
#[cfg(not(target_family = "windows"))]
364
23
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
365
    // Because otherwise Windows builds complain about these things not being used
366
    use std::io::ErrorKind;
367
    use std::os::unix::fs::PermissionsExt;
368
369
    use tracing::debug;
370
    use walkdir::WalkDir;
371
372
24
    for entry in 
WalkDir::new23
(
&path23
) {
373
24
        let Ok(
entry23
) = &entry else {
  Branch (373:13): [Folded - Ignored]
  Branch (373:13): [True: 1, False: 0]
  Branch (373:13): [True: 1, False: 1]
  Branch (373:13): [True: 1, False: 0]
  Branch (373:13): [True: 0, False: 0]
  Branch (373:13): [Folded - Ignored]
  Branch (373:13): [True: 20, False: 0]
374
1
            debug!("Can't get into {entry:?}, assuming already deleted");
375
1
            continue;
376
        };
377
23
        let metadata = entry.metadata()
?0
;
378
23
        if metadata.is_dir() {
  Branch (378:12): [Folded - Ignored]
  Branch (378:12): [True: 1, False: 0]
  Branch (378:12): [True: 1, False: 0]
  Branch (378:12): [True: 1, False: 0]
  Branch (378:12): [True: 0, False: 0]
  Branch (378:12): [Folded - Ignored]
  Branch (378:12): [True: 20, False: 0]
379
23
            match std::fs::remove_dir_all(entry.path()) {
380
22
                Ok(()) => {}
381
1
                Err(e) if e.kind() == ErrorKind::PermissionDenied => {
  Branch (381:27): [Folded - Ignored]
  Branch (381:27): [True: 0, False: 0]
  Branch (381:27): [True: 1, False: 0]
  Branch (381:27): [True: 0, False: 0]
  Branch (381:27): [True: 0, False: 0]
  Branch (381:27): [Folded - Ignored]
  Branch (381:27): [True: 0, False: 0]
382
1
                    std::fs::set_permissions(entry.path(), Permissions::from_mode(0o700)).err_tip(
383
0
                        || format!("Setting permissions for {}", entry.path().display()),
384
0
                    )?;
385
                }
386
0
                e @ Err(_) => e.err_tip(|| format!("Removing {}", entry.path().display()))?,
387
            }
388
0
        } else if metadata.is_file() {
  Branch (388:19): [Folded - Ignored]
  Branch (388:19): [True: 0, False: 0]
  Branch (388:19): [True: 0, False: 0]
  Branch (388:19): [True: 0, False: 0]
  Branch (388:19): [True: 0, False: 0]
  Branch (388:19): [Folded - Ignored]
  Branch (388:19): [True: 0, False: 0]
389
0
            std::fs::set_permissions(entry.path(), Permissions::from_mode(0o600))
390
0
                .err_tip(|| format!("Setting permissions for {}", entry.path().display()))?;
391
0
        }
392
    }
393
394
    // should now be safe to delete after we fixed all the permissions in the walk loop
395
23
    match std::fs::remove_dir_all(&path) {
396
1
        Ok(()) => {}
397
22
        Err(e) if e.kind() == ErrorKind::NotFound => {}
  Branch (397:19): [Folded - Ignored]
  Branch (397:19): [True: 1, False: 0]
  Branch (397:19): [True: 0, False: 0]
  Branch (397:19): [True: 1, False: 0]
  Branch (397:19): [True: 0, False: 0]
  Branch (397:19): [Folded - Ignored]
  Branch (397:19): [True: 20, False: 0]
398
0
        e @ Err(_) => e.err_tip(|| {
399
0
            format!(
400
0
                "Removing {} after permissions fixes",
401
0
                path.as_ref().display()
402
            )
403
0
        })?,
404
    }
405
23
    Ok(())
406
23
}
407
408
// Permissions cannot easily be adjusted on Windows, so this simply defers to the
// stock `std::fs::remove_dir_all`.
#[cfg(target_family = "windows")]
fn internal_remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
    std::fs::remove_dir_all(path.as_ref()).map_err(Error::from)
}
415
416
23
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
417
23
    let path = path.as_ref().to_owned();
418
23
    call_with_permit(move |_| internal_remove_dir_all(path)).await
419
23
}