Coverage Report

Created: 2025-07-30 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use std::borrow::Cow;
19
use std::ffi::{OsStr, OsString};
20
use std::sync::{Arc, Weak};
21
use std::time::SystemTime;
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use futures::stream::{StreamExt, TryStreamExt};
27
use futures::{Future, TryFutureExt};
28
use nativelink_config::stores::FilesystemSpec;
29
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::background_spawn;
32
use nativelink_util::buf_channel::{
33
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
34
};
35
use nativelink_util::common::{DigestInfo, fs};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{
39
    StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
40
};
41
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
42
use tokio_stream::wrappers::ReadDirStream;
43
use tracing::{debug, error, warn};
44
45
use crate::cas_utils::is_zero_digest;
46
47
// Default size to allocate memory of the buffer when reading files (32 KiB).
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
// Default block size of all major filesystems is 4KB
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;

/// Sub-folder name used as a path component for files keyed by a string
/// (`StoreKey::Str`).
pub const STR_FOLDER: &str = "s";
/// Sub-folder name used as a path component for files keyed by a digest
/// (`StoreKey::Digest`).
pub const DIGEST_FOLDER: &str = "d";
54
55
/// Selects how a file name found on disk is turned back into a [`StoreKey`].
#[derive(Clone, Copy, Debug)]
pub enum FileType {
    /// The file name is the string form of a digest (`<hash>-<size>`).
    Digest,
    /// The file name is used verbatim as a string key.
    String,
}
60
61
/// State shared by every [`EncodedFilePath`] (each one holds an `Arc` to it):
/// the configured root folders plus a counter of in-flight background deletes.
#[derive(Debug, MetricsComponent)]
pub struct SharedContext {
    // Used in testing to know how many active drop() spawns are running.
    // Incremented right before a background delete is spawned from
    // `EncodedFilePath::drop()` and decremented when that task finishes.
    // TODO(aaronmondal) It is probably a good idea to use a spin lock during
    // destruction of the store to ensure that all files are actually
    // deleted (similar to how it is done in tests).
    #[metric(help = "Number of active drop spawns")]
    pub active_drop_spawns: AtomicU64,
    // Root folder used when resolving `PathType::Temp` paths.
    #[metric(help = "Path to the configured temp path")]
    temp_path: String,
    // Root folder used when resolving `PathType::Content` paths.
    #[metric(help = "Path to the configured content path")]
    content_path: String,
}
74
75
/// Where the file behind an [`EncodedFilePath`] currently lives.
///
/// Dropping an [`EncodedFilePath`] deletes the file unless the type is
/// [`PathType::Content`].
#[derive(Eq, PartialEq, Debug)]
enum PathType {
    /// Resolved below `SharedContext::content_path`; never deleted on drop.
    Content,
    /// Resolved below `SharedContext::temp_path`.
    Temp,
    /// An explicit path that is returned as-is, without any key encoding.
    Custom(OsString),
}
81
82
/// [`EncodedFilePath`] stores the path to the file
/// including the context, path type and key to the file.
/// The whole [`StoreKey`] is stored as opposed to solely
/// the [`DigestInfo`] so that it is more usable for things
/// such as BEP - see Issue #1108.
#[derive(Debug)]
pub struct EncodedFilePath {
    // Provides the root folders the path is resolved against.
    shared_context: Arc<SharedContext>,
    // Selects which root folder (or explicit path) is used.
    path_type: PathType,
    // Key that is encoded into the final path component.
    key: StoreKey<'static>,
}
93
94
impl EncodedFilePath {
95
    #[inline]
96
345
    fn get_file_path(&self) -> Cow<'_, OsStr> {
97
345
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
98
345
    }
99
}
100
101
#[inline]
102
468
fn get_file_path_raw<'a>(
103
468
    path_type: &'a PathType,
104
468
    shared_context: &SharedContext,
105
468
    key: &StoreKey<'a>,
106
468
) -> Cow<'a, OsStr> {
107
468
    let 
folder459
= match path_type {
108
204
        PathType::Content => &shared_context.content_path,
109
255
        PathType::Temp => &shared_context.temp_path,
110
9
        PathType::Custom(path) => return Cow::Borrowed(path),
111
    };
112
459
    Cow::Owned(to_full_path_from_key(folder, key))
