Coverage Report

Created: 2024-12-19 04:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-util/src/fs.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::fs::Metadata;
16
use std::io::IoSlice;
17
use std::path::{Path, PathBuf};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicUsize, Ordering};
20
use std::sync::OnceLock;
21
use std::task::{Context, Poll};
22
use std::time::Duration;
23
24
use bytes::BytesMut;
25
use futures::Future;
26
use nativelink_error::{make_err, Code, Error, ResultExt};
27
/// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
28
/// open files at any given time. This will greatly reduce the chance we'll hit open file limit
29
/// issues.
30
pub use tokio::fs::DirEntry;
31
use tokio::io::{
32
    AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, ReadBuf, SeekFrom, Take,
33
};
34
use tokio::sync::{Semaphore, SemaphorePermit};
35
use tokio::time::timeout;
36
use tracing::{event, Level};
37
38
use crate::spawn_blocking;
39
40
/// Default read buffer size when reading to/from disk.
41
pub const DEFAULT_READ_BUFF_SIZE: usize = 16384;
42
43
type StreamPosition = u64;
44
type BytesRemaining = u64;
45
46
#[derive(Debug)]
47
enum MaybeFileSlot {
48
    Open(Take<FileSlot>),
49
    Closed((StreamPosition, BytesRemaining)),
50
}
51
52
/// A wrapper around a generic `FileSlot`. This gives us the ability to
53
/// close a file and then resume it later. Specifically useful for cases
54
/// piping data from one location to another and one side is slow at
55
/// reading or writing the data, we can have a timeout, close the file
56
/// and then reopen it later.
57
///
58
/// Note: This wraps both files opened for read and write, so we always
59
/// need to know how the original file was opened and the location of
60
/// the file. To simplify the code significantly we always require the
61
/// file to be a `Take<FileSlot>`.
62
#[derive(Debug)]
63
pub struct ResumeableFileSlot {
64
    maybe_file_slot: MaybeFileSlot,
65
    path: PathBuf,
66
    is_write: bool,
67
}
68
69
impl ResumeableFileSlot {
70
83
    pub fn new(file: FileSlot, path: PathBuf, is_write: bool) -> Self {
71
83
        Self {
72
83
            maybe_file_slot: MaybeFileSlot::Open(file.take(u64::MAX)),
73
83
            path,
74
83
            is_write,
75
83
        }
76
83
    }
77
78
41
    pub fn new_with_take(file: Take<FileSlot>, path: PathBuf, is_write: bool) -> Self {
79
41
        Self {
80
41
            maybe_file_slot: MaybeFileSlot::Open(file),
81
41
            path,
82
41
            is_write,
83
41
        }
84
41
    }
85
86
    /// Returns the path of the file.
87
5
    pub fn get_path(&self) -> &Path {
88
5
        Path::new(&self.path)
89
5
    }
90
91
    /// Returns the current read position of a file.
92
2
    pub async fn stream_position(&mut self) -> Result<u64, Error> {
93
2
        let file_slot = match &mut self.maybe_file_slot {
94
2
            MaybeFileSlot::Open(file_slot) => file_slot,
95
0
            MaybeFileSlot::Closed((pos, _)) => return Ok(*pos),
96
        };
97
2
        file_slot
98
2
            .get_mut()
99
2
            .inner
100
2
            .stream_position()
101
2
            .await
102
2
            .err_tip(|| 
"Failed to get file position in digest_for_file"0
)
103
2
    }
104
105
8
    pub async fn close_file(&mut self) -> Result<(), Error> {
106
8
        let MaybeFileSlot::Open(file_slot) = &mut self.maybe_file_slot else {
  Branch (106:13): [True: 1, False: 0]
  Branch (106:13): [Folded - Ignored]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 7, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [Folded - Ignored]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
  Branch (106:13): [True: 0, False: 0]
107
0
            return Ok(());
108
        };
109
8
        let position = file_slot
110
8
            .get_mut()
111
8
            .inner
112
8
            .stream_position()
113
8
            .await
114
8
            .err_tip(|| 
format!("Failed to get file position {:?}", self.path)0
)
?0
;
115
8
        self.maybe_file_slot = MaybeFileSlot::Closed((position, file_slot.limit()));
116
8
        Ok(())
117
8
    }
118
119
    #[inline]
120
1.36k
    pub async fn as_reader(&mut self) -> Result<&mut Take<FileSlot>, Error> {
121
1.36k
        let (
stream_position, bytes_remaining7
) = match self.maybe_file_slot {
122
1.35k
            MaybeFileSlot::Open(ref mut file_slot) => return Ok(file_slot),
123
7
            MaybeFileSlot::Closed(pos) => pos,
124
        };
125
7
        let permit = OPEN_FILE_SEMAPHORE
126
7
            .acquire()
127
7
            .await
128
7
            .map_err(|e| 
make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0
)
?0
;
129
7
        let inner = tokio::fs::OpenOptions::new()
130
7
            .write(self.is_write)
131
7
            .read(!self.is_write)
132
7
            .open(&self.path)
133
7
            .await
134
7
            .err_tip(|| 
format!("Could not open after resume {:?}", self.path)0
)
?0
;
135
7
        let mut file_slot = FileSlot {
136
7
            _permit: permit,
137
7
            inner,
138
7
        };
139
7
        file_slot
140
7
            .inner
141
7
            .seek(SeekFrom::Start(stream_position))
142
7
            .await
143
7
            .err_tip(|| {
144
0
                format!(
145
0
                    "Failed to seek to position {stream_position} {:?}",
146
0
                    self.path
147
0
                )
148
7
            })
?0
;
149
150
7
        self.maybe_file_slot = MaybeFileSlot::Open(file_slot.take(bytes_remaining));
151
7
        match &mut self.maybe_file_slot {
152
7
            MaybeFileSlot::Open(file_slot) => Ok(file_slot),
153
0
            MaybeFileSlot::Closed(_) => unreachable!(),
154
        }
155
1.36k
    }
156
157
    #[inline]
158
149
    pub async fn as_writer(&mut self) -> Result<&mut FileSlot, Error> {
159
149
        Ok(self.as_reader().await
?0
.get_mut())
160
149
    }
161
162
    /// Utility function to read data from a handler and handles file descriptor
163
    /// timeouts. Chunk size is based on the `buf`'s capacity.
164
    /// Note: If the `handler` changes `buf`s capacity, it is responsible for reserving
165
    /// more before returning.
166
4
    pub async fn read_buf_cb<'b, T, F, Fut>(
167
4
        &'b mut self,
168
4
        (mut buf, mut state): (BytesMut, T),
169
4
        mut handler: F,
170
4
    ) -> Result<(BytesMut, T), Error>
