Coverage Report

Created: 2025-10-28 18:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use std::borrow::Cow;
19
use std::ffi::{OsStr, OsString};
20
use std::sync::{Arc, Weak};
21
use std::time::SystemTime;
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use futures::stream::{StreamExt, TryStreamExt};
27
use futures::{Future, TryFutureExt};
28
use nativelink_config::stores::FilesystemSpec;
29
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::background_spawn;
32
use nativelink_util::buf_channel::{
33
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
34
};
35
use nativelink_util::common::{DigestInfo, fs};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{
39
    RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
40
};
41
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
42
use tokio_stream::wrappers::ReadDirStream;
43
use tracing::{debug, error, warn};
44
45
use crate::callback_utils::RemoveItemCallbackHolder;
46
use crate::cas_utils::is_zero_digest;
47
48
// Default size of the buffer allocated when reading files.
49
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
50
// Default block size of all major filesystems is 4KB
51
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
52
53
pub const STR_FOLDER: &str = "s";
54
pub const DIGEST_FOLDER: &str = "d";
55
56
#[derive(Clone, Copy, Debug)]
57
pub enum FileType {
58
    Digest,
59
    String,
60
}
61
62
#[derive(Debug, MetricsComponent)]
63
pub struct SharedContext {
64
    // Used in testing to know how many active drop() spawns are running.
65
    // TODO(palfrey) It is probably a good idea to use a spin lock during
66
    // destruction of the store to ensure that all files are actually
67
    // deleted (similar to how it is done in tests).
68
    #[metric(help = "Number of active drop spawns")]
69
    pub active_drop_spawns: AtomicU64,
70
    #[metric(help = "Path to the configured temp path")]
71
    temp_path: String,
72
    #[metric(help = "Path to the configured content path")]
73
    content_path: String,
74
}
75
76
#[derive(Eq, PartialEq, Debug)]
77
enum PathType {
78
    Content,
79
    Temp,
80
    Custom(OsString),
81
}
82
83
/// [`EncodedFilePath`] stores the path to the file
84
/// including the context, path type and key to the file.
85
/// The whole [`StoreKey`] is stored as opposed to solely
86
/// the [`DigestInfo`] so that it is more usable for things
87
/// such as BEP - see Issue #1108.
88
#[derive(Debug)]
89
pub struct EncodedFilePath {
90
    shared_context: Arc<SharedContext>,
91
    path_type: PathType,
92
    key: StoreKey<'static>,
93
}
94
95
impl EncodedFilePath {
96
    #[inline]
97
348
    fn get_file_path(&self) -> Cow<'_, OsStr> {
98
348
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
99
348
    }
100
}
101
102
#[inline]
103
473
fn get_file_path_raw<'a>(
104
473
    path_type: &'a PathType,
105
473
    shared_context: &SharedContext,
106
473
    key: &StoreKey<'a>,
107
473
) -> Cow<'a, OsStr> {
108
473
    let 
folder466
= match path_type {
109
205
        PathType::Content => &shared_context.content_path,
110
261
        PathType::Temp => &shared_context.temp_path,
111
7
        PathType::Custom(path) => return Cow::Borrowed(path),
112
    };
113
466
    Cow::Owned(to_full_path_from_key(folder, key))