113
468
}
114
115
impl Drop for EncodedFilePath {
116
124
    fn drop(&mut self) {
117
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
118
        // file actually needs to be deleted.
119
124
        if self.path_type == PathType::Content {
  Branch (119:12): [True: 98, False: 26]
  Branch (119:12): [Folded - Ignored]
120
98
            return;
121
26
        }
122
123
26
        let file_path = self.get_file_path().to_os_string();
124
26
        let shared_context = self.shared_context.clone();
125
26
        shared_context
126
26
            .active_drop_spawns
127
26
            .fetch_add(1, Ordering::Relaxed);
128
26
        background_spawn!("filesystem_delete_file", async move 
{25
129
25
            let 
result23
= fs::remove_file(&file_path)
130
25
                .await
131
23
                .err_tip(|| format!(
"Failed to remove file {}"0
,
file_path.display()0
));
132
23
            if let Err(
err0
) = result {
  Branch (132:20): [True: 0, False: 23]
  Branch (132:20): [Folded - Ignored]
133
0
                error!(?file_path, ?err, "Failed to delete file",);
134
            } else {
135
23
                debug!(?file_path, "File deleted",);
136
            }
137
23
            shared_context
138
23
                .active_drop_spawns
139
23
                .fetch_sub(1, Ordering::Relaxed);
140
23
        });
141
124
    }
142
}
143
144
/// This creates the file path from the [`StoreKey`]. If
145
/// it is a string, the string, prefixed with [`STR_PREFIX`]
146
/// for backwards compatibility, is stored.
147
///
148
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
149
/// followed by the string representation of a digest - the hash in hex,
150
/// a hyphen then the size in bytes
151
///
152
/// Previously, only the string representation of the [`DigestInfo`] was
153
/// used with no prefix
154
#[inline]
155
486
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
156
486
    match key {
157
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
158
483
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
159
    }
160
486
    .into()
161
486
}
162
163
/// Abstraction over a single file tracked by the filesystem store.
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
    /// Responsible for creating the underlying `FileEntry`.
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;

    /// Creates a (usually) temp file, opens it and returns the new entry, the open
    /// file handle and the path to the temp file.
    fn make_and_open_file(
        block_size: u64,
        encoded_file_path: EncodedFilePath,
    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
    where
        Self: Sized;

    /// Returns the underlying reference to the size of the data in bytes.
    fn data_size_mut(&mut self) -> &mut u64;

    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
    fn size_on_disk(&self) -> u64;

    /// Gets the underlying `EncodedFilePath`.
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;

    /// Returns a reader that will read part of the underlying file.
    fn read_file_part(
        &self,
        offset: u64,
        length: u64,
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;

    /// This function is a safe way to extract the file name of the underlying file. To protect users from
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
    /// the callback the file is always guaranteed to exist and immutable.
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
    fn get_file_path_locked<
        T,
        Fut: Future<Output = Result<T, Error>> + Send,
        F: FnOnce(OsString) -> Fut + Send,
    >(
        &self,
        handler: F,
    ) -> impl Future<Output = Result<T, Error>> + Send;
}
206
207
/// Default [`FileEntry`] implementation used by the filesystem store.
pub struct FileEntryImpl {
    // Size of the stored data in bytes (not rounded to the block size).
    data_size: u64,
    // Block size used to round `data_size` up in `size_on_disk()`.
    block_size: u64,
    // Location of the file; behind a lock because `unref()` rewrites it in place.
    encoded_file_path: RwLock<EncodedFilePath>,
}
212
213
impl FileEntryImpl {
214
9
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
215
9
        self.encoded_file_path.get_mut().shared_context.clone()
216
9
    }
217
}
218
219
impl FileEntry for FileEntryImpl {
220
124
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
221
124
        Self {
222
124
            data_size,
223
124
            block_size,
224
124
            encoded_file_path,
225
124
        }
226
124
    }
227
228
    /// This encapsulates the logic for the edge case of if the file fails to create
229
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
230
    /// try to cleanup the file as well during `drop()`.
231
114
    async fn make_and_open_file(
232
114
        block_size: u64,
233
114
        encoded_file_path: EncodedFilePath,
234
114
    ) -> Result<(Self, fs::FileSlot, OsString), Error> {
235
114
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
236
114
        let temp_file_result = fs::create_file(temp_full_path.clone())
237
114
            .or_else(|mut err| async 
{0
238
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
239
0
                    format!(
240
0
                        "Failed to remove file {} in filesystem store",
241
0
                        temp_full_path.display()
242
                    )
243
0
                });
244
0
                if let Err(remove_err) = remove_result {
  Branch (244:24): [True: 0, False: 0]
  Branch (244:24): [True: 0, False: 0]
  Branch (244:24): [True: 0, False: 0]
  Branch (244:24): [Folded - Ignored]
  Branch (244:24): [True: 0, False: 0]
245
0
                    err = err.merge(remove_err);
246
0
                }
247
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
248
0
                Err(err).err_tip(|| {
249
0
                    format!(
250
0
                        "Failed to create {} in filesystem store",
251
0
                        temp_full_path.display()
252
                    )
253
0
                })
254
0
            })
255
114
            .await
?0
;
256
257
114
        Ok((
258
114
            <Self as FileEntry>::create(
259
114
                0, /* Unknown yet, we will fill it in later */
260
114
                block_size,
261
114
                RwLock::new(encoded_file_path),
262
114
            ),
263
114
            temp_file_result,
264
114
            temp_full_path,
265
114
        ))
266
114
    }
267
268
114
    fn data_size_mut(&mut self) -> &mut u64 {
269
114
        &mut self.data_size
270
114
    }
271
272
254
    fn size_on_disk(&self) -> u64 {
273
254
        self.data_size.div_ceil(self.block_size) * self.block_size
274
254
    }