171
4
    where
172
4
        F: (FnMut((BytesMut, T)) -> Fut) + 'b,
173
4
        Fut: Future<Output = Result<(BytesMut, T), Error>> + 'b,
174
4
    {
175
        loop {
176
71
            buf.clear();
177
71
            self.as_reader()
178
71
                .await
179
71
                .err_tip(|| 
"Could not get reader from file slot in read_buf_cb"0
)
?0
180
71
                .read_buf(&mut buf)
181
71
                .await
182
71
                .err_tip(|| 
"Could not read chunk during read_buf_cb"0
)
?0
;
183
71
            if buf.is_empty() {
  Branch (183:16): [True: 3, False: 3]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [Folded - Ignored]
  Branch (183:16): [True: 1, False: 64]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [Folded - Ignored]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
  Branch (183:16): [True: 0, False: 0]
184
4
                return Ok((buf, state));
185
67
            }
186
67
            let handler_fut = handler((buf, state));
187
67
            tokio::pin!(handler_fut);
188
            loop {
189
67
                match timeout(idle_file_descriptor_timeout(), &mut handler_fut).await {
190
67
                    Ok(Ok(output)) => {
191
67
                        (buf, state) = output;
192
67
                        break;
193
                    }
194
0
                    Ok(Err(err)) => {
195
0
                        return Err(err).err_tip(|| "read_buf_cb's handler returned an error")
196
                    }
197
                    Err(_) => {
198
0
                        self.close_file()
199
0
                            .await
200
0
                            .err_tip(|| "Could not close file due to timeout in read_buf_cb")?;
201
0
                        continue;
202
                    }
203
                }
204
            }
205
        }
206
4
    }
