Coverage Report

Created: 2025-02-20 14:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::fs::Metadata;
16
use std::io::{IoSlice, Seek};
17
use std::path::{Path, PathBuf};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicUsize, Ordering};
20
use std::task::{Context, Poll};
21
22
use nativelink_error::{make_err, Code, Error, ResultExt};
23
use rlimit::increase_nofile_limit;
24
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
25
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
26
/// issues.
27
pub use tokio::fs::DirEntry;
28
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
29
use tokio::sync::{Semaphore, SemaphorePermit};
30
use tracing::{event, Level};
31
32
use crate::spawn_blocking;
33
34
/// Default read buffer size when reading to/from disk: 16384 (16 * 1024).
pub const DEFAULT_READ_BUFF_SIZE: usize = 16384;
36
37
#[derive(Debug)]
38
pub struct FileSlot {
39
    // We hold the permit because once it is dropped it goes back into the queue.
40
    _permit: SemaphorePermit<'static>,
41
    inner: tokio::fs::File,
42
}
43
44
impl AsRef<tokio::fs::File> for FileSlot {
    /// Borrows the wrapped Tokio file.
    fn as_ref(&self) -> &tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
49
50
impl AsMut<tokio::fs::File> for FileSlot {
    /// Mutably borrows the wrapped Tokio file.
    fn as_mut(&mut self) -> &mut tokio::fs::File {
        let Self { inner, .. } = self;
        inner
    }
}
55
56
impl AsyncRead for FileSlot {
57
2.52k
    fn poll_read(
58
2.52k
        mut self: Pin<&mut Self>,
59
2.52k
        cx: &mut Context<'_>,
60
2.52k
        buf: &mut ReadBuf<'_>,
61
2.52k
    ) -> Poll<Result<(), tokio::io::Error>> {
62
2.52k
        Pin::new(&mut self.inner).poll_read(cx, buf)
63
2.52k
    }
64
}
65
66
impl AsyncSeek for FileSlot {
67
23
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
68
23
        Pin::new(&mut self.inner).start_seek(position)
69
23
    }
70
71
69
    fn poll_complete(
72
69
        mut self: Pin<&mut Self>,
73
69
        cx: &mut Context<'_>,
74
69
    ) -> Poll<Result<u64, tokio::io::Error>> {
75
69
        Pin::new(&mut self.inner).poll_complete(cx)
76
69
    }
77
}
78
79
impl AsyncWrite for FileSlot {
80
2
    fn poll_write(
81
2
        mut self: Pin<&mut Self>,
82
2
        cx: &mut Context<'_>,
83
2
        buf: &[u8],
84
2
    ) -> Poll<Result<usize, tokio::io::Error>> {
85
2
        Pin::new(&mut self.inner).poll_write(cx, buf)
86
2
    }
87
88
0
    fn poll_flush(
89
0
        mut self: Pin<&mut Self>,
90
0
        cx: &mut Context<'_>,
91
0
    ) -> Poll<Result<(), tokio::io::Error>> {
92
0
        Pin::new(&mut self.inner).poll_flush(cx)
93
0
    }
94
95
0
    fn poll_shutdown(
96
0
        mut self: Pin<&mut Self>,
97
0
        cx: &mut Context<'_>,
98
0
    ) -> Poll<Result<(), tokio::io::Error>> {
99
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
100
0
    }
101
102
75
    fn poll_write_vectored(
103
75
        mut self: Pin<&mut Self>,
104
75
        cx: &mut Context<'_>,
105
75
        bufs: &[IoSlice<'_>],
106
75
    ) -> Poll<Result<usize, tokio::io::Error>> {
107
75
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
108
75
    }
109
110
75
    fn is_write_vectored(&self) -> bool {
111
75
        self.inner.is_write_vectored()
112
75
    }
113
}
114
115
// Note: If the default changes make sure you update the documentation in
// `config/cas_server.rs`.
/// Number of open-file permits available before `set_open_file_limit` is
/// called: 24 * 1024 = 24576.
pub const DEFAULT_OPEN_FILE_PERMITS: usize = 24 * 1024; // 24k.
/// Total number of permits `OPEN_FILE_SEMAPHORE` has been given so far.
/// `set_open_file_limit` grows it; subtracting the semaphore's available
/// permits from it yields the number of permits currently checked out.
static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS);
/// Semaphore bounding the file operations of this module: every open
/// `FileSlot`/`ReadDir` and every in-flight `call_with_permit` holds one permit.
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS);
120
121
/// Try to acquire a permit from the open file semaphore.
122
#[inline]
123
25.3k
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
124
25.3k
    OPEN_FILE_SEMAPHORE