275
276
178
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
277
178
        &self.encoded_file_path
278
178
    }
279
280
49
    fn read_file_part(
281
49
        &self,
282
49
        offset: u64,
283
49
        length: u64,
284
49
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
285
49
        self.get_file_path_locked(move |full_content_path| async move {
286
49
            let 
file47
= fs::open_file(&full_content_path, offset, length)
287
49
                .await
288
49
                .err_tip(|| 
{2
289
2
                    format!(
290
2
                        "Failed to open file in filesystem store {}",
291
2
                        full_content_path.display()
292
                    )
293
2
                })?;
294
47
            Ok(file)
295
98
        })
296
49
    }
297
298
55
    async fn get_file_path_locked<
299
55
        T,
300
55
        Fut: Future<Output = Result<T, Error>> + Send,
301
55
        F: FnOnce(OsString) -> Fut + Send,
302
55
    >(
303
55
        &self,
304
55
        handler: F,
305
55
    ) -> Result<T, Error> {
306
55
        let encoded_file_path = self.get_encoded_file_path().read().await;
307
55
        handler(encoded_file_path.get_file_path().to_os_string()).await
308
55
    }
309
}
310
311
impl Debug for FileEntryImpl {
312
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
313
0
        f.debug_struct("FileEntryImpl")
314
0
            .field("data_size", &self.data_size)
315
0
            .field("encoded_file_path", &"<behind mutex>")
316
0
            .finish()
317
0
    }
318
}
319
320
141
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
321
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
322
141
    let mut hash = *digest.packed_hash();
323
141
    hash[24..].clone_from_slice(
324
141
        &DELETE_FILE_COUNTER
325
141
            .fetch_add(1, Ordering::Relaxed)
326
141
            .to_le_bytes(),
327
    );
328
141
    digest.set_packed_hash(*hash);
329
141
    digest
330
141
}
331
332
141
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
333
141
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
334
141
}
335
336
impl LenEntry for FileEntryImpl {
337
    #[inline]
338
252
    fn len(&self) -> u64 {
339
252
        self.size_on_disk()
340
252
    }
341
342
0
    fn is_empty(&self) -> bool {
343
0
        self.data_size == 0
344
0
    }
345
346
    // unref() only triggers when an item is removed from the eviction_map. It is possible
347
    // that another place in code has a reference to `FileEntryImpl` and may later read the
348
    // file. To support this edge case, we first move the file to a temp file and point
349
    // target file location to the new temp file. `unref()` should only ever be called once.
350
    #[inline]
351
28
    async fn unref(&self) {
352
        {
353
28
            let mut encoded_file_path = self.encoded_file_path.write().await;
354
28
            if encoded_file_path.path_type == PathType::Temp {
  Branch (354:16): [True: 0, False: 0]
  Branch (354:16): [True: 1, False: 9]
  Branch (354:16): [True: 0, False: 0]
  Branch (354:16): [True: 0, False: 0]
  Branch (354:16): [Folded - Ignored]
  Branch (354:16): [True: 0, False: 18]
355
                // We are already a temp file that is now marked for deletion on drop.
356
                // This is very rare, but most likely the rename into the content path failed.
357
1
                return;
358
27
            }
359
27
            let from_path = encoded_file_path.get_file_path();
360
27
            let new_key = make_temp_key(&encoded_file_path.key);
361
362
27
            let to_path =
363
27
                to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
364
365
27
            if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
  Branch (365:20): [True: 0, False: 0]
  Branch (365:20): [True: 2, False: 7]
  Branch (365:20): [True: 0, False: 0]
  Branch (365:20): [True: 0, False: 0]
  Branch (365:20): [Folded - Ignored]
  Branch (365:20): [True: 0, False: 18]
366
2
                warn!(
367
2
                    key = ?encoded_file_path.key,
368
                    ?from_path,
369
                    ?to_path,
370
                    ?err,
371
2
                    "Failed to rename file",
372
                );
373
            } else {
374
25
                debug!(
375
25
                    key = ?encoded_file_path.key,
376
                    ?from_path,
377
                    ?to_path,
378
25
                    "Renamed file",
379
                );
380
25
                encoded_file_path.path_type = PathType::Temp;
381
25
                encoded_file_path.key = new_key;
382
            }
383
        }
384
28
    }
385
}
386
387
#[inline]
388
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
389
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
390
2
    let size = size.parse::<i64>()
?0
;
391
2
    DigestInfo::try_new(hash, size)
392
2
}
393
394
2
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
395
2
    match file_type {
396
0
        FileType::String => Ok(StoreKey::new_str(file_name)),
397
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
398
    }