207
}
208
209
#[derive(Debug)]
210
pub struct FileSlot {
211
    // We hold the permit because once it is dropped it goes back into the queue.
212
    _permit: SemaphorePermit<'static>,
213
    inner: tokio::fs::File,
214
}
215
216
impl AsRef<tokio::fs::File> for FileSlot {
217
78
    fn as_ref(&self) -> &tokio::fs::File {
218
78
        &self.inner
219
78
    }
220
}
221
222
impl AsMut<tokio::fs::File> for FileSlot {
223
7
    fn as_mut(&mut self) -> &mut tokio::fs::File {
224
7
        &mut self.inner
225
7
    }
226
}
227
228
impl AsyncRead for FileSlot {
229
2.40k
    fn poll_read(
230
2.40k
        mut self: Pin<&mut Self>,
231
2.40k
        cx: &mut Context<'_>,
232
2.40k
        buf: &mut ReadBuf<'_>,
233
2.40k
    ) -> Poll<Result<(), tokio::io::Error>> {
234
2.40k
        Pin::new(&mut self.inner).poll_read(cx, buf)
235
2.40k
    }
236
}
237
238
impl AsyncSeek for FileSlot {
239
37
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<(), tokio::io::Error> {
240
37
        Pin::new(&mut self.inner).start_seek(position)
241
37
    }
242
243
112
    fn poll_complete(
244
112
        mut self: Pin<&mut Self>,
245
112
        cx: &mut Context<'_>,
246
112
    ) -> Poll<Result<u64, tokio::io::Error>> {
247
112
        Pin::new(&mut self.inner).poll_complete(cx)
248
112
    }
249
}
250
251
impl AsyncWrite for FileSlot {
252
8
    fn poll_write(
253
8
        mut self: Pin<&mut Self>,
254
8
        cx: &mut Context<'_>,
255
8
        buf: &[u8],
256
8
    ) -> Poll<Result<usize, tokio::io::Error>> {
257
8
        Pin::new(&mut self.inner).poll_write(cx, buf)
258
8
    }
259
260
0
    fn poll_flush(
261
0
        mut self: Pin<&mut Self>,
262
0
        cx: &mut Context<'_>,
263
0
    ) -> Poll<Result<(), tokio::io::Error>> {
264
0
        Pin::new(&mut self.inner).poll_flush(cx)
265
0
    }
266
267
0
    fn poll_shutdown(
268
0
        mut self: Pin<&mut Self>,
269
0
        cx: &mut Context<'_>,
270
0
    ) -> Poll<Result<(), tokio::io::Error>> {
271
0
        Pin::new(&mut self.inner).poll_shutdown(cx)
272
0
    }
273
274
58
    fn poll_write_vectored(
275
58
        mut self: Pin<&mut Self>,
276
58
        cx: &mut Context<'_>,
277
58
        bufs: &[IoSlice<'_>],
278
58
    ) -> Poll<Result<usize, tokio::io::Error>> {
279
58
        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
280
58
    }
281
282
58
    fn is_write_vectored(&self) -> bool {
283
58
        self.inner.is_write_vectored()
284
58
    }
285
}
286
287
const DEFAULT_OPEN_FILE_PERMITS: usize = 10;
288
static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS);
289
pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS);
290
291
/// Try to acquire a permit from the open file semaphore.
292
#[inline]
293
608
pub async fn get_permit() -> Result<SemaphorePermit<'static>, Error> {
294
608
    OPEN_FILE_SEMAPHORE
295
608
        .acquire()
296
608
        .await
