Coverage Report

Created: 2024-11-22 20:17

/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ffi::{OsStr, OsString};
17
use std::fmt::{Debug, Formatter};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicU64, Ordering};
20
use std::sync::{Arc, Weak};
21
use std::time::{Duration, SystemTime};
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use filetime::{set_file_atime, FileTime};
27
use futures::stream::{StreamExt, TryStreamExt};
28
use futures::{Future, TryFutureExt};
29
use nativelink_config::stores::FilesystemSpec;
30
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
31
use nativelink_metric::MetricsComponent;
32
use nativelink_util::buf_channel::{
33
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
34
};
35
use nativelink_util::common::{fs, DigestInfo};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo};
39
use nativelink_util::{background_spawn, spawn_blocking};
40
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
41
use tokio::time::{sleep, timeout, Sleep};
42
use tokio_stream::wrappers::ReadDirStream;
43
use tracing::{event, Level};
44
45
use crate::cas_utils::is_zero_digest;
46
47
// Default size to allocate memory of the buffer when reading files.
48
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
49
// Default block size of all major filesystems is 4KB
50
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
51
52
0
#[derive(Debug, MetricsComponent)]
53
pub struct SharedContext {
54
    // Used in testing to know how many active drop() spawns are running.
55
    // TODO(allada) It is probably a good idea to use a spin lock during
56
    // destruction of the store to ensure that all files are actually
57
    // deleted (similar to how it is done in tests).
58
    #[metric(help = "Number of active drop spawns")]
59
    pub active_drop_spawns: AtomicU64,
60
    #[metric(help = "Path to the configured temp path")]
61
    temp_path: String,
62
    #[metric(help = "Path to the configured content path")]
63
    content_path: String,
64
}
65
66
#[derive(Eq, PartialEq, Debug)]
67
enum PathType {
68
    Content,
69
    Temp,
70
    Custom(OsString),
71
}
72
73
// Note: We don't store the full path of the file because it would cause
74
// a lot of needless memeory bloat. There's a high chance we'll end up with a
75
// lot of small files, so to prevent storing duplicate data, we store an Arc
76
// to the path of the directory where the file is stored and the packed digest.
77
// Resulting in usize + sizeof(DigestInfo).
78
type FileNameDigest = DigestInfo;
79
pub struct EncodedFilePath {
80
    shared_context: Arc<SharedContext>,
81
    path_type: PathType,
82
    digest: FileNameDigest,
83
}
84
85
impl EncodedFilePath {
86
    #[inline]
87
297
    fn get_file_path(&self) -> Cow<'_, OsStr> {
88
297
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
89
297
    }
90
}
91
92
#[inline]
93
375
fn get_file_path_raw<'a>(
94
375
    path_type: &'a PathType,
95
375
    shared_context: &SharedContext,
96
375
    digest: &DigestInfo,
97
375
) -> Cow<'a, OsStr> {
98
375
    let 
folder370
= match path_type {
99
206
        PathType::Content => &shared_context.content_path,
100
164
        PathType::Temp => &shared_context.temp_path,
101
5
        PathType::Custom(path) => return Cow::Borrowed(path),
102
    };
103
370
    Cow::Owned(to_full_path_from_digest(folder, digest))
104
375
}
105
106
impl Drop for EncodedFilePath {
107
81
    fn drop(&mut self) {
108
81
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
109
81
        // file actually needs to be deleted.
110
81
        if self.path_type == PathType::Content {
  Branch (110:12): [True: 64, False: 17]
  Branch (110:12): [Folded - Ignored]
111
64
            return;
112
17
        }
113
17
114
17
        let file_path = self.get_file_path().to_os_string();
115
17
        let shared_context = self.shared_context.clone();
116
17
        shared_context
117
17
            .active_drop_spawns
118
17
            .fetch_add(1, Ordering::Relaxed);
119
17
        background_spawn!("filesystem_delete_file", async move {
120
15
            event!(Level::INFO, ?file_path, 
"File deleted"0
,);
121
15
            let 
result13
= fs::remove_file(&file_path)
122
13
                .await
123
13
                .err_tip(|| 
format!("Failed to remove file {file_path:?}")0
);
124
13
            if let Err(
err0
) = result {
  Branch (124:20): [True: 0, False: 13]
  Branch (124:20): [Folded - Ignored]
125
0
                event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",);
126
13
            }
127
13
            shared_context
128
13
                .active_drop_spawns
129
13
                .fetch_sub(1, Ordering::Relaxed);
130
17
        });
131
81
    }
132
}
133
134
#[inline]
135
387
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
136
387
    format!("{folder}/{digest}").into()
137
387
}
138
139
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
140
    /// Responsible for creating the underlying FileEntry.
141
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
142
143
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
144
    fn make_and_open_file(
145
        block_size: u64,
146
        encoded_file_path: EncodedFilePath,
147
    ) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
148
    where
149
        Self: Sized;
150
151
    /// Returns the underlying reference to the size of the data in bytes