114
473
}
115
116
impl Drop for EncodedFilePath {
117
126
    fn drop(&mut self) {
118
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
119
        // file actually needs to be deleted.
120
126
        if self.path_type == PathType::Content {
  Branch (120:12): [True: 102, False: 24]
  Branch (120:12): [Folded - Ignored]
121
102
            return;
122
24
        }
123
124
24
        let file_path = self.get_file_path().to_os_string();
125
24
        let shared_context = self.shared_context.clone();
126
        // .fetch_add returns previous value, so we add one to get approximate current value
127
24
        let current_active_drop_spawns = shared_context
128
24
            .active_drop_spawns
129
24
            .fetch_add(1, Ordering::Relaxed)
130
24
            + 1;
131
24
        debug!(
132
            ?current_active_drop_spawns,
133
24
            "Spawned a filesystem_delete_file"
134
        );
135
24
        background_spawn!("filesystem_delete_file", async move 
{23
136
23
            let 
result21
= fs::remove_file(&file_path)
137
23
                .await
138
21
                .err_tip(|| format!(
"Failed to remove file {}"0
,
file_path.display()0
));
139
21
            if let Err(
err0
) = result {
  Branch (139:20): [True: 0, False: 21]
  Branch (139:20): [Folded - Ignored]
140
0
                error!(?file_path, ?err, "Failed to delete file",);
141
            } else {
142
21
                debug!(?file_path, "File deleted",);
143
            }
144
            // .fetch_sub returns previous value, so we subtract one to get approximate current value
145
21
            let current_active_drop_spawns = shared_context
146
21
                .active_drop_spawns
147
21
                .fetch_sub(1, Ordering::Relaxed)
148
21
                - 1;
149
21
            debug!(
150
                ?current_active_drop_spawns,
151
21
                "Dropped a filesystem_delete_file"
152
            );
153
21
        });
154
126
    }
155
}
156
157
/// This creates the file path from the [`StoreKey`]. If
158
/// it is a string, the string, prefixed with [`STR_FOLDER`]
159
/// for backwards compatibility, is stored.
160
///
161
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_FOLDER`]
162
/// followed by the string representation of a digest - the hash in hex,
163
/// a hyphen, then the size in bytes.
164
///
165
/// Previously, only the string representation of the [`DigestInfo`] was
166
/// used with no prefix
167
#[inline]
168
491
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
169
491
    match key {
170
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
171
488
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
172
    }
173
491
    .into()
174
491
}
175
176
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
177
    /// Responsible for creating the underlying `FileEntry`.
178
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
179
180
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
181
    fn make_and_open_file(
182
        block_size: u64,
183
        encoded_file_path: EncodedFilePath,
184
    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
185
    where
186
        Self: Sized;
187
188
    /// Returns a mutable reference to the size of the data in bytes.
189
    fn data_size_mut(&mut self) -> &mut u64;
190
191
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
192
    fn size_on_disk(&self) -> u64;
193
194
    /// Gets the underlying `EncodedFilePath`.
195
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
196
197
    /// Returns a reader that will read part of the underlying file.
198
    fn read_file_part(
199
        &self,
200
        offset: u64,
201
        length: u64,
202
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
203
204
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
205
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
206
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
207
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
208
    /// the callback the file is always guaranteed to exist and immutable.
209
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
210
    fn get_file_path_locked<
211
        T,
212
        Fut: Future<Output = Result<T, Error>> + Send,
213
        F: FnOnce(OsString) -> Fut + Send,
214
    >(
215
        &self,
216
        handler: F,
217
    ) -> impl Future<Output = Result<T, Error>> + Send;
218
}
219
220
pub struct FileEntryImpl {
221
    data_size: u64,
222
    block_size: u64,
223
    encoded_file_path: RwLock<EncodedFilePath>,
224
}
225
226
impl FileEntryImpl {
227
11
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
228
11
        self.encoded_file_path.get_mut().shared_context.clone()
229
11
    }
230
}
231
232
impl FileEntry for FileEntryImpl {
233
126
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
234
126
        Self {
235
126
            data_size,
236
126
            block_size,
237
126
            encoded_file_path,
238
126
        }
239
126
    }
240
241
    /// This encapsulates the logic for the edge case where the file fails to be created:
242
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
243
    /// try to clean up the file as well during `drop()`.
244
118
    async fn make_and_open_file(
245
118
        block_size: u64,
246
118
        encoded_file_path: EncodedFilePath,
247
118
    ) -> Result<(Self, fs::FileSlot, OsString), Error> {
248
118
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
249
118
        let temp_file_result = fs::create_file(temp_full_path.clone())
250
118
            .or_else(|mut err| async 
{0
251
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
252
0
                    format!(
253
0
                        "Failed to remove file {} in filesystem store",
254
0
                        temp_full_path.display()
255
                    )
256
0
                });
257
0
                if let Err(remove_err) = remove_result {
  Branch (257:24): [True: 0, False: 0]
  Branch (257:24): [True: 0, False: 0]
  Branch (257:24): [True: 0, False: 0]
  Branch (257:24): [True: 0, False: 0]
  Branch (257:24): [Folded - Ignored]
  Branch (257:24): [True: 0, False: 0]
258
0
                    err = err.merge(remove_err);
259
0
                }
260
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
261
0
                Err(err).err_tip(|| {
262
0
                    format!(
263
0
                        "Failed to create {} in filesystem store",
264
0
                        temp_full_path.display()
265
                    )
266
0
                })
267
0
            })
268
118
            .await
?0
;
269
270
118
        Ok((
271
118
            <Self as FileEntry>::create(
272
118
                0, /* Unknown yet, we will fill it in later */
273
118
                block_size,
274
118
                RwLock::new(encoded_file_path),
275
118
            ),
276
118
            temp_file_result,
277
118
            temp_full_path,
278
118
        ))
279
118
    }
280
281
118
    fn data_size_mut(&mut self) -> &mut u64 {
282
118
        &mut self.data_size
283
118
    }
284
285
252
    fn size_on_disk(&self) -> u64 {
286
252
        self.data_size.div_ceil(self.block_size) * self.block_size
287
252
    }
288
289
181
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
290
181
        &self.encoded_file_path
291
181
    }
292
293
50
    fn read_file_part(
294
50
        &self,
295
50
        offset: u64,
296
50
        length: u64,
297
50
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
298
50
        self.get_file_path_locked(move |full_content_path| async move {
299
50
            let 
file48
= fs::open_file(&full_content_path, offset, length)
300
50
                .await
301
50
                .err_tip(|| 
{2
302
2
                    format!(
303
2
                        "Failed to open file in filesystem store {}",
304
2
                        full_content_path.display()
305
                    )
306
2
                })?;
307
48
            Ok(file)
308
100
        })
309
50
    }
310
311
56
    async fn get_file_path_locked<
312
56
        T,
313
56
        Fut: Future<Output = Result<T, Error>> + Send,
314
56
        F: FnOnce(OsString) -> Fut + Send,
315
56
    >(
316
56
        &self,
317
56
        handler: F,
318
56
    ) -> Result<T, Error> {
319
56
        let encoded_file_path = self.get_encoded_file_path().read().await;
320
56
        handler(encoded_file_path.get_file_path().to_os_string()).await
321
56
    }
322
}
323
324
impl Debug for FileEntryImpl {
325
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
326
0
        f.debug_struct("FileEntryImpl")
327
0
            .field("data_size", &self.data_size)
328
0
            .field("encoded_file_path", &"<behind mutex>")
329
0
            .finish()
330
0
    }
331
}
332
333
143
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
334
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
335
143
    let mut hash = *digest.packed_hash();
336
143
    hash[24..].clone_from_slice(
337
143
        &DELETE_FILE_COUNTER
338
143
            .fetch_add(1, Ordering::Relaxed)
339
143
            .to_le_bytes(),
340
    );
341
143
    digest.set_packed_hash(*hash);
342
143
    digest
343
143
}
344
345
143
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
346
143
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
347
143
}
348
349
impl LenEntry for FileEntryImpl {
350
    #[inline]
351
250
    fn len(&self) -> u64 {
352
250
        self.size_on_disk()
353
250
    }
354
355
0
    fn is_empty(&self) -> bool {
356
0
        self.data_size == 0
357
0
    }
358
359
    // unref() only triggers when an item is removed from the eviction_map. It is possible
360
    // that another place in code has a reference to `FileEntryImpl` and may later read the
361
    // file. To support this edge case, we first move the file to a temp file and point
362
    // target file location to the new temp file. `unref()` should only ever be called once.
363
    #[inline]
364
26
    async fn unref(&self) {
365
        {
366
26
            let mut encoded_file_path = self.encoded_file_path.write().await;
367
26
            if encoded_file_path.path_type == PathType::Temp {
  Branch (367:16): [True: 0, False: 0]
  Branch (367:16): [True: 1, False: 9]
  Branch (367:16): [True: 0, False: 0]
  Branch (367:16): [True: 0, False: 0]
  Branch (367:16): [True: 0, False: 0]
  Branch (367:16): [Folded - Ignored]
  Branch (367:16): [True: 0, False: 16]
368
                // We are already a temp file that is now marked for deletion on drop.
369
                // This is very rare, but most likely the rename into the content path failed.
370
1
                return;
371
25
            }
372
25
            let from_path = encoded_file_path.get_file_path();
373
25
            let new_key = make_temp_key(&encoded_file_path.key);
374
375
25
            let to_path =
376
25
                to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
377
378
25
            if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [True: 2, False: 7]
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [True: 0, False: 0]
  Branch (378:20): [Folded - Ignored]
  Branch (378:20): [True: 0, False: 16]
379
2
                warn!(
380
2
                    key = ?encoded_file_path.key,
381
                    ?from_path,
382
                    ?to_path,
383
                    ?err,
384
2
                    "Failed to rename file",
385
                );
386
            } else {
387
23
                debug!(
388
23
                    key = ?encoded_file_path.key,
389
                    ?from_path,
390
                    ?to_path,
391
23
                    "Renamed file",
392
                );
393
23
                encoded_file_path.path_type = PathType::Temp;
394
23
                encoded_file_path.key = new_key;
395
            }
396
        }
397
26
    }
398
}
399
400
#[inline]
401
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
402
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
403
2
    let size = size.parse::<i64>()
?0
;
404
2
    DigestInfo::try_new(hash, size)
405
2
}
406
407
2
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
408
2
    match file_type {
409
0
        FileType::String => Ok(StoreKey::new_str(file_name)),
410
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
411
    }
412
2
}
413
414
/// The number of files to read the metadata for at the same time when running
415
/// `add_files_to_cache`.
416
const SIMULTANEOUS_METADATA_READS: usize = 200;
417
418
type FsEvictingMap<'a, Fe> =
419
    EvictingMap<StoreKeyBorrow, StoreKey<'a>, Arc<Fe>, SystemTime, RemoveItemCallbackHolder>;
420
421
47
async fn add_files_to_cache<Fe: FileEntry>(
422
47
    evicting_map: &FsEvictingMap<'_, Fe>,
423
47
    anchor_time: &SystemTime,
424
47
    shared_context: &Arc<SharedContext>,
425
47
    block_size: u64,
426
47
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
427
47
) -> Result<(), Error> {
428
    #[expect(clippy::too_many_arguments)]
429
1
    async fn process_entry<Fe: FileEntry>(
430
1
        evicting_map: &FsEvictingMap<'_, Fe>,
431
1
        file_name: &str,
432
1
        file_type: FileType,
433
1
        atime: SystemTime,
434
1
        data_size: u64,
435
1
        block_size: u64,
436
1
        anchor_time: &SystemTime,
437
1
        shared_context: &Arc<SharedContext>,
438
1
    ) -> Result<(), Error> {
439
1
        let key = key_from_file(file_name, file_type)
?0
;
440
441
1
        let file_entry = Fe::create(
442
1
            data_size,
443
1
            block_size,
444
1
            RwLock::new(EncodedFilePath {
445
1
                shared_context: shared_context.clone(),
446
1
                path_type: PathType::Content,
447
1
                key: key.borrow().into_owned(),
448
1
            }),
449
        );
450
1
        let time_since_anchor = anchor_time
451
1
            .duration_since(atime)
452
1
            .map_err(|_| make_input_err!("File access time newer than now"))
?0
;
453
1
        evicting_map
454
1
            .insert_with_time(
455
1
                key.into_owned().into(),
456
1
                Arc::new(file_entry),
457
1
                i32::try_from(time_since_anchor.as_secs()).unwrap_or(i32::MAX),
458
1
            )
459
1
            .await;
460
1
        Ok(())
461
1
    }
462
463
141
    async fn read_files(
464
141
        folder: Option<&str>,
465
141
        shared_context: &SharedContext,
466
141
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
467
        // Note: In Dec 2024 this is for backwards compatibility with the old
468
        // way files were stored on disk. Previously all files were in a single
469
        // folder regardless of the StoreKey type. This allows old versions of
470
        // nativelink file layout to be upgraded at startup time.
471
        // This logic can be removed once more time has passed.
472
141
        let read_dir = folder.map_or_else(
473
47
            || format!("{}/", shared_context.content_path),
474
94
            |folder| format!("{}/{folder}/", shared_context.content_path),
475
        );
476
477
141
        let (_permit, dir_handle) = fs::read_dir(read_dir)
478
141
            .await
479
141
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
480
141
            .into_inner();
481
482
141
        let read_dir_stream = ReadDirStream::new(dir_handle);
483
141
        read_dir_stream
484
141
            .map(|dir_entry| async move 
{95
485
95
                let dir_entry = dir_entry.unwrap();
486
95
                let file_name = dir_entry.file_name().into_string().unwrap();
487
95
                let metadata = dir_entry
488
95
                    .metadata()
489
95
                    .await
490
95
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
491
                // We need to filter out folders - we do not want to try to cache the s and d folders.
492
95
                let is_file =
493
95
                    metadata.is_file() || !(
file_name == STR_FOLDER94
||
file_name == DIGEST_FOLDER47
);
  Branch (493:21): [True: 0, False: 0]
  Branch (493:45): [True: 0, False: 0]
  Branch (493:21): [True: 1, False: 34]
  Branch (493:45): [True: 17, False: 17]
  Branch (493:21): [True: 0, False: 4]
  Branch (493:45): [True: 2, False: 2]
  Branch (493:21): [True: 0, False: 6]
  Branch (493:45): [True: 3, False: 3]
  Branch (493:21): [Folded - Ignored]
  Branch (493:45): [Folded - Ignored]
  Branch (493:21): [True: 0, False: 50]
  Branch (493:45): [True: 25, False: 25]
494
                // Using access time is not perfect, but better than random. We do not update the
495
                // atime when a file is actually "touched", we rely on whatever the filesystem does
496
                // when we read the file (usually update on read).
497
95
                let atime = metadata
498
95
                    .accessed()
499
95
                    .or_else(|_| 
metadata0
.
modified0
())
500
95
                    .unwrap_or(SystemTime::UNIX_EPOCH);
501
95
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
502
95
                    file_name,
503
95
                    atime,
504
95
                    metadata.len(),
505
95
                    is_file,
506
95
                ))
507
190
            })
508
141
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
509
141
            .try_collect()
510
141
            .await
511
141
    }
512
513
    /// Note: In Dec 2024 this is for backwards compatibility with the old
514
    /// way files were stored on disk. Previously all files were in a single
515
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
516
    /// location to the new cache location, under [`DIGEST_FOLDER`].
517
47
    async fn move_old_cache(
518
47
        shared_context: &Arc<SharedContext>,
519
47
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
520
47
    ) -> Result<(), Error> {
521
47
        let file_infos = read_files(None, shared_context).await
?0
;
522
523
47
        let from_path = shared_context.content_path.to_string();
524
525
47
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
526
527
47
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
528
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
529
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
530
531
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
  Branch (531:20): [True: 0, False: 0]
  Branch (531:20): [True: 0, False: 0]
  Branch (531:20): [True: 0, False: 0]
  Branch (531:20): [True: 0, False: 0]
  Branch (531:20): [Folded - Ignored]
  Branch (531:20): [True: 0, False: 0]
532
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
533
            } else {
534
0
                debug!(?from_file, ?to_file, "Renamed file",);
535
            }
536
        }
537
47
        Ok(())
538
47
    }
539
540
94
    async fn add_files_to_cache<Fe: FileEntry>(
541
94
        evicting_map: &FsEvictingMap<'_, Fe>,
542
94
        anchor_time: &SystemTime,
543
94
        shared_context: &Arc<SharedContext>,
544
94
        block_size: u64,
545
94
        folder: &str,
546
94
    ) -> Result<(), Error> {
547
94
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
548
94
        let file_type = match folder {
549
94
            STR_FOLDER => 
FileType::String47
,
550
47
            DIGEST_FOLDER => FileType::Digest,
551
0
            _ => panic!("Invalid folder type"),
552
        };
553
554
94
        let path_root = format!("{}/{folder}", shared_context.content_path);
555
556
94
        for (
file_name1
,
atime1
,
data_size1
, _) in file_infos.into_iter().filter(|x| x.3) {
557
1
            let result = process_entry(
558
1
                evicting_map,
559
1
                &file_name,
560
1
                file_type,
561
1
                atime,
562
1
                data_size,
563
1
                block_size,
564
1
                anchor_time,
565
1
                shared_context,
566
1
            )
567
1
            .await;
568
1
            if let Err(
err0
) = result {
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 1]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [True: 0, False: 0]
  Branch (568:20): [Folded - Ignored]
  Branch (568:20): [True: 0, False: 0]
569
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
570
                // Ignore result.
571
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
572
1
            }
573
        }
574
94
        Ok(())
575
94
    }
576
577
47
    move_old_cache(shared_context, rename_fn).await
?0
;
578
579
47
    add_files_to_cache(
580
47
        evicting_map,
581
47
        anchor_time,
582
47
        shared_context,
583
47
        block_size,
584
47
        DIGEST_FOLDER,
585
47
    )
586
47
    .await
?0
;
587
588
47
    add_files_to_cache(
589
47
        evicting_map,
590
47
        anchor_time,
591
47
        shared_context,
592
47
        block_size,
593
47
        STR_FOLDER,
594
47
    )
595
47
    .await
?0
;
596
47
    Ok(())
597
47
}
598
599
47
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
600
94
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
601
94
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
602
94
            .await
603
94
            .err_tip(
604
                || "Failed opening temp directory to prune partial downloads in filesystem store",
605
0
            )?
606
94
            .into_inner();
607
608
94
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
609
94
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
  Branch (609:19): [True: 0, False: 0]
  Branch (609:19): [True: 0, False: 34]
  Branch (609:19): [True: 0, False: 4]
  Branch (609:19): [True: 0, False: 6]
  Branch (609:19): [Folded - Ignored]
  Branch (609:19): [True: 0, False: 50]
610
0
            let path = dir_entry?.path();
611
0
            if let Err(err) = fs::remove_file(&path).await {
  Branch (611:20): [True: 0, False: 0]
  Branch (611:20): [True: 0, False: 0]
  Branch (611:20): [True: 0, False: 0]
  Branch (611:20): [True: 0, False: 0]
  Branch (611:20): [Folded - Ignored]
  Branch (611:20): [True: 0, False: 0]
612
0
                warn!(?path, ?err, "Failed to delete file",);
613
0
            }
614
        }
615
94
        Ok(())
616
94
    }
617
618
47
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
619
47
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
620
47
    Ok(())
621
47
}
622
623
// Store whose entries live as files on disk and are tracked by an evicting map.
// `Fe` is the file-entry implementation; it defaults to `FileEntryImpl`.
#[derive(Debug, MetricsComponent)]
624
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
625
    // Shared state built from the spec: the temp path, the content path and a
    // counter named `active_drop_spawns`.
    #[metric]
626
    shared_context: Arc<SharedContext>,
627
    // Map from store key to file entry; lookups, inserts and removals in this
    // store all go through it.
    #[metric(group = "evicting_map")]
628
    evicting_map: Arc<FsEvictingMap<'static, Fe>>,
629
    // Taken from the spec, or `DEFAULT_BLOCK_SIZE` when the spec value is 0.
    // Handed to the file-entry constructors.
    #[metric(help = "Block size of the configured filesystem")]
630
    block_size: u64,
631
    // Taken from the spec, or `DEFAULT_BUFF_SIZE` when the spec value is 0.
    // Used as the capacity of each buffer allocated in `get_part`.
    #[metric(help = "Size of the configured read buffer size")]
632
    read_buffer_size: usize,
633
    // Weak handle to the owning `Arc`, filled in by `Arc::new_cyclic` and
    // upgraded by `get_arc`.
    weak_self: Weak<Self>,
634
    // Function used to move a file from its current path to its final content
    // path; injectable through `new_with_timeout_and_rename_fn`.
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
635
}
636
637
impl<Fe: FileEntry> FilesystemStore<Fe> {
638
39
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
639
113
        
Self::new_with_timeout_and_rename_fn39
(
spec39
, |from, to| std::fs::rename(from, to)).
await39
640
39
    }
641
642
47
    pub async fn new_with_timeout_and_rename_fn(
643
47
        spec: &FilesystemSpec,
644
47
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
645
47
    ) -> Result<Arc<Self>, Error> {
646
94
        async fn create_subdirs(path: &str) -> Result<(), Error> {
647
94
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
648
94
                .await
649
94
                .err_tip(|| format!(
"Failed to create directory {path}/{STR_FOLDER}"0
))
?0
;
650
94
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
651
94
                .await
652
94
                .err_tip(|| format!(
"Failed to create directory {path}/{DIGEST_FOLDER}"0
))
653
94
        }
654
655
47
        let now = SystemTime::now();
656
657
47
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
658
47
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
659
47
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
660
661
        // Create temp and content directories and the s and d subdirectories.
662
663
47
        create_subdirs(&spec.temp_path).await
?0
;
664
47
        create_subdirs(&spec.content_path).await
?0
;
665
666
47
        let shared_context = Arc::new(SharedContext {
667
47
            active_drop_spawns: AtomicU64::new(0),
668
47
            temp_path: spec.temp_path.clone(),
669
47
            content_path: spec.content_path.clone(),
670
47
        });
671
672
47
        let block_size = if spec.block_size == 0 {
  Branch (672:29): [True: 0, False: 0]
  Branch (672:29): [True: 1, False: 0]
  Branch (672:29): [True: 0, False: 1]
  Branch (672:29): [True: 0, False: 1]
  Branch (672:29): [True: 1, False: 0]
  Branch (672:29): [True: 0, False: 1]
  Branch (672:29): [True: 1, False: 0]
  Branch (672:29): [True: 10, False: 1]
  Branch (672:29): [True: 2, False: 0]
  Branch (672:29): [True: 3, False: 0]
  Branch (672:29): [Folded - Ignored]
  Branch (672:29): [True: 25, False: 0]
673
43
            DEFAULT_BLOCK_SIZE
674
        } else {
675
4
            spec.block_size
676
        };
677
47
        add_files_to_cache(
678
47
            evicting_map.as_ref(),
679
47
            &now,
680
47
            &shared_context,
681
47
            block_size,
682
47
            rename_fn,
683
47
        )
684
47
        .await
?0
;
685
47
        prune_temp_path(&shared_context.temp_path).await
?0
;
686
687
47
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (687:35): [True: 0, False: 0]
  Branch (687:35): [True: 0, False: 1]
  Branch (687:35): [True: 1, False: 0]
  Branch (687:35): [True: 0, False: 1]
  Branch (687:35): [True: 1, False: 0]
  Branch (687:35): [True: 0, False: 1]
  Branch (687:35): [True: 1, False: 0]
  Branch (687:35): [True: 4, False: 7]
  Branch (687:35): [True: 2, False: 0]
  Branch (687:35): [True: 3, False: 0]
  Branch (687:35): [Folded - Ignored]
  Branch (687:35): [True: 25, False: 0]
688
37
            DEFAULT_BUFF_SIZE
689
        } else {
690
10
            spec.read_buffer_size as usize
691
        };
692
47
        Ok(Arc::new_cyclic(|weak_self| Self {
693
47
            shared_context,
694
47
            evicting_map,
695
47
            block_size,
696
47
            read_buffer_size,
697
47
            weak_self: weak_self.clone(),
698
47
            rename_fn,
699
47
        }))
700
47
    }
701
702
26
    pub fn get_arc(&self) -> Option<Arc<Self>> {
703
26
        self.weak_self.upgrade()
704
26
    }
705
706
9
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
707
9
        self.evicting_map
708
9
            .get(&digest.into())
709
9
            .await
710
9
            .ok_or_else(|| make_err!(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
711
9
    }
712
713
118
    async fn update_file(
714
118
        self: Pin<&Self>,
715
118
        mut entry: Fe,
716
118
        mut temp_file: fs::FileSlot,
717
118
        final_key: StoreKey<'static>,
718
118
        mut reader: DropCloserReadHalf,
719
118
    ) -> Result<(), Error> {
720
118
        let mut data_size = 0;
721
        loop {
722
207
            let mut data = reader
723
207
                .recv()
724
207
                .await
725
207
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
726
207
            let data_len = data.len();
727
207
            if data_len == 0 {
  Branch (727:16): [True: 0, False: 0]
  Branch (727:16): [True: 2, False: 8]
  Branch (727:16): [True: 2, False: 2]
  Branch (727:16): [True: 2, False: 2]
  Branch (727:16): [True: 2, False: 2]
  Branch (727:16): [True: 2, False: 2]
  Branch (727:16): [True: 1, False: 1]
  Branch (727:16): [True: 13, False: 11]
  Branch (727:16): [True: 0, False: 0]
  Branch (727:16): [True: 2, False: 2]
  Branch (727:16): [Folded - Ignored]
  Branch (727:16): [True: 92, False: 59]
728
118
                break; // EOF.
729
89
            }
730
89
            temp_file
731
89
                .write_all_buf(&mut data)
732
89
                .await
733
89
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
734
89
            data_size += data_len as u64;
735
        }
736
737
118
        temp_file
738
118
            .as_ref()
739
118
            .sync_all()
740
118
            .await
741
118
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
742
743
118
        drop(temp_file);
744
745
118
        *entry.data_size_mut() = data_size;
746
118
        self.emplace_file(final_key, Arc::new(entry)).await
747
117
    }
748
749
125
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
750
        // This sequence of events is quite tricky to understand due to the amount of triggers that
751
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
752
        // 1. Here will hold a write lock on any file operations of this FileEntry.
753
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
754
        //    entries.
755
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
756
        //    during the rename.
757
        // 4. It should be impossible for items to be added while eviction is happening, so there
758
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
759
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
760
        //    item is not possible within the `insert()` call because the write lock inside the
761
        //    eviction map. If an eviction of new item happens after `insert()` but before
762
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
763
        //    will be blocked on us because we currently have the lock.
764
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
765
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
766
        //    contents until we release the lock.
767
125
        let evicting_map = self.evicting_map.clone();
768
125
        let rename_fn = self.rename_fn;
769
770
        // We need to guarantee that this will get to the end even if the parent future is dropped.
771
        // See: https://github.com/TraceMachina/nativelink/issues/495
772
125
        background_spawn!("filesystem_store_emplace_file", async move {
773
125
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
774
125
            let final_path = get_file_path_raw(
775
125
                &PathType::Content,
776
125
                encoded_file_path.shared_context.as_ref(),
777
125
                &key,
778
            );
779
780
125
            evicting_map
781
125
                .insert(key.borrow().into_owned().into(), entry.clone())
782
125
                .await;
783
784
125
            let from_path = encoded_file_path.get_file_path();
785
            // Internally tokio spawns fs commands onto a blocking thread anyways.
786
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
787
            // an open-file permit (ensure we don't open too many files at once).
788
125
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| 
{1
789
1
                format!(
790
1
                    "Failed to rename temp file to final path {}",
791
1
                    final_path.display()
792
                )
793
1
            });
794
795
            // In the event our move from temp file to final file fails we need to ensure we remove
796
            // the entry from our map.
797
            // Remember: At this point it is possible for another thread to have a reference to
798
            // `entry`, so we can't delete the file, only drop() should ever delete files.
799
125
            if let Err(
err1
) = result {
  Branch (799:20): [True: 0, False: 0]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [True: 1, False: 0]
  Branch (799:20): [True: 0, False: 15]
  Branch (799:20): [True: 0, False: 0]
  Branch (799:20): [True: 0, False: 2]
  Branch (799:20): [Folded - Ignored]
  Branch (799:20): [True: 0, False: 97]
800
1
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
801
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
802
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
803
1
                drop(encoded_file_path);
804
                // It is possible that the item in our map is no longer the item we inserted,
805
                // So, we need to conditionally remove it only if the pointers are the same.
806
807
1
                evicting_map
808
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
809
1
                    .await;
810
1
                return Err(err);
811
124
            }
812
124
            encoded_file_path.path_type = PathType::Content;
813
124
            encoded_file_path.key = key;
814
124
            Ok(())
815
125
        })
816
125
        .await
817
124
        .err_tip(|| "Failed to create spawn in filesystem store update_file")
?0
818
124
    }
819
}
820
821
#[async_trait]
822
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
823
    async fn has_with_results(
824
        self: Pin<&Self>,
825
        keys: &[StoreKey<'_>],
826
        results: &mut [Option<u64>],
827
84
    ) -> Result<(), Error> {
828
        let own_keys = keys
829
            .iter()
830
84
            .map(|sk| sk.borrow().into_owned())
831
            .collect::<Vec<_>>();
832
        self.evicting_map
833
            .sizes_for_keys(own_keys.iter(), results, false /* peek */)
834
            .await;
835
        // We need to do a special pass to ensure our zero files exist.
836
        // If our results failed and the result was a zero file, we need to
837
        // create the file by spec.
838
        for (key, result) in keys.iter().zip(results.iter_mut()) {
839
            if result.is_some() || !is_zero_digest(key.borrow()) {
840
                continue;
841
            }
842
            let (mut tx, rx) = make_buf_channel_pair();
843
            let send_eof_result = tx.send_eof();
844
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
845
                .await
846
0
                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
847
                .merge(
848
                    send_eof_result
849
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
850
                )?;
851
852
            *result = Some(0);
853
        }
854
        Ok(())
855
84
    }
856
857
    async fn update(
858
        self: Pin<&Self>,
859
        key: StoreKey<'_>,
860
        mut reader: DropCloserReadHalf,
861
        _upload_size: UploadSizeInfo,
862
118
    ) -> Result<(), Error> {
863
        let temp_key = make_temp_key(&key);
864
865
        // There's a possibility of deadlock here where we take all of the
866
        // file semaphores with make_and_open_file and the semaphores for
867
        // whatever is populating reader is exhasted on the threads that
868
        // have the FileSlots and not on those which can't.  To work around
869
        // this we don't take the FileSlot until there's something on the
870
        // reader available to know that the populator is active.
871
        reader.peek().await?;
872
873
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
874
            self.block_size,
875
            EncodedFilePath {
876
                shared_context: self.shared_context.clone(),
877
                path_type: PathType::Temp,
878
                key: temp_key,
879
            },
880
        )
881
        .await?;
882
883
        self.update_file(entry, temp_file, key.into_owned(), reader)
884
            .await
885
1
            .err_tip(|| {
886
1
                format!(
887
1
                    "While processing with temp file {}",
888
1
                    temp_full_path.display()
889
                )
890
1
            })
891
118
    }
892
893
97
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
894
97
        optimization == StoreOptimizations::FileUpdates
895
97
    }
896
897
    async fn update_with_whole_file(
898
        self: Pin<&Self>,
899
        key: StoreKey<'_>,
900
        path: OsString,
901
        file: fs::FileSlot,
902
        upload_size: UploadSizeInfo,
903
7
    ) -> Result<Option<fs::FileSlot>, Error> {
904
        let file_size = match upload_size {
905
            UploadSizeInfo::ExactSize(size) => size,
906
            UploadSizeInfo::MaxSize(_) => file
907
                .as_ref()
908
                .metadata()
909
                .await
910
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
911
                .len(),
912
        };
913
        let entry = Fe::create(
914
            file_size,
915
            self.block_size,
916
            RwLock::new(EncodedFilePath {
917
                shared_context: self.shared_context.clone(),
918
                path_type: PathType::Custom(path),
919
                key: key.borrow().into_owned(),
920
            }),
921
        );
922
        // We are done with the file, if we hold a reference to the file here, it could
923
        // result in a deadlock if `emplace_file()` also needs file descriptors.
924
        drop(file);
925
        self.emplace_file(key.into_owned(), Arc::new(entry))
926
            .await
927
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
928
        return Ok(None);
929
7
    }
930
931
    async fn get_part(
932
        self: Pin<&Self>,
933
        key: StoreKey<'_>,
934
        writer: &mut DropCloserWriteHalf,
935
        offset: u64,
936
        length: Option<u64>,
937
65
    ) -> Result<(), Error> {
938
        if is_zero_digest(key.borrow()) {
939
            self.has(key.borrow())
940
                .await
941
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
942
            writer
943
                .send_eof()
944
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
945
            return Ok(());
946
        }
947
        let owned_key = key.into_owned();
948
1
        let entry = self.evicting_map.get(&owned_key).await.ok_or_else(|| {
949
1
            make_err!(
950
1
                Code::NotFound,
951
                "{} not found in filesystem store here",
952
1
                owned_key.as_str()
953
            )
954
1
        })?;
955
        let read_limit = length.unwrap_or(u64::MAX);
956
2
        let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
957
            // If the file is not found, we need to remove it from the eviction map.
958
2
            if err.code == Code::NotFound {
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 2, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [True: 0, False: 0]
  Branch (958:16): [Folded - Ignored]
  Branch (958:16): [True: 0, False: 0]
959
2
                error!(
960
                    ?err,
961
                    key = ?owned_key,
962
2
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
963
                );
964
2
                self.evicting_map.remove(&owned_key).await;
965
0
            }
966
2
            Err(err)
967
4
        }).await?;
968
969
        loop {
970
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
971
            temp_file
972
                .read_buf(&mut buf)
973
                .await
974
                .err_tip(|| "Failed to read data in filesystem store")?;
975
            if buf.is_empty() {
976
                break; // EOF.
977
            }
978
            writer
979
                .send(buf.freeze())
980
                .await
981
                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
982
        }
983
        writer
984
            .send_eof()
985
            .err_tip(|| "Filed to send EOF in filesystem store get_part")?;
986
987
        Ok(())
988
65
    }
989
990
118
    /// Returns this store itself; the `_digest` argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        let driver: &dyn StoreDriver = self;
        driver
    }
993
994
26
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
995
26
        self
996
26
    }
997
998
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
999
0
        self
1000
0
    }
1001
1002
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1003
0
        registry.register_indicator(self);
1004
0
    }
1005
1006
0
    fn register_remove_callback(
1007
0
        self: Arc<Self>,
1008
0
        callback: Arc<dyn RemoveItemCallback>,
1009
0
    ) -> Result<(), Error> {
1010
0
        self.evicting_map
1011
0
            .add_remove_callback(RemoveItemCallbackHolder::new(callback));
1012
0
        Ok(())
1013
0
    }
1014
}
1015
1016
#[async_trait]
1017
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1018
0
    fn get_name(&self) -> &'static str {
1019
0
        "FilesystemStore"
1020
0
    }
1021
1022
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1023
        StoreDriver::check_health(Pin::new(self), namespace).await
1024
0
    }
1025
}