399
2
}
400
401
/// The number of files to read the metadata for at the same time when running
/// `add_files_to_cache` (used as the `buffer_unordered` limit while listing a
/// directory).
const SIMULTANEOUS_METADATA_READS: usize = 200;
404
405
43
async fn add_files_to_cache<Fe: FileEntry>(
406
43
    evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
407
43
    anchor_time: &SystemTime,
408
43
    shared_context: &Arc<SharedContext>,
409
43
    block_size: u64,
410
43
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
411
43
) -> Result<(), Error> {
412
    #[expect(clippy::too_many_arguments)]
413
1
    async fn process_entry<Fe: FileEntry>(
414
1
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
415
1
        file_name: &str,
416
1
        file_type: FileType,
417
1
        atime: SystemTime,
418
1
        data_size: u64,
419
1
        block_size: u64,
420
1
        anchor_time: &SystemTime,
421
1
        shared_context: &Arc<SharedContext>,
422
1
    ) -> Result<(), Error> {
423
1
        let key = key_from_file(file_name, file_type)
?0
;
424
425
1
        let file_entry = Fe::create(
426
1
            data_size,
427
1
            block_size,
428
1
            RwLock::new(EncodedFilePath {
429
1
                shared_context: shared_context.clone(),
430
1
                path_type: PathType::Content,
431
1
                key: key.borrow().into_owned(),
432
1
            }),
433
        );
434
1
        let time_since_anchor = anchor_time
435
1
            .duration_since(atime)
436
1
            .map_err(|_| make_input_err!("File access time newer than now"))
?0
;
437
1
        evicting_map
438
1
            .insert_with_time(
439
1
                key.into_owned().into(),
440
1
                Arc::new(file_entry),
441
1
                time_since_anchor.as_secs() as i32,
442
1
            )
443
1
            .await;
444
1
        Ok(())
445
1
    }
446
447
129
    async fn read_files(
448
129
        folder: Option<&str>,
449
129
        shared_context: &SharedContext,
450
129
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
451
        // Note: In Dec 2024 this is for backwards compatibility with the old
452
        // way files were stored on disk. Previously all files were in a single
453
        // folder regardless of the StoreKey type. This allows old versions of
454
        // nativelink file layout to be upgraded at startup time.
455
        // This logic can be removed once more time has passed.
456
129
        let read_dir = folder.map_or_else(
457
43
            || format!("{}/", shared_context.content_path),
458
86
            |folder| format!("{}/{folder}/", shared_context.content_path),
459
        );
460
461
129
        let (_permit, dir_handle) = fs::read_dir(read_dir)
462
129
            .await
463
129
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
464
129
            .into_inner();
465
466
129
        let read_dir_stream = ReadDirStream::new(dir_handle);
467
129
        read_dir_stream
468
129
            .map(|dir_entry| async move 
{87
469
87
                let dir_entry = dir_entry.unwrap();
470
87
                let file_name = dir_entry.file_name().into_string().unwrap();
471
87
                let metadata = dir_entry
472
87
                    .metadata()
473
87
                    .await
474
87
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
475
                // We need to filter out folders - we do not want to try to cache the s and d folders.
476
87
                let is_file =
477
87
                    metadata.is_file() || !(
file_name == STR_FOLDER86
||
file_name == DIGEST_FOLDER43
);
  Branch (477:21): [True: 0, False: 0]
  Branch (477:45): [True: 0, False: 0]
  Branch (477:21): [True: 1, False: 32]
  Branch (477:45): [True: 16, False: 16]
  Branch (477:21): [True: 0, False: 4]
  Branch (477:45): [True: 2, False: 2]
  Branch (477:21): [Folded - Ignored]
  Branch (477:45): [Folded - Ignored]
  Branch (477:21): [True: 0, False: 50]
  Branch (477:45): [True: 25, False: 25]
478
                // Using access time is not perfect, but better than random. We do not update the
479
                // atime when a file is actually "touched", we rely on whatever the filesystem does
480
                // when we read the file (usually update on read).
481
87
                let atime = metadata
482
87
                    .accessed()
483
87
                    .or_else(|_| 
metadata0
.
modified0
())
484
87
                    .unwrap_or(SystemTime::UNIX_EPOCH);
485
87
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
486
87
                    file_name,
487
87
                    atime,
488
87
                    metadata.len(),
489
87
                    is_file,
490
87
                ))
491
174
            })
492
129
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
493
129
            .try_collect()
494
129
            .await
495
129
    }
496
497
    /// Note: In Dec 2024 this is for backwards compatibility with the old
498
    /// way files were stored on disk. Previously all files were in a single
499
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
500
    /// location to the new cache location, under [`DIGEST_FOLDER`].
501
43
    async fn move_old_cache(
502
43
        shared_context: &Arc<SharedContext>,
503
43
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
504
43
    ) -> Result<(), Error> {
505
43
        let file_infos = read_files(None, shared_context).await
?0
;
506
507
43
        let from_path = shared_context.content_path.to_string();
508
509
43
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
510
511
43
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
512
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
513
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
514
515
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
  Branch (515:20): [True: 0, False: 0]
  Branch (515:20): [True: 0, False: 0]
  Branch (515:20): [True: 0, False: 0]
  Branch (515:20): [Folded - Ignored]
  Branch (515:20): [True: 0, False: 0]
516
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
517
            } else {
518
0
                debug!(?from_file, ?to_file, "Renamed file",);
519
            }