152
    fn data_size_mut(&mut self) -> &mut u64;
153
154
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
155
    fn size_on_disk(&self) -> u64;
156
157
    /// Gets the underlying EncodedfilePath.
158
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
159
160
    /// Returns a reader that will read part of the underlying file.
161
    fn read_file_part(
162
        &self,
163
        offset: u64,
164
        length: u64,
165
    ) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
166
167
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
168
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
169
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
170
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
171
    /// the callback the file is always guaranteed to exist and immutable.
172
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
173
    fn get_file_path_locked<
174
        T,
175
        Fut: Future<Output = Result<T, Error>> + Send,
176
        F: FnOnce(OsString) -> Fut + Send,
177
    >(
178
        &self,
179
        handler: F,
180
    ) -> impl Future<Output = Result<T, Error>> + Send;
181
}
182
183
pub struct FileEntryImpl {
184
    data_size: u64,
185
    block_size: u64,
186
    encoded_file_path: RwLock<EncodedFilePath>,
187
}
188
189
impl FileEntryImpl {
190
9
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
191
9
        self.encoded_file_path.get_mut().shared_context.clone()
192
9
    }
193
}
194
195
impl FileEntry for FileEntryImpl {
196
81
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
197
81
        Self {
198
81
            data_size,
199
81
            block_size,
200
81
            encoded_file_path,
201
81
        }
202
81
    }
203
204
    /// This encapsolates the logic for the edge case of if the file fails to create
205
    /// the cleanup of the file is handled without creating a FileEntry, which would
206
    /// try to cleanup the file as well during drop().
207
73
    async fn make_and_open_file(
208
73
        block_size: u64,
209
73
        encoded_file_path: EncodedFilePath,
210
73
    ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
211
73
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
212
73
        let temp_file_result = fs::create_file(temp_full_path.clone())
213
73
            .or_else(|mut err| async 
{0
214
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
215
0
                    format!("Failed to remove file {temp_full_path:?} in filesystem store")
216
0
                });
217
0
                if let Err(remove_err) = remove_result {
  Branch (217:24): [True: 0, False: 0]
  Branch (217:24): [True: 0, False: 0]
  Branch (217:24): [True: 0, False: 0]
  Branch (217:24): [Folded - Ignored]
  Branch (217:24): [True: 0, False: 0]
218
0
                    err = err.merge(remove_err);
219
0
                }
220
0
                event!(
221
0
                    Level::WARN,
222
                    ?err,
223
                    ?block_size,
224
                    ?temp_full_path,
225
0
                    "Failed to create file",
226
                );
227
0
                Err(err)
228
0
                    .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store"))
229
73
            
}0
)
230
98
            .await
?0
;
231
232
73
        Ok((
233
73
            <FileEntryImpl as FileEntry>::create(
234
73
                0, /* Unknown yet, we will fill it in later */
235
73
                block_size,
236
73
                RwLock::new(encoded_file_path),
237
73
            ),
238
73
            temp_file_result,
239
73
            temp_full_path,
240
73
        ))
241
73
    }
242
243
73
    fn data_size_mut(&mut self) -> &mut u64 {
244
73
        &mut self.data_size
245
73
    }
246
247
154
    fn size_on_disk(&self) -> u64 {
248
154
        self.data_size.div_ceil(self.block_size) * self.block_size
249
154
    }
250
251
190
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
252
190
        &self.encoded_file_path
253
190
    }
254
255
28
    async fn read_file_part(
256
28
        &self,
257
28
        offset: u64,
258
28
        length: u64,
259
28
    ) -> Result<fs::ResumeableFileSlot, Error> {
260
28
        let (mut file, full_content_path_for_debug_only) = self
261
28
            .get_file_path_locked(|full_content_path| async move {
262
28
                let file = fs::open_file(full_content_path.clone(), length)
263
33
                    .await
264
28
                    .err_tip(|| {
265
0
                        format!("Failed to open file in filesystem store {full_content_path:?}")
266
28
                    })
?0
;
267
28
                Ok((file, full_content_path))
268
56
            }
)28
269
33
            .await
?0
;
270
271
28
        file.as_reader()
272
0
            .await
273
28
            .err_tip(|| 
"Could not seek file in read_file_part()"0
)
?0
274
28
            .get_mut()
275
28
            .seek(SeekFrom::Start(offset))
276
29
            .await
277
28
            .err_tip(|| 
format!("Failed to seek file: {full_content_path_for_debug_only:?}")0
)
?0
;
278
28
        Ok(file)
279
28
    }
280
281
112
    async fn get_file_path_locked<
282
112
        T,
283
112
        Fut: Future<Output = Result<T, Error>> + Send,
284
112
        F: FnOnce(OsString) -> Fut + Send,
285
112
    >(
286
112
        &self,
287
112
        handler: F,
288
112
    ) -> Result<T, Error> {
289
112
        let encoded_file_path = self.get_encoded_file_path().read().
await1
;
290
139
        
handler(encoded_file_path.get_file_path().to_os_string())112
.await
291
112
    }