125
25.3k
        .acquire()
126
25.3k
        .await
127
25.3k
        .map_err(|e| 
make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0
)
128
25.3k
}
129
/// Acquire a permit from the open file semaphore and call a raw function.
130
#[inline]
131
735
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
132
735
where
133
735
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
134
735
    T: Send + 'static,
135
735
{
136
735
    let permit = get_permit().await
?0
;
137
735
    spawn_blocking!("fs_call_with_permit", move || f(permit))
138
735
        .await
139
734
        .unwrap_or_else(|e| 
Err(make_err!(Code::Internal, "background task failed: {e:?}"))0
)
140
734
}
141
142
0
pub fn set_open_file_limit(limit: usize) {
143
0
    let new_limit = {
144
        // We increase the limit by 20% to give extra
145
        // room for other file descriptors like sockets,
146
        // pipes, and other things.
147
0
        let fs_ulimit =
148
0
            u64::try_from(limit.saturating_add(limit / 5)).expect("set_open_file_limit too large");
149
0
        match increase_nofile_limit(fs_ulimit) {
150
0
            Ok(new_fs_ulimit) => {
151
0
                event!(
152
0
                    Level::INFO,
153
0
                    "set_open_file_limit({limit})::ulimit success. New fs.ulimit: {fs_ulimit} (20% increase of {limit}).",
154
                );
155
0
                usize::try_from(new_fs_ulimit).expect("new_fs_ulimit too large")
156
            }
157
0
            Err(e) => {
158
0
                event!(
159
0
                    Level::ERROR,
160
0
                    "set_open_file_limit({limit})::ulimit failed. Maybe system does not have ulimits, continuing anyway. - {e:?}",
161
                );
162
0
                limit
163
            }
164
        }
165
    };
166
0
    if new_limit < DEFAULT_OPEN_FILE_PERMITS {
  Branch (166:8): [True: 0, False: 0]
  Branch (166:8): [Folded - Ignored]
167
0
        event!(
168
0
            Level::WARN,
169
0
            "set_open_file_limit({limit}) succeeded, but this is below the default limit of {DEFAULT_OPEN_FILE_PERMITS}. Will continue, but we recommend increasing the limit to at least the default.",
170
        );
171
0
    }
172
0
    if new_limit < limit {
  Branch (172:8): [True: 0, False: 0]
  Branch (172:8): [Folded - Ignored]
173
0
        event!(
174
0
            Level::WARN,
175
0
            "set_open_file_limit({limit}) succeeded, but new open file limit is {new_limit}. Will continue, but likely a config or system options (ie: ulimit) needs updated.",
176
        );
177
0
    }
178
179
0
    let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire);
180
0
    if limit < current_total {
  Branch (180:8): [True: 0, False: 0]
  Branch (180:8): [Folded - Ignored]
181
0
        event!(
182
0
            Level::ERROR,
183
0
            "set_open_file_limit({}) must be greater than {}",
184
            limit,
185
            current_total
186
        );
187
0
        return;
188
0
    }
189
0
    TOTAL_FILE_SEMAPHORES.fetch_add(limit - current_total, Ordering::Release);
190
0
    OPEN_FILE_SEMAPHORE.add_permits(limit - current_total);
191
0
}
192
193
13
pub fn get_open_files_for_test() -> usize {
194
13
    TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
195
13
}
196
197
60
pub async fn open_file(
198
60
    path: impl AsRef<Path>,
199
60
    start: u64,
200
60
    limit: u64,
201
60
) -> Result<Take<FileSlot>, Error> {
202
60
    let path = path.as_ref().to_owned();
203
60
    let (
permit, os_file58
) = call_with_permit(move |permit| {
204
58
        let mut os_file =
205
60
            std::fs::File::open(&path).err_tip(|| 
format!("Could not open {path:?}")2
)
?2
;
206
58
        if start > 0 {
  Branch (206:12): [True: 0, False: 0]
  Branch (206:12): [Folded - Ignored]
  Branch (206:12): [True: 0, False: 1]
  Branch (206:12): [True: 0, False: 2]
  Branch (206:12): [True: 0, False: 4]
  Branch (206:12): [True: 0, False: 6]
  Branch (206:12): [True: 0, False: 0]
  Branch (206:12): [Folded - Ignored]
  Branch (206:12): [True: 0, False: 0]
  Branch (206:12): [True: 0, False: 38]
  Branch (206:12): [True: 0, False: 3]
  Branch (206:12): [True: 0, False: 4]
207
0
            os_file
208
0
                .seek(std::io::SeekFrom::Start(start))
209
0
                .err_tip(|| format!("Could not seek to {start} in {path:?}"))?;
210
58
        }
211
58
        Ok((permit, os_file))
212
60
    })
213
60
    .await
?2
;
214
58
    Ok(FileSlot {
215
58
        _permit: permit,
216
58
        inner: tokio::fs::File::from_std(os_file),
217
58
    }
218
58
    .take(limit))
219
60
}
220
221
112
pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
222
112
    let path = path.as_ref().to_owned();