520
        }
521
43
        Ok(())
522
43
    }
523
524
86
    async fn add_files_to_cache<Fe: FileEntry>(
525
86
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
526
86
        anchor_time: &SystemTime,
527
86
        shared_context: &Arc<SharedContext>,
528
86
        block_size: u64,
529
86
        folder: &str,
530
86
    ) -> Result<(), Error> {
531
86
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
532
86
        let file_type = match folder {
533
86
            STR_FOLDER => 
FileType::String43
,
534
43
            DIGEST_FOLDER => FileType::Digest,
535
0
            _ => panic!("Invalid folder type"),
536
        };
537
538
86
        let path_root = format!("{}/{folder}", shared_context.content_path);
539
540
86
        for (
file_name1
,
atime1
,
data_size1
, _) in file_infos.into_iter().filter(|x| x.3) {
541
1
            let result = process_entry(
542
1
                evicting_map,
543
1
                &file_name,
544
1
                file_type,
545
1
                atime,
546
1
                data_size,
547
1
                block_size,
548
1
                anchor_time,
549
1
                shared_context,
550
1
            )
551
1
            .await;
552
1
            if let Err(
err0
) = result {
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [True: 0, False: 1]
  Branch (552:20): [True: 0, False: 0]
  Branch (552:20): [Folded - Ignored]
  Branch (552:20): [True: 0, False: 0]
553
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
554
                // Ignore result.
555
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
556
1
            }
557
        }
558
86
        Ok(())
559
86
    }
560
561
43
    move_old_cache(shared_context, rename_fn).await
?0
;
562
563
43
    add_files_to_cache(
564
43
        evicting_map,
565
43
        anchor_time,
566
43
        shared_context,
567
43
        block_size,
568
43
        DIGEST_FOLDER,
569
43
    )
570
43
    .await
?0
;
571
572
43
    add_files_to_cache(
573
43
        evicting_map,
574
43
        anchor_time,
575
43
        shared_context,
576
43
        block_size,
577
43
        STR_FOLDER,
578
43
    )
579
43
    .await
?0
;
580
43
    Ok(())
581
43
}
582
583
43
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
584
86
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
585
86
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
586
86
            .await
587
86
            .err_tip(
588
                || "Failed opening temp directory to prune partial downloads in filesystem store",
589
0
            )?
590
86
            .into_inner();
591
592
86
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
593
86
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
  Branch (593:19): [True: 0, False: 0]
  Branch (593:19): [True: 0, False: 32]
  Branch (593:19): [True: 0, False: 4]
  Branch (593:19): [Folded - Ignored]
  Branch (593:19): [True: 0, False: 50]
594
0
            let path = dir_entry?.path();
595
0
            if let Err(err) = fs::remove_file(&path).await {
  Branch (595:20): [True: 0, False: 0]
  Branch (595:20): [True: 0, False: 0]
  Branch (595:20): [True: 0, False: 0]
  Branch (595:20): [Folded - Ignored]
  Branch (595:20): [True: 0, False: 0]
596
0
                warn!(?path, ?err, "Failed to delete file",);
597
0
            }
598
        }
599
86
        Ok(())
600
86
    }
601
602
43
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
603
43
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
604
43
    Ok(())
605
43
}
606
607
#[derive(Debug, MetricsComponent)]
608
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
609
    #[metric]
610
    shared_context: Arc<SharedContext>,
611
    #[metric(group = "evicting_map")]
612
    evicting_map: Arc<EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>>,
613
    #[metric(help = "Block size of the configured filesystem")]
614
    block_size: u64,
615
    #[metric(help = "Size of the configured read buffer size")]
616
    read_buffer_size: usize,
617
    weak_self: Weak<Self>,
618
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
619
}
620
621
impl<Fe: FileEntry> FilesystemStore<Fe> {
622
36
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
623
113
        
Self::new_with_timeout_and_rename_fn36
(
spec36
, |from, to| std::fs::rename(from, to)).
await36
624
36
    }