292
}
293
294
impl Debug for FileEntryImpl {
295
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
296
0
        f.debug_struct("FileEntryImpl")
297
0
            .field("data_size", &self.data_size)
298
0
            .field("encoded_file_path", &"<behind mutex>")
299
0
            .finish()
300
0
    }
301
}
302
303
90
fn make_temp_digest(digest: &mut DigestInfo) {
304
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
305
90
    let mut hash = *digest.packed_hash();
306
90
    hash[24..].clone_from_slice(
307
90
        &DELETE_FILE_COUNTER
308
90
            .fetch_add(1, Ordering::Relaxed)
309
90
            .to_le_bytes(),
310
90
    );
311
90
    digest.set_packed_hash(*hash);
312
90
}
313
314
impl LenEntry for FileEntryImpl {
315
    #[inline]
316
152
    fn len(&self) -> u64 {
317
152
        self.size_on_disk()
318
152
    }
319
320
0
    fn is_empty(&self) -> bool {
321
0
        self.data_size == 0
322
0
    }
323
324
    #[inline]
325
74
    async fn touch(&self) -> bool {
326
74
        let result = self
327
74
            .get_file_path_locked(move |full_content_path| async move {
328
74
                let full_content_path = full_content_path.clone();
329
74
                spawn_blocking!("filesystem_touch_set_mtime", move || {
330
74
                    set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
331
1
                        format!("Failed to touch file in filesystem store {full_content_path:?}")
332
74
                    })
333
74
                })
334
98
                .await
335
74
                .map_err(|e| {
336
0
                    make_err!(
337
0
                        Code::Internal,
338
0
                        "Failed to change atime of file due to spawn failing {:?}",
339
0
                        e
340
0
                    )
341
74
                })
?0
342
148
            }
)74
343
99
            .await;
344
74
        if let Err(
err1
) = result {
  Branch (344:16): [True: 0, False: 0]
  Branch (344:16): [True: 1, False: 15]
  Branch (344:16): [True: 0, False: 0]
  Branch (344:16): [True: 0, False: 4]
  Branch (344:16): [Folded - Ignored]
  Branch (344:16): [True: 0, False: 54]
345
1
            event!(Level::ERROR, ?err, "Failed to touch file",);
346
1
            return false;
347
73
        }
348
73
        true
349
74
    }
350
351
    // unref() only triggers when an item is removed from the eviction_map. It is possible
352
    // that another place in code has a reference to `FileEntryImpl` and may later read the
353
    // file. To support this edge case, we first move the file to a temp file and point
354
    // target file location to the new temp file. `unref()` should only ever be called once.
355
    #[inline]
356
18
    async fn unref(&self) {
357
        {
358
18
            let mut encoded_file_path = self.encoded_file_path.write().
await0
;
359
18
            if encoded_file_path.path_type == PathType::Temp {
  Branch (359:16): [True: 0, False: 0]
  Branch (359:16): [True: 1, False: 11]
  Branch (359:16): [True: 0, False: 0]
  Branch (359:16): [True: 0, False: 0]
  Branch (359:16): [Folded - Ignored]
  Branch (359:16): [True: 0, False: 6]
360
                // We are already a temp file that is now marked for deletion on drop.
361
                // This is very rare, but most likely the rename into the content path failed.
362
1
                return;
363
17
            }
364
17
            let from_path = encoded_file_path.get_file_path();
365
17
            let mut new_digest = encoded_file_path.digest;
366
17
            make_temp_digest(&mut new_digest);
367
17
368
17
            let to_path =
369
17
                to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
370
371
17
            if let Err(
err1
) = fs::rename(&from_path, &to_path).await {
  Branch (371:20): [True: 0, False: 0]
  Branch (371:20): [True: 1, False: 10]
  Branch (371:20): [True: 0, False: 0]
  Branch (371:20): [True: 0, False: 0]
  Branch (371:20): [Folded - Ignored]
  Branch (371:20): [True: 0, False: 6]
372
1
                event!(
373
1
                    Level::WARN,
374
1
                    digest = ?encoded_file_path.digest,
375
1
                    ?from_path,
376
1
                    ?to_path,
377
1
                    ?err,
378
1
                    "Failed to rename file",
379
                );
380
            } else {
381
16
                event!(
382
16
                    Level::INFO,
383
0
                    digest = ?encoded_file_path.digest,
384
0
                    ?from_path,
385
0
                    ?to_path,
386
0
                    "Renamed file",
387
                );
388
16
                encoded_file_path.path_type = PathType::Temp;
389
16
                encoded_file_path.digest = new_digest;
390
            }
391
        }
392
18
    }
393
}
394
395
#[inline]
396
4
pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
397
4
    let (hash, size) = file_name.split_once('-').err_tip(|| 
""0
)
?0
;
398
4
    let size = size.parse::<i64>()