223
112
    let (permit, os_file) = call_with_permit(move |permit| {
224
112
        Ok((
225
112
            permit,
226
112
            std::fs::File::options()
227
112
                .read(true)
228
112
                .write(true)
229
112
                .create(true)
230
112
                .truncate(true)
231
112
                .open(&path)
232
112
                .err_tip(|| 
format!("Could not open {path:?}")0
)
?0
,
233
        ))
234
112
    })
235
112
    .await
?0
;
236
112
    Ok(FileSlot {
237
112
        _permit: permit,
238
112
        inner: tokio::fs::File::from_std(os_file),
239
112
    })
240
112
}
241
242
4
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
243
4
    let src = src.as_ref().to_owned();
244
4
    let dst = dst.as_ref().to_owned();
245
4
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
246
4
}
247
248
1
pub async fn set_permissions(
249
1
    src: impl AsRef<Path>,
250
1
    perm: std::fs::Permissions,
251
1
) -> Result<(), Error> {
252
1
    let src = src.as_ref().to_owned();
253
1
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
254
1
        .await
255
1
}
256
257
38
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
258
38
    let path = path.as_ref().to_owned();
259
38
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
260
38
}
261
262
197
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
263
197
    let path = path.as_ref().to_owned();
264
197
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
265
197
}
266
267
#[cfg(target_family = "unix")]
268
1
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
269
1
    let src = src.as_ref().to_owned();
270
1
    let dst = dst.as_ref().to_owned();
271
1
    call_with_permit(move |_| {
272
1
        tokio::runtime::Handle::current()
273
1
            .block_on(tokio::fs::symlink(src, dst))
274
1
            .map_err(Into::<Error>::into)
275
1
    })
276
1
    .await
277
1
}
278
279
2
pub async fn read_link(path: impl AsRef<Path>) -> Result<std::path::PathBuf, Error> {
280
2
    let path = path.as_ref().to_owned();
281
2
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
282
2
}
283
284
pub struct ReadDir {
285
    // We hold the permit because once it is dropped it goes back into the queue.
286
    permit: SemaphorePermit<'static>,
287
    inner: tokio::fs::ReadDir,
288
}
289
290
impl ReadDir {
291
220
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
292
220
        (self.permit, self.inner)
293
220
    }
294
}
295
296
impl AsRef<tokio::fs::ReadDir> for ReadDir {
    /// Borrows the underlying Tokio directory listing.
    fn as_ref(&self) -> &tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
301
302
impl AsMut<tokio::fs::ReadDir> for ReadDir {
    /// Mutably borrows the underlying Tokio directory listing.
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
        let Self { inner, .. } = self;
        inner
    }
}
307
308
222
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
309
222
    let path = path.as_ref().to_owned();
310
222
    let (permit, inner) = call_with_permit(move |permit| {
311
222
        Ok((
312
222
            permit,
313
222
            tokio::runtime::Handle::current()
314
222
                .block_on(tokio::fs::read_dir(path))
315
222
                .map_err(Into::<Error>::into)
?0
,
316
        ))
317
222
    })
318
222
    .await
?0
;
319
222
    Ok(ReadDir { permit, inner })
320
222
}
321
322
27
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
323
27
    let from = from.as_ref().to_owned();
324
27
    let to = to.as_ref().to_owned();
325
27
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
326
27
}
327
328
24
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
329
24
    let path = path.as_ref().to_owned();
330
24
    call_with_permit(move |_| std::fs::remove_file(path).map_err(Into::<Error>::into)).await
331
23
}
332
333
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
334
2
    let path = path.as_ref().to_owned();
335
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
336
2
}
337
338
16
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
339
16
    let path = path.as_ref().to_owned();
340
16
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
341
16
}
342
343
4
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
344
4
    let path = path.as_ref().to_owned();
345
4
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
346
4
}
347
348
8
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
349
8
    let path = path.as_ref().to_owned();
350
8
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
351
8
}
352
353
17
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
354
17
    let path = path.as_ref().to_owned();
355
17
    call_with_permit(move |_| std::fs::remove_dir_all(path).map_err(Into::<Error>::into)).await
356
17
}