625
626
43
    pub async fn new_with_timeout_and_rename_fn(
627
43
        spec: &FilesystemSpec,
628
43
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
629
43
    ) -> Result<Arc<Self>, Error> {
630
86
        async fn create_subdirs(path: &str) -> Result<(), Error> {
631
86
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
632
86
                .await
633
86
                .err_tip(|| format!(
"Failed to create directory {path}/{STR_FOLDER}"0
))
?0
;
634
86
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
635
86
                .await
636
86
                .err_tip(|| format!(
"Failed to create directory {path}/{DIGEST_FOLDER}"0
))
637
86
        }
638
639
43
        let now = SystemTime::now();
640
641
43
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
642
43
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
643
43
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
644
645
        // Create temp and content directories and the s and d subdirectories.
646
647
43
        create_subdirs(&spec.temp_path).await
?0
;
648
43
        create_subdirs(&spec.content_path).await
?0
;
649
650
43
        let shared_context = Arc::new(SharedContext {
651
43
            active_drop_spawns: AtomicU64::new(0),
652
43
            temp_path: spec.temp_path.clone(),
653
43
            content_path: spec.content_path.clone(),
654
43
        });
655
656
43
        let block_size = if spec.block_size == 0 {
  Branch (656:29): [True: 0, False: 0]
  Branch (656:29): [True: 0, False: 1]
  Branch (656:29): [True: 0, False: 1]
  Branch (656:29): [True: 1, False: 0]
  Branch (656:29): [True: 0, False: 1]
  Branch (656:29): [True: 1, False: 0]
  Branch (656:29): [True: 10, False: 1]
  Branch (656:29): [True: 2, False: 0]
  Branch (656:29): [Folded - Ignored]
  Branch (656:29): [True: 25, False: 0]
657
39
            DEFAULT_BLOCK_SIZE
658
        } else {
659
4
            spec.block_size
660
        };
661
43
        add_files_to_cache(
662
43
            evicting_map.as_ref(),
663
43
            &now,
664
43
            &shared_context,
665
43
            block_size,
666
43
            rename_fn,
667
43
        )
668
43
        .await
?0
;
669
43
        prune_temp_path(&shared_context.temp_path).await
?0
;
670
671
43
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (671:35): [True: 0, False: 0]
  Branch (671:35): [True: 1, False: 0]
  Branch (671:35): [True: 0, False: 1]
  Branch (671:35): [True: 1, False: 0]
  Branch (671:35): [True: 0, False: 1]
  Branch (671:35): [True: 1, False: 0]
  Branch (671:35): [True: 4, False: 7]
  Branch (671:35): [True: 2, False: 0]
  Branch (671:35): [Folded - Ignored]
  Branch (671:35): [True: 25, False: 0]
672
34
            DEFAULT_BUFF_SIZE
673
        } else {
674
9
            spec.read_buffer_size as usize
675
        };
676
43
        Ok(Arc::new_cyclic(|weak_self| Self {
677
43
            shared_context,
678
43
            evicting_map,
679
43
            block_size,
680
43
            read_buffer_size,
681
43
            weak_self: weak_self.clone(),
682
43
            rename_fn,
683
43
        }))
684
43
    }
685
686
26
    pub fn get_arc(&self) -> Option<Arc<Self>> {
687
26
        self.weak_self.upgrade()
688
26
    }
689
690
9
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
691
9
        self.evicting_map
692
9
            .get::<StoreKey<'static>>(&digest.into())
693
9
            .await
694
9
            .ok_or_else(|| make_err!(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
695
9
    }
696
697
114
    async fn update_file(
698
114
        self: Pin<&Self>,
699
114
        mut entry: Fe,
700
114
        mut temp_file: fs::FileSlot,
701
114
        final_key: StoreKey<'static>,
702
114
        mut reader: DropCloserReadHalf,
703
114
    ) -> Result<(), Error> {
704
114
        let mut data_size = 0;
705
        loop {
706
193
            let mut data = reader
707
193
                .recv()
708
193
                .await
709
193
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
710
193
            let data_len = data.len();
711
193
            if data_len == 0 {
  Branch (711:16): [True: 0, False: 0]
  Branch (711:16): [True: 2, False: 2]
  Branch (711:16): [True: 2, False: 2]
  Branch (711:16): [True: 2, False: 2]
  Branch (711:16): [True: 2, False: 2]
  Branch (711:16): [True: 1, False: 1]
  Branch (711:16): [True: 13, False: 11]
  Branch (711:16): [True: 0, False: 0]
  Branch (711:16): [Folded - Ignored]
  Branch (711:16): [True: 92, False: 59]
712
114
                break; // EOF.
713
79
            }
714
79
            temp_file
715
79
                .write_all_buf(&mut data)
716
79
                .await
717
79
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
718
79
            data_size += data_len as u64;
719
        }
720
721
114
        temp_file
722
114
            .as_ref()
723
114
            .sync_all()
724
114
            .await
725
114
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
726
727
114
        drop(temp_file);
728
729
114
        *entry.data_size_mut() = data_size;
730
114
        self.emplace_file(final_key, Arc::new(entry)).await
731
113
    }
732
733
123
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
734
        // This sequence of events is quite ticky to understand due to the amount of triggers that
735
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
736
        // 1. Here will hold a write lock on any file operations of this FileEntry.
737
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
738
        //    entries.
739
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
740
        //    during the rename.
741
        // 4. It should be impossible for items to be added while eviction is happening, so there
742
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
743
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
744
        //    item is not possible within the `insert()` call because the write lock inside the
745
        //    eviction map. If an eviction of new item happens after `insert()` but before
746
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
747
        //    will be blocked on us because we currently have the lock.
748
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
749
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
750
        //    contents until we release the lock.
751
123
        let evicting_map = self.evicting_map.clone();
752
123
        let rename_fn = self.rename_fn;
753
754
        // We need to guarantee that this will get to the end even if the parent future is dropped.
755
        // See: https://github.com/TraceMachina/nativelink/issues/495
756
123
        background_spawn!("filesystem_store_emplace_file", async move {
757
123
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
758
123
            let final_path = get_file_path_raw(
759
123
                &PathType::Content,
760
123
                encoded_file_path.shared_context.as_ref(),
761
123
                &key,
762
            );
763
764
123
            evicting_map
765
123
                .insert(key.borrow().into_owned().into(), entry.clone())
766
123
                .await;
767
768
123
            let from_path = encoded_file_path.get_file_path();
769
            // Internally tokio spawns fs commands onto a blocking thread anyways.
770
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
771
            // an open-file permit (ensure we don't open too many files at once).
772
123
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| 
{1
773
1
                format!(
774
1
                    "Failed to rename temp file to final path {}",
775
1
                    final_path.display()
776
                )
777
1
            });
778
779
            // In the event our move from temp file to final file fails we need to ensure we remove
780
            // the entry from our map.
781
            // Remember: At this point it is possible for another thread to have a reference to
782
            // `entry`, so we can't delete the file, only drop() should ever delete files.
783
123
            if let Err(
err1
) = result {
  Branch (783:20): [True: 0, False: 0]
  Branch (783:20): [True: 0, False: 2]
  Branch (783:20): [True: 0, False: 2]
  Branch (783:20): [True: 0, False: 2]
  Branch (783:20): [True: 0, False: 2]
  Branch (783:20): [True: 1, False: 0]
  Branch (783:20): [True: 0, False: 15]
  Branch (783:20): [True: 0, False: 0]
  Branch (783:20): [Folded - Ignored]
  Branch (783:20): [True: 0, False: 99]
784
1
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
785
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
786
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
787
1
                drop(encoded_file_path);
788
                // It is possible that the item in our map is no longer the item we inserted,
789
                // So, we need to conditionally remove it only if the pointers are the same.
790
791
1
                evicting_map
792
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
793
1
                    .await;
794
1
                return Err(err);
795
122
            }
796
122
            encoded_file_path.path_type = PathType::Content;
797
122
            encoded_file_path.key = key;
798
122
            Ok(())
799
123
        })