?0
;
399
4
    DigestInfo::try_new(hash, size)
400
4
}
401
402
/// The number of files to read the metadata for at the same time when running
403
/// add_files_to_cache.
404
const SIMULTANEOUS_METADATA_READS: usize = 200;
405
406
39
async fn add_files_to_cache<Fe: FileEntry>(
407
39
    evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
408
39
    anchor_time: &SystemTime,
409
39
    shared_context: &Arc<SharedContext>,
410
39
    block_size: u64,
411
39
) -> Result<(), Error> {
412
3
    async fn process_entry<Fe: FileEntry>(
413
3
        evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
414
3
        file_name: &str,
415
3
        atime: SystemTime,
416
3
        data_size: u64,
417
3
        block_size: u64,
418
3
        anchor_time: &SystemTime,
419
3
        shared_context: &Arc<SharedContext>,
420
3
    ) -> Result<(), Error> {
421
3
        let digest = digest_from_filename(file_name)
?0
;
422
423
3
        let file_entry = Fe::create(
424
3
            data_size,
425
3
            block_size,
426
3
            RwLock::new(EncodedFilePath {
427
3
                shared_context: shared_context.clone(),
428
3
                path_type: PathType::Content,
429
3
                digest,
430
3
            }),
431
3
        );
432
3
        let time_since_anchor = anchor_time
433
3
            .duration_since(atime)
434
3
            .map_err(|_| 
make_input_err!("File access time newer than now")0
)
?0
;
435
3
        evicting_map
436
3
            .insert_with_time(
437
3
                digest,
438
3
                Arc::new(file_entry),
439
3
                time_since_anchor.as_secs() as i32,
440
3
            )
441
1
            .await;
442
3
        Ok(())
443
3
    }
444
445
39
    let mut file_infos: Vec<(String, SystemTime, u64)> = {
446
39
        let (_permit, dir_handle) = fs::read_dir(format!("{}/", shared_context.content_path))
447
39
            .await
448
39
            .err_tip(|| 
"Failed opening content directory for iterating in filesystem store"0
)
?0
449
39
            .into_inner();
450
39
451
39
        let read_dir_stream = ReadDirStream::new(dir_handle);
452
39
        read_dir_stream
453
39
            .map(|dir_entry| async move {
454
3
                let dir_entry = dir_entry.unwrap();
455
3
                let file_name = dir_entry.file_name().into_string().unwrap();
456
3
                let metadata = dir_entry
457
3
                    .metadata()
458
3
                    .await
459
3
                    .err_tip(|| 
"Failed to get metadata in filesystem store"0
)
?0
;
460
3
                let atime = match metadata.accessed() {
461
3
                    Ok(atime) => atime,
462
0
                    Err(err) => {
463
0
                        panic!(
464
0
                            "{}{}{} : {} {:?}",
465
0
                            "It appears this filesystem does not support access time. ",
466
0
                            "Please configure this program to run on a drive that supports ",
467
0
                            "atime",
468
0
                            file_name,
469
0
                            err
470
0
                        );
471
                    }
472
                };
473
3
                Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
474
39
            
}6
)
475
39
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
476
39
            .try_collect()
477
4
            .await
?0
478
    };
479
480
39
    file_infos.sort_by(|a, b| 
a.1.cmp(&b.1)1
);
481
42
    for (
file_name, atime, data_size3
) in file_infos {
482
3
        let result = process_entry(
483
3
            evicting_map,
484
3
            &file_name,
485
3
            atime,
486
3
            data_size,
487
3
            block_size,
488
3
            anchor_time,
489
3
            shared_context,
490
3
        )
491
1
        .await;
492
3
        if let Err(
err0
) = result {
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [True: 0, False: 3]
  Branch (492:16): [True: 0, False: 0]
  Branch (492:16): [Folded - Ignored]
  Branch (492:16): [True: 0, False: 0]
493
0
            event!(
494
0
                Level::WARN,
495
                ?file_name,
496
                ?err,
497
0
                "Failed to add file to eviction cache",
498
            );
499
            // Ignore result.
500
            let _ =
501
0
                fs::remove_file(format!("{}/{}", &shared_context.content_path, &file_name)).await;
502
3
        }
503
    }
504
39
    Ok(())
505
39
}
506
507
39
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
508
39
    let (_permit, dir_handle) = fs::read_dir(temp_path)
509
39
        .await
510
39
        .err_tip(|| 
"Failed opening temp directory to prune partial downloads in filesystem store"0
)
?0
511
39
        .into_inner();
512
39
513
39
    let mut read_dir_stream = ReadDirStream::new(dir_handle);