297
608
        .map_err(|e| 
make_err!(Code::Internal, "Open file semaphore closed {:?}", e)0
)
298
608
}
299
/// Acquire a permit from the open file semaphore and call a raw function.
300
#[inline]
301
590
pub async fn call_with_permit<F, T>(f: F) -> Result<T, Error>
302
590
where
303
590
    F: FnOnce(SemaphorePermit<'static>) -> Result<T, Error> + Send + 'static,
304
590
    T: Send + 'static,
305
590
{
306
590
    let permit = get_permit().await
?0
;
307
590
    spawn_blocking!("fs_call_with_permit", move || f(permit))
308
590
        .await
309
588
        .unwrap_or_else(|e| 
Err(make_err!(Code::Internal, "background task failed: {e:?}"))0
)
310
588
}
311
312
0
pub fn set_open_file_limit(limit: usize) {
313
0
    let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire);
314
0
    if limit < current_total {
  Branch (314:8): [True: 0, False: 0]
  Branch (314:8): [Folded - Ignored]
315
0
        event!(
316
0
            Level::ERROR,
317
0
            "set_open_file_limit({}) must be greater than {}",
318
            limit,
319
            current_total
320
        );
321
0
        return;
322
0
    }
323
0
    TOTAL_FILE_SEMAPHORES.fetch_add(limit - current_total, Ordering::Release);
324
0
    OPEN_FILE_SEMAPHORE.add_permits(limit - current_total);
325
0
}
326
327
13
pub fn get_open_files_for_test() -> usize {
328
13
    TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
329
13
}
330
331
/// How long a file descriptor can be open without being used before it is closed.
332
static IDLE_FILE_DESCRIPTOR_TIMEOUT: OnceLock<Duration> = OnceLock::new();
333
334
1.27k
pub fn idle_file_descriptor_timeout() -> Duration {
335
1.27k
    *IDLE_FILE_DESCRIPTOR_TIMEOUT.get_or_init(|| 
Duration::MAX0
)
336
1.27k
}
337
338
/// Set the idle file descriptor timeout. This is the amount of time
339
/// a file descriptor can be open without being used before it is closed.
340
301
pub fn set_idle_file_descriptor_timeout(timeout: Duration) -> Result<(), Error> {
341
301
    IDLE_FILE_DESCRIPTOR_TIMEOUT
342
301
        .set(timeout)
343
301
        .map_err(|_| 
make_err!(Code::Internal, "idle_file_descriptor_timeout already set")263
)
344
301
}
345
346
41
pub async fn open_file(path: impl AsRef<Path>, limit: u64) -> Result<ResumeableFileSlot, Error> {
347
41
    let path = path.as_ref().to_owned();
348
41
    let (permit, os_file, path) = call_with_permit(move |permit| {
349
41
        Ok((
350
41
            permit,
351
41
            std::fs::File::open(&path).err_tip(|| 
format!("Could not open {path:?}")0
)
?0
,
352
41
            path,
353
        ))
354
41
    })
355
41
    .await
?0
;
356
41
    Ok(ResumeableFileSlot::new_with_take(
357
41
        FileSlot {
358
41
            _permit: permit,
359
41
            inner: tokio::fs::File::from_std(os_file),
360
41
        }
361
41
        .take(limit),
362
41
        path,
363
41
        false, /* is_write */
364
41
    ))
365
41
}
366
367
83
pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, Error> {
368
83
    let path = path.as_ref().to_owned();
369
83
    let (permit, os_file, path) = call_with_permit(move |permit| {
370
83
        Ok((
371
83
            permit,
372
83
            std::fs::File::options()
373
83
                .read(true)
374
83
                .write(true)
375
83
                .create(true)
376
83
                .truncate(true)
377
83
                .open(&path)
378
83
                .err_tip(|| 
format!("Could not open {path:?}")0
)
?0
,
379
83
            path,
380
        ))
381
83
    })
382
83
    .await
?0
;
383
83
    Ok(ResumeableFileSlot::new(
384
83
        FileSlot {
385
83
            _permit: permit,
386
83
            inner: tokio::fs::File::from_std(os_file),
387
83
        },
388
83
        path,
389
83
        true, /* is_write */
390
83
    ))
391
83
}
392
393
4
pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
394
4
    let src = src.as_ref().to_owned();
395
4
    let dst = dst.as_ref().to_owned();
396
4
    call_with_permit(move |_| std::fs::hard_link(src, dst).map_err(Into::<Error>::into)).await
397
4
}
398
399
1
pub async fn set_permissions(
400
1
    src: impl AsRef<Path>,
401
1
    perm: std::fs::Permissions,
402
1
) -> Result<(), Error> {
403
1
    let src = src.as_ref().to_owned();
404
1
    call_with_permit(move |_| std::fs::set_permissions(src, perm).map_err(Into::<Error>::into))
405
1
        .await
406
1
}
407
408
20
pub async fn create_dir(path: impl AsRef<Path>) -> Result<(), Error> {
409
20
    let path = path.as_ref().to_owned();
410
20
    call_with_permit(move |_| std::fs::create_dir(path).map_err(Into::<Error>::into)).await
411
20
}
412
413
177
pub async fn create_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
414
177
    let path = path.as_ref().to_owned();
415
177
    call_with_permit(move |_| std::fs::create_dir_all(path).map_err(Into::<Error>::into)).await