800
123
        .await
801
122
        .err_tip(|| "Failed to create spawn in filesystem store update_file")
?0
802
122
    }
803
}
804
805
#[async_trait]
806
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
807
    async fn has_with_results(
808
        self: Pin<&Self>,
809
        keys: &[StoreKey<'_>],
810
        results: &mut [Option<u64>],
811
176
    ) -> Result<(), Error> {
812
88
        self.evicting_map
813
88
            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
814
88
                keys.iter(),
815
88
                results,
816
88
                false, /* peek */
817
88
            )
818
88
            .await;
819
        // We need to do a special pass to ensure our zero files exist.
820
        // If our results failed and the result was a zero file, we need to
821
        // create the file by spec.
822
88
        for (key, result) in keys.iter().zip(results.iter_mut()) {
823
88
            if result.is_some() || 
!16
is_zero_digest16
(key.borrow()) {
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [True: 0, False: 1]
  Branch (823:36): [True: 1, False: 0]
  Branch (823:16): [True: 1, False: 2]
  Branch (823:36): [True: 0, False: 2]
  Branch (823:16): [True: 0, False: 0]
  Branch (823:36): [True: 0, False: 0]
  Branch (823:16): [Folded - Ignored]
  Branch (823:36): [Folded - Ignored]
  Branch (823:16): [True: 71, False: 13]
  Branch (823:36): [True: 13, False: 0]
824
86
                continue;
825
2
            }
826
2
            let (mut tx, rx) = make_buf_channel_pair();
827
2
            let send_eof_result = tx.send_eof();
828
2
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
829
2
                .await
830
2
                .err_tip(|| format!(
"Failed to create zero file for key {}"0
,
key0
.
as_str0
()))
831
2
                .merge(
832
2
                    send_eof_result
833
2
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
834
0
                )?;
835
836
2
            *result = Some(0);
837
        }
838
88
        Ok(())
839
176
    }
840
841
    async fn update(
842
        self: Pin<&Self>,
843
        key: StoreKey<'_>,
844
        reader: DropCloserReadHalf,
845
        _upload_size: UploadSizeInfo,
846
228
    ) -> Result<(), Error> {
847
114
        let temp_key = make_temp_key(&key);
848
114
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
849
114
            self.block_size,
850
114
            EncodedFilePath {
851
114
                shared_context: self.shared_context.clone(),
852
114
                path_type: PathType::Temp,
853
114
                key: temp_key,
854
114
            },
855
114
        )
856
114
        .await
?0
;
857
858
114
        self.update_file(entry, temp_file, key.into_owned(), reader)
859
114
            .await
860
113
            .err_tip(|| 
{1
861
1
                format!(
862
1
                    "While processing with temp file {}",
863
1
                    temp_full_path.display()
864
                )
865
1
            })
866
227
    }
867
868
90
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
869
90
        optimization == StoreOptimizations::FileUpdates
870
90
    }