514
39
    while let Some(
dir_entry0
) = read_dir_stream.next().
await0
{
  Branch (514:15): [True: 0, False: 0]
  Branch (514:15): [True: 0, False: 21]
  Branch (514:15): [True: 0, False: 2]
  Branch (514:15): [Folded - Ignored]
  Branch (514:15): [True: 0, False: 16]
515
0
        let path = dir_entry?.path();
516
0
        if let Err(err) = fs::remove_file(&path).await {
  Branch (516:16): [True: 0, False: 0]
  Branch (516:16): [True: 0, False: 0]
  Branch (516:16): [True: 0, False: 0]
  Branch (516:16): [Folded - Ignored]
  Branch (516:16): [True: 0, False: 0]
517
0
            event!(Level::WARN, ?path, ?err, "Failed to delete file",);
518
0
        }
519
    }
520
39
    Ok(())
521
39
}
522
523
0
#[derive(MetricsComponent)]
524
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
525
    #[metric]
526
    shared_context: Arc<SharedContext>,
527
    #[metric(group = "evicting_map")]
528
    evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
529
    #[metric(help = "Block size of the configured filesystem")]
530
    block_size: u64,
531
    #[metric(help = "Size of the configured read buffer size")]
532
    read_buffer_size: usize,
533
    weak_self: Weak<Self>,
534
    sleep_fn: fn(Duration) -> Sleep,
535
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
536
}
537
538
impl<Fe: FileEntry> FilesystemStore<Fe> {
539
32
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
540
69
        Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to)
)32
541
133
            .await
542
32
    }
543
544
39
    pub async fn new_with_timeout_and_rename_fn(
545
39
        spec: &FilesystemSpec,
546
39
        sleep_fn: fn(Duration) -> Sleep,
547
39
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
548
39
    ) -> Result<Arc<Self>, Error> {
549
39
        let now = SystemTime::now();
550
39
551
39
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
552
39
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
553
39
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
554
39
555
39
        fs::create_dir_all(&spec.temp_path)
556
39
            .await
557
39
            .err_tip(|| 
format!("Failed to temp directory {:?}", &spec.temp_path)0
)
?0
;
558
39
        fs::create_dir_all(&spec.content_path)
559
39
            .await
560
39
            .err_tip(|| 
format!("Failed to content directory {:?}", &spec.content_path)0
)
?0
;
561
562
39
        let shared_context = Arc::new(SharedContext {
563
39
            active_drop_spawns: AtomicU64::new(0),
564
39
            temp_path: spec.temp_path.clone(),
565
39
            content_path: spec.content_path.clone(),
566
39
        });
567
568
39
        let block_size = if spec.block_size == 0 {
  Branch (568:29): [True: 0, False: 0]
  Branch (568:29): [True: 0, False: 1]
  Branch (568:29): [True: 0, False: 1]
  Branch (568:29): [True: 1, False: 0]
  Branch (568:29): [True: 0, False: 1]
  Branch (568:29): [True: 1, False: 0]
  Branch (568:29): [True: 15, False: 1]
  Branch (568:29): [True: 2, False: 0]
  Branch (568:29): [Folded - Ignored]
  Branch (568:29): [True: 16, False: 0]
569
35
            DEFAULT_BLOCK_SIZE
570
        } else {
571
4
            spec.block_size
572
        };
573
44
        
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size)39
.await
?0
;
574
39
        prune_temp_path(&shared_context.temp_path).await
?0
;
575
576
39
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (576:35): [True: 0, False: 0]
  Branch (576:35): [True: 1, False: 0]
  Branch (576:35): [True: 0, False: 1]
  Branch (576:35): [True: 1, False: 0]
  Branch (576:35): [True: 0, False: 1]
  Branch (576:35): [True: 1, False: 0]
  Branch (576:35): [True: 7, False: 9]
  Branch (576:35): [True: 2, False: 0]
  Branch (576:35): [Folded - Ignored]
  Branch (576:35): [True: 16, False: 0]
577
28
            DEFAULT_BUFF_SIZE
578
        } else {
579
11
            spec.read_buffer_size as usize
580
        };
581
39
        Ok(Arc::new_cyclic(|weak_self| Self {
582
39
            shared_context,
583
39
            evicting_map,
584
39
            block_size,
585
39
            read_buffer_size,
586
39
            weak_self: weak_self.clone(),
587
39
            sleep_fn,
588
39
            rename_fn,
589
39
        }))
590
39
    }
591
592
15
    pub fn get_arc(&self) -> Option<Arc<Self>> {
593
15
        self.weak_self.upgrade()
594
15
    }
595
596
13
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
597
13
        self.evicting_map
598
13
            .get(digest)
599
13
            .await
600
13
            .ok_or_else(|| 
make_err!(Code::NotFound, "{digest} not found in filesystem store")1
)
601
13
    }
602
603
73
    async fn update_file<'a>(
604
73
        self: Pin<&'a Self>,
605
73
        mut entry: Fe,
606
73
        mut resumeable_temp_file: fs::ResumeableFileSlot,
607
73
        final_digest: DigestInfo,
608
73
        mut reader: DropCloserReadHalf,