416
177
}
417
418
#[cfg(target_family = "unix")]
419
1
pub async fn symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
420
1
    let src = src.as_ref().to_owned();
421
1
    let dst = dst.as_ref().to_owned();
422
1
    call_with_permit(move |_| {
423
1
        tokio::runtime::Handle::current()
424
1
            .block_on(tokio::fs::symlink(src, dst))
425
1
            .map_err(Into::<Error>::into)
426
1
    })
427
1
    .await
428
1
}
429
430
0
pub async fn read_link(path: impl AsRef<Path>) -> Result<std::path::PathBuf, Error> {
431
0
    let path = path.as_ref().to_owned();
432
0
    call_with_permit(move |_| std::fs::read_link(path).map_err(Into::<Error>::into)).await
433
0
}
434
435
pub struct ReadDir {
436
    // We hold the permit because once it is dropped it goes back into the queue.
437
    permit: SemaphorePermit<'static>,
438
    inner: tokio::fs::ReadDir,
439
}
440
441
impl ReadDir {
442
202
    pub fn into_inner(self) -> (SemaphorePermit<'static>, tokio::fs::ReadDir) {
443
202
        (self.permit, self.inner)
444
202
    }
445
}
446
447
impl AsRef<tokio::fs::ReadDir> for ReadDir {
448
0
    fn as_ref(&self) -> &tokio::fs::ReadDir {
449
0
        &self.inner
450
0
    }
451
}
452
453
impl AsMut<tokio::fs::ReadDir> for ReadDir {
454
2
    fn as_mut(&mut self) -> &mut tokio::fs::ReadDir {
455
2
        &mut self.inner
456
2
    }
457
}
458
459
204
pub async fn read_dir(path: impl AsRef<Path>) -> Result<ReadDir, Error> {
460
204
    let path = path.as_ref().to_owned();
461
204
    let (permit, inner) = call_with_permit(move |permit| {
462
204
        Ok((
463
204
            permit,
464
204
            tokio::runtime::Handle::current()
465
204
                .block_on(tokio::fs::read_dir(path))
466
204
                .map_err(Into::<Error>::into)
?0
,
467
        ))
468
204
    })
469
204
    .await
?0
;
470
204
    Ok(ReadDir { permit, inner })
471
204
}
472
473
17
pub async fn rename(from: impl AsRef<Path>, to: impl AsRef<Path>) -> Result<(), Error> {
474
17
    let from = from.as_ref().to_owned();
475
17
    let to = to.as_ref().to_owned();
476
17
    call_with_permit(move |_| std::fs::rename(from, to).map_err(Into::<Error>::into)).await
477
17
}
478
479
14
pub async fn remove_file(path: impl AsRef<Path>) -> Result<(), Error> {
480
14
    let path = path.as_ref().to_owned();
481
14
    call_with_permit(move |_| std::fs::remove_file(path).map_err(Into::<Error>::into)).await
482
12
}
483
484
2
pub async fn canonicalize(path: impl AsRef<Path>) -> Result<PathBuf, Error> {
485
2
    let path = path.as_ref().to_owned();
486
2
    call_with_permit(move |_| std::fs::canonicalize(path).map_err(Into::<Error>::into)).await
487
2
}
488
489
10
pub async fn metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
490
10
    let path = path.as_ref().to_owned();
491
10
    call_with_permit(move |_| std::fs::metadata(path).map_err(Into::<Error>::into)).await
492
10
}
493
494
4
pub async fn read(path: impl AsRef<Path>) -> Result<Vec<u8>, Error> {
495
4
    let path = path.as_ref().to_owned();
496
4
    call_with_permit(move |_| std::fs::read(path).map_err(Into::<Error>::into)).await
497
4
}
498
499
3
pub async fn symlink_metadata(path: impl AsRef<Path>) -> Result<Metadata, Error> {
500
3
    let path = path.as_ref().to_owned();
501
3
    call_with_permit(move |_| std::fs::symlink_metadata(path).map_err(Into::<Error>::into)).await
502
3
}
503
504
9
pub async fn remove_dir_all(path: impl AsRef<Path>) -> Result<(), Error> {
505
9
    let path = path.as_ref().to_owned();
506
9
    call_with_permit(move |_| std::fs::remove_dir_all(path).map_err(Into::<Error>::into)).await
507
9
}