871
872
    async fn update_with_whole_file(
873
        self: Pin<&Self>,
874
        key: StoreKey<'_>,
875
        path: OsString,
876
        file: fs::FileSlot,
877
        upload_size: UploadSizeInfo,
878
18
    ) -> Result<Option<fs::FileSlot>, Error> {
879
9
        let file_size = match upload_size {
880
9
            UploadSizeInfo::ExactSize(size) => size,
881
0
            UploadSizeInfo::MaxSize(_) => file
882
0
                .as_ref()
883
0
                .metadata()
884
0
                .await
885
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
886
0
                .len(),
887
        };
888
9
        let entry = Fe::create(
889
9
            file_size,
890
9
            self.block_size,
891
9
            RwLock::new(EncodedFilePath {
892
9
                shared_context: self.shared_context.clone(),
893
9
                path_type: PathType::Custom(path),
894
9
                key: key.borrow().into_owned(),
895
9
            }),
896
        );
897
        // We are done with the file, if we hold a reference to the file here, it could
898
        // result in a deadlock if `emplace_file()` also needs file descriptors.
899
9
        drop(file);
900
9
        self.emplace_file(key.into_owned(), Arc::new(entry))
901
9
            .await
902
9
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")
?0
;
903
9
        return Ok(None);
904
18
    }
905
906
    async fn get_part(
907
        self: Pin<&Self>,
908
        key: StoreKey<'_>,
909
        writer: &mut DropCloserWriteHalf,
910
        offset: u64,
911
        length: Option<u64>,
912
126
    ) -> Result<(), Error> {
913
63
        if is_zero_digest(key.borrow()) {
  Branch (913:12): [True: 0, False: 0]
  Branch (913:12): [True: 0, False: 0]
  Branch (913:12): [True: 0, False: 1]
  Branch (913:12): [True: 0, False: 0]
  Branch (913:12): [True: 0, False: 1]
  Branch (913:12): [True: 0, False: 0]
  Branch (913:12): [True: 1, False: 4]
  Branch (913:12): [True: 0, False: 0]
  Branch (913:12): [Folded - Ignored]
  Branch (913:12): [True: 15, False: 41]
914
16
            self.has(key.borrow())
915
16
                .await
916
16
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")
?0
;
917
16
            writer
918
16
                .send_eof()
919
16
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")
?0
;
920
16
            return Ok(());
921
47
        }
922
47
        let entry = self.evicting_map.get(&key).await.ok_or_else(|| 
{0
923
0
            make_err!(
924
0
                Code::NotFound,
925
                "{} not found in filesystem store here",
926
0
                key.as_str()
927
            )
928
0
        })?;
929
47
        let read_limit = length.unwrap_or(u64::MAX);
930
47
        let 
mut temp_file45
= entry.read_file_part(offset, read_limit).or_else(|err| async move
{2
931
            // If the file is not found, we need to remove it from the eviction map.
932
2
            if err.code == Code::NotFound {
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [True: 2, False: 0]
  Branch (932:16): [True: 0, False: 0]
  Branch (932:16): [Folded - Ignored]
  Branch (932:16): [True: 0, False: 0]
933
2
                error!(
934
                    ?err,
935
                    ?key,
936
2
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
937
                );
938
2
                self.evicting_map.remove(&key).await;
939
0
            }
940
2
            Err(err)
941
47
        
}4
).await
?2
;
942
943
        loop {
944
1.13k
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
945
1.13k
            temp_file
946
1.13k
                .read_buf(&mut buf)
947
1.13k
                .await
948
1.13k
                .err_tip(|| "Failed to read data in filesystem store")
?0
;
949
1.13k
            if buf.is_empty() {
  Branch (949:16): [True: 0, False: 0]
  Branch (949:16): [True: 0, False: 0]
  Branch (949:16): [True: 1, False: 10]
  Branch (949:16): [True: 0, False: 0]
  Branch (949:16): [True: 1, False: 10]
  Branch (949:16): [True: 0, False: 0]
  Branch (949:16): [True: 1, False: 1.02k]
  Branch (949:16): [True: 0, False: 0]
  Branch (949:16): [Folded - Ignored]
  Branch (949:16): [True: 41, False: 41]
950
44
                break; // EOF.
951
1.08k
            }
952
1.08k
            writer
953
1.08k
                .send(buf.freeze())
954
1.08k
                .await
955
1.08k
                .err_tip(|| "Failed to send chunk in filesystem store get_part")
?0
;
956
        }
957
44
        writer
958
44
            .send_eof()
959
44
            .err_tip(|| "Filed to send EOF in filesystem store get_part")
?0
;
960
961
44
        Ok(())
962
125
    }
963
964
109
    /// Returns this store itself; `_digest` is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
967
968
26
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
969
26
        self
970
26
    }
971
972
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
973
0
        self
974
0
    }
975
976
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
977
0
        registry.register_indicator(self);
978
0
    }
979
}
980
981
#[async_trait]
982
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
983
0
    fn get_name(&self) -> &'static str {
984
0
        "FilesystemStore"
985
0
    }
986
987
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
988
0
        StoreDriver::check_health(Pin::new(self), namespace).await
989
0
    }
990
}