609
73
    ) -> Result<(), Error> {
610
73
        let mut data_size = 0;
611
        loop {
612
130
            let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).
await2
  Branch (612:17): [True: 0, False: 0]
  Branch (612:17): [True: 4, False: 0]
  Branch (612:17): [True: 4, False: 0]
  Branch (612:17): [True: 4, False: 0]
  Branch (612:17): [True: 4, False: 0]
  Branch (612:17): [True: 2, False: 0]
  Branch (612:17): [True: 32, False: 0]
  Branch (612:17): [True: 0, False: 0]
  Branch (612:17): [Folded - Ignored]
  Branch (612:17): [True: 80, False: 0]
613
            else {
614
                // In the event we timeout, we want to close the writing file, to prevent
615
                // the file descriptor left open for long periods of time.
616
                // This is needed because we wrap `fs` so only a fixed number of file
617
                // descriptors may be open at any given time. If we are streaming from
618
                // File -> File, it can cause a deadlock if the Write file is not sending
619
                // data because it is waiting for a file descriotor to open before sending data.
620
0
                resumeable_temp_file.close_file().await.err_tip(|| {
621
0
                    "Could not close file due to timeout in FileSystemStore::update_file"
622
0
                })?;
623
0
                continue;
624
            };
625
130
            let mut data = data_result.err_tip(|| 
"Failed to receive data in filesystem store"0
)
?0
;
626
130
            let data_len = data.len();
627
130
            if data_len == 0 {
  Branch (627:16): [True: 0, False: 0]
  Branch (627:16): [True: 2, False: 2]
  Branch (627:16): [True: 2, False: 2]
  Branch (627:16): [True: 2, False: 2]
  Branch (627:16): [True: 2, False: 2]
  Branch (627:16): [True: 1, False: 1]
  Branch (627:16): [True: 17, False: 15]
  Branch (627:16): [True: 0, False: 0]
  Branch (627:16): [Folded - Ignored]
  Branch (627:16): [True: 47, False: 33]
628
73
                break; // EOF.
629
57
            }
630
57
            resumeable_temp_file
631
57
                .as_writer()
632
0
                .await
633
57
                .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
634
57
                .write_all_buf(&mut data)
635
0
                .await
636
57
                .err_tip(|| 
"Failed to write data into filesystem store"0
)
?0
;
637
57
            data_size += data_len as u64;
638
        }
639
640
73
        resumeable_temp_file
641
73
            .as_writer()
642
0
            .await
643
73
            .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
644
73
            .as_ref()
645
73
            .sync_all()
646
198
            .await
647
73
            .err_tip(|| 
"Failed to sync_data in filesystem store"0
)
?0
;
648
649
73
        drop(resumeable_temp_file);
650
73
651
73
        *entry.data_size_mut() = data_size;
652
75
        self.emplace_file(final_digest, Arc::new(entry)).await
653
72
    }
654
655
78
    /// Inserts `entry` into the evicting map and atomically moves its backing
    /// temp file into the content directory, all while holding the entry's
    /// `encoded_file_path` write lock so readers cannot observe a
    /// partially-moved file.
    ///
    /// On rename failure the entry is conditionally removed from the map
    /// (only if it is still the exact `Arc` we inserted) and the error is
    /// returned.
    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
        // This sequence of events is quite tricky to understand due to the amount of triggers that
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
        // 1. Here will hold a write lock on any file operations of this FileEntry.
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
        //    entries.
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntrys
        //    during the rename.
        // 4. It should be impossible for items to be added while eviction is happening, so there
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
        //    item is not possible within the `insert()` call because the write lock inside the
        //    eviction map. If an eviction of new item happens after `insert()` but before
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
        //    will be blocked on us because we currently have the lock.
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
        //    contents until we release the lock.
        let evicting_map = self.evicting_map.clone();
        let rename_fn = self.rename_fn;

        // We need to guarantee that this will get to the end even if the parent future is dropped.
        // See: https://github.com/TraceMachina/nativelink/issues/495
        background_spawn!("filesystem_store_emplace_file", async move {
            // Step 1: hold the write lock for the entire emplace sequence.
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
            let final_path = get_file_path_raw(
                &PathType::Content,
                encoded_file_path.shared_context.as_ref(),
                &digest,
            );

            // Step 2: insert may evict other entries (but never ours — see note 4 above).
            evicting_map.insert(digest, entry.clone()).await;

            let from_path = encoded_file_path.get_file_path();
            // Internally tokio spawns fs commands onto a blocking thread anyways.
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
            // an open-file permit (ensure we don't open too many files at once).
            let result = (rename_fn)(&from_path, &final_path)
                .err_tip(|| format!("Failed to rename temp file to final path {final_path:?}"));

            // In the event our move from temp file to final file fails we need to ensure we remove
            // the entry from our map.
            // Remember: At this point it is possible for another thread to have a reference to
            // `entry`, so we can't delete the file, only drop() should ever delete files.
            if let Err(err) = result {
                event!(
                    Level::ERROR,
                    ?err,
                    ?from_path,
                    ?final_path,
                    "Failed to rename file",
                );
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
                drop(encoded_file_path);
                // It is possible that the item in our map is no longer the item we inserted,
                // So, we need to conditionally remove it only if the pointers are the same.
                evicting_map
                    .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                    .await;
                return Err(err);
            }
            // Rename succeeded: record that the entry now lives in the content directory.
            encoded_file_path.path_type = PathType::Content;
            encoded_file_path.digest = digest;
            Ok(())
        })
        .await
        .err_tip(|| "Failed to create spawn in filesystem store update_file")?
    }
724
}
725
726
#[async_trait]
727
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
728
    async fn has_with_results(
729
        self: Pin<&Self>,
730
        keys: &[StoreKey<'_>],
731
        results: &mut [Option<u64>],
732
52
    ) -> Result<(), Error> {
733
        // TODO(allada) This is a bit of a hack to get around the lifetime issues with the
734
        // existence_cache. We need to convert the digests to owned values to be able to
735
        // insert them into the cache. In theory it should be able to elide this conversion
736
        // but it seems to be a bit tricky to get right.
737
52
        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
738
52
        self.evicting_map
739
52
            .sizes_for_keys(&keys, results, false /* peek */)
740
64
            .await;
741
        // We need to do a special pass to ensure our zero files exist.
742
        // If our results failed and the result was a zero file, we need to
743
        // create the file by spec.
744
52
        for (digest, result) in keys.iter().zip(results.iter_mut()) {
745
52
            if result.is_some() || 
!is_zero_digest(digest)17
{
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [True: 0, False: 1]
  Branch (745:36): [True: 1, False: 0]
  Branch (745:16): [True: 1, False: 3]
  Branch (745:36): [True: 1, False: 2]
  Branch (745:16): [True: 0, False: 0]
  Branch (745:36): [True: 0, False: 0]
  Branch (745:16): [Folded - Ignored]
  Branch (745:36): [Folded - Ignored]
  Branch (745:16): [True: 34, False: 13]
  Branch (745:36): [True: 13, False: 0]
746
50
                continue;
747
2
            }
748
2
            let (mut tx, rx) = make_buf_channel_pair();
749
2
            let send_eof_result = tx.send_eof();
750
2
            self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
751
6
                .await
752
2
                .err_tip(|| 
format!("Failed to create zero file for key {digest}")0
)
753
2
                .merge(
754
2
                    send_eof_result
755
2
                        .err_tip(|| 
"Failed to send zero file EOF in filesystem store has"0
),
756
2
                )
?0
;
757
758
2
            *result = Some(0);
759
        }
760
52
        Ok(())
761
104
    }
762
763
    async fn update(
764
        self: Pin<&Self>,
765
        key: StoreKey<'_>,
766
        reader: DropCloserReadHalf,
767
        _upload_size: UploadSizeInfo,
768
73
    ) -> Result<(), Error> {
769
73
        let digest = key.into_digest();
770
73
        let mut temp_digest = digest;
771
73
        make_temp_digest(&mut temp_digest);
772
773
73
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
774
73
            self.block_size,
775
73
            EncodedFilePath {
776
73
                shared_context: self.shared_context.clone(),
777
73
                path_type: PathType::Temp,
778
73
                digest: temp_digest,
779
73
            },
780
73
        )
781
98
        .await
?0
;
782
783
73
        self.update_file(entry, temp_file, digest, reader)
784
275
            .await
785
72
            .err_tip(|| 
format!("While processing with temp file {temp_full_path:?}")1
)
786
145
    }
787
788
44
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
789
44
        optimization == StoreOptimizations::FileUpdates
790
44
    }
791
792
    async fn update_with_whole_file(
793
        self: Pin<&Self>,
794
        key: StoreKey<'_>,
795
        mut file: fs::ResumeableFileSlot,
796
        upload_size: UploadSizeInfo,
797
5
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
798
5
        let digest = key.into_digest();
799
5
        let path = file.get_path().as_os_str().to_os_string();
800
5
        let file_size = match upload_size {
801
5
            UploadSizeInfo::ExactSize(size) => size,
802
0
            UploadSizeInfo::MaxSize(_) => file
803
0
                .as_reader()
804
0
                .await
805
0
                .err_tip(|| {
806
0
                    format!("While getting metadata for {path:?} in update_with_whole_file")
807
0
                })?
808
0
                .get_ref()
809
0
                .as_ref()
810
0
                .metadata()
811
0
                .await
812
0
                .err_tip(|| format!("While reading metadata for {path:?}"))?
813
0
                .len(),
814
        };
815
5
        let entry = Fe::create(
816
5
            file_size,
817
5
            self.block_size,
818
5
            RwLock::new(EncodedFilePath {
819
5
                shared_context: self.shared_context.clone(),
820
5
                path_type: PathType::Custom(path),
821
5
                digest,
822
5
            }),
823
5
        );
824
5
        // We are done with the file, if we hold a reference to the file here, it could
825
5
        // result in a deadlock if `emplace_file()` also needs file descriptors.
826
5
        drop(file);
827
5
        self.emplace_file(digest, Arc::new(entry))
828
5
            .await
829
5
            .err_tip(|| 
"Could not move file into store in upload_file_to_store, maybe dest is on different volume?"0
)
?0
;
830
5
        return Ok(None);
831
10
    }
832
833
    async fn get_part(
834
        self: Pin<&Self>,
835
        key: StoreKey<'_>,
836
        writer: &mut DropCloserWriteHalf,
837
        offset: u64,
838
        length: Option<u64>,
839
34
    ) -> Result<(), Error> {
840
34
        let digest = key.into_digest();
841
34
        if is_zero_digest(digest) {
  Branch (841:12): [True: 0, False: 0]
  Branch (841:12): [True: 0, False: 0]
  Branch (841:12): [True: 0, False: 1]
  Branch (841:12): [True: 0, False: 0]
  Branch (841:12): [True: 0, False: 1]
  Branch (841:12): [True: 0, False: 0]
  Branch (841:12): [True: 1, False: 4]
  Branch (841:12): [True: 0, False: 0]
  Branch (841:12): [Folded - Ignored]
  Branch (841:12): [True: 7, False: 20]
842
8
            self.has(digest.into())
843
17
                .await
844
8
                .err_tip(|| 
"Failed to check if zero digest exists in filesystem store"0
)
?0
;
845
8
            writer
846
8
                .send_eof()
847
8
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
848
8
            return Ok(());
849
26
        }
850
851
26
        let entry =
852
31
            
self.evicting_map.get(&digest)26
.await.
ok_or_else(26
|| {
853
0
                make_err!(Code::NotFound, "{digest} not found in filesystem store")
854
26
            })
?0
;
855
26
        let read_limit = length.unwrap_or(u64::MAX);
856
58
        let 
mut resumeable_temp_file26
=
entry.read_file_part(offset, read_limit)26
.await
?0
;
857
858
        loop {
859
1.09k
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
860
1.09k
            resumeable_temp_file
861
1.09k
                .as_reader()
862
0
                .await
863
1.09k
                .err_tip(|| 
"In FileSystemStore::get_part()"0
)
?0
864
1.09k
                .read_buf(&mut buf)
865
1.10k
                .await
866
1.09k
                .err_tip(|| 
"Failed to read data in filesystem store"0
)
?0
;
867
1.09k
            if buf.is_empty() {
  Branch (867:16): [True: 0, False: 0]
  Branch (867:16): [True: 0, False: 0]
  Branch (867:16): [True: 1, False: 10]
  Branch (867:16): [True: 0, False: 0]
  Branch (867:16): [True: 1, False: 10]
  Branch (867:16): [True: 0, False: 0]
  Branch (867:16): [True: 3, False: 1.02k]
  Branch (867:16): [True: 0, False: 0]
  Branch (867:16): [Folded - Ignored]
  Branch (867:16): [True: 20, False: 20]
868
25
                break; // EOF.
869
1.06k
            }
870
1.06k
            // In the event it takes a while to send the data to the client, we want to close the
871
1.06k
            // reading file, to prevent the file descriptor left open for long periods of time.
872
1.06k
            // Failing to do so might cause deadlocks if the receiver is unable to receive data
873
1.06k
            // because it is waiting for a file descriptor to open before receiving data.
874
1.06k
            // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
875
1.06k
            // next iteration.
876
1.06k
            let buf_content = buf.freeze();
877
            loop {
878
1.06k
                let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
879
1.06k
                tokio::pin!(sleep_fn);
880
1.06k
                tokio::select! {
881
1.06k
                    _ = & mut (sleep_fn) => {
882
0
                        resumeable_temp_file
883
0
                            .close_file()
884
0
                            .await
885
0
                            .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
886
0
                        continue;
887
                    }
888
1.06k
                    res = writer.send(buf_content.clone()) => {
889
1.06k
                        match res {
890
1.06k
                            Ok(()) => break,
891
0
                            Err(err) => {
892
0
                                return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
893
                            }
894
                        }
895
                    }
896
                }
897
            }
898
        }
899
25
        writer
900
25
            .send_eof()
901
25
            .err_tip(|| 
"Filed to send EOF in filesystem store get_part"0
)
?0
;
902
903
25
        Ok(())
904
67
    }
905
906
55
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
907
55
        self
908
55
    }
909
910
15
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
911
15
        self
912
15
    }
913
914
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
915
0
        self
916
0
    }
917
918
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
919
0
        registry.register_indicator(self);
920
0
    }
921
}
922
923
#[async_trait]
924
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
925
0
    fn get_name(&self) -> &'static str {
926
0
        "FilesystemStore"
927
0
    }
928
929
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
930
0
        StoreDriver::check_health(Pin::new(self), namespace).await
931
0
    }
932
}