Coverage Report

Created: 2026-05-12 15:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use core::time::Duration;
19
use std::borrow::Cow;
20
use std::ffi::{OsStr, OsString};
21
use std::sync::{Arc, Weak};
22
use std::time::SystemTime;
23
24
use async_lock::RwLock;
25
use async_trait::async_trait;
26
use bytes::{Bytes, BytesMut};
27
use futures::stream::{StreamExt, TryStreamExt};
28
use futures::{Future, TryFutureExt};
29
use nativelink_config::stores::FilesystemSpec;
30
use nativelink_error::{Code, Error, ResultExt, make_err};
31
use nativelink_metric::MetricsComponent;
32
use nativelink_util::background_spawn;
33
use nativelink_util::buf_channel::{
34
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
35
};
36
use nativelink_util::common::{DigestInfo, fs};
37
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
38
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
39
use nativelink_util::store_trait::{
40
    RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
41
};
42
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
43
use tokio::sync::Semaphore;
44
use tokio_stream::wrappers::ReadDirStream;
45
use tracing::{debug, error, info, trace, warn};
46
47
use crate::callback_utils::RemoveItemCallbackHolder;
48
use crate::cas_utils::is_zero_digest;
49
50
// Default size to allocate memory of the buffer when reading files.
51
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
52
// Default block size of all major filesystems is 4KB
53
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
54
55
pub const STR_FOLDER: &str = "s";
56
pub const DIGEST_FOLDER: &str = "d";
57
58
#[derive(Clone, Copy, Debug)]
59
pub enum FileType {
60
    Digest,
61
    String,
62
}
63
64
#[derive(Debug, MetricsComponent)]
65
pub struct SharedContext {
66
    // Used in testing to know how many active drop() spawns are running.
67
    // TODO(palfrey) It is probably a good idea to use a spin lock during
68
    // destruction of the store to ensure that all files are actually
69
    // deleted (similar to how it is done in tests).
70
    #[metric(help = "Number of active drop spawns")]
71
    pub active_drop_spawns: AtomicU64,
72
    #[metric(help = "Path to the configured temp path")]
73
    temp_path: String,
74
    #[metric(help = "Path to the configured content path")]
75
    content_path: String,
76
}
77
78
#[derive(Eq, PartialEq, Debug)]
79
enum PathType {
80
    Content,
81
    Temp,
82
    Custom(OsString),
83
}
84
85
/// [`EncodedFilePath`] stores the path to the file
86
/// including the context, path type and key to the file.
87
/// The whole [`StoreKey`] is stored as opposed to solely
88
/// the [`DigestInfo`] so that it is more usable for things
89
/// such as BEP -see Issue #1108
90
#[derive(Debug)]
91
pub struct EncodedFilePath {
92
    shared_context: Arc<SharedContext>,
93
    path_type: PathType,
94
    key: StoreKey<'static>,
95
}
96
97
impl EncodedFilePath {
98
    #[inline]
99
267
    fn get_file_path(&self) -> Cow<'_, OsStr> {
100
267
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
101
267
    }
102
}
103
104
#[inline]
105
365
fn get_file_path_raw<'a>(
106
365
    path_type: &'a PathType,
107
365
    shared_context: &SharedContext,
108
365
    key: &StoreKey<'a>,
109
365
) -> Cow<'a, OsStr> {
110
365
    let 
folder357
= match path_type {
111
166
        PathType::Content => &shared_context.content_path,
112
191
        PathType::Temp => &shared_context.temp_path,
113
8
        PathType::Custom(path) => return Cow::Borrowed(path),
114
    };
115
357
    Cow::Owned(to_full_path_from_key(folder, key))
116
365
}
117
118
impl Drop for EncodedFilePath {
119
102
    fn drop(&mut self) {
120
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
121
        // file actually needs to be deleted.
122
102
        if self.path_type == PathType::Content {
123
93
            return;
124
9
        }
125
126
9
        let file_path = self.get_file_path().to_os_string();
127
9
        let shared_context = self.shared_context.clone();
128
        // .fetch_add returns previous value, so we add one to get approximate current value
129
9
        let current_active_drop_spawns = shared_context
130
9
            .active_drop_spawns
131
9
            .fetch_add(1, Ordering::Relaxed)
132
9
            + 1;
133
9
        debug!(
134
            %current_active_drop_spawns,
135
            ?file_path,
136
            "Spawned a filesystem_delete_file"
137
        );
138
9
        background_spawn!("filesystem_delete_file", async move 
{8
139
8
            let 
result5
= fs::remove_file(&file_path)
140
8
                .await
141
5
                .err_tip(|| 
format!0
("Failed to remove file {}",
file_path.display()0
));
142
5
            if let Err(
err0
) = result {
143
0
                error!(?file_path, ?err, "Failed to delete file",);
144
            } else {
145
5
                debug!(?file_path, "File deleted",);
146
            }
147
            // .fetch_sub returns previous value, so we subtract one to get approximate current value
148
5
            let current_active_drop_spawns = shared_context
149
5
                .active_drop_spawns
150
5
                .fetch_sub(1, Ordering::Relaxed)
151
5
                - 1;
152
5
            debug!(
153
                ?current_active_drop_spawns,
154
                ?file_path,
155
                "Dropped a filesystem_delete_file"
156
            );
157
5
        });
158
102
    }
159
}
160
161
/// This creates the file path from the [`StoreKey`]. If
162
/// it is a string, the string, prefixed with [`STR_PREFIX`]
163
/// for backwards compatibility, is stored.
164
///
165
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
166
/// followed by the string representation of a digest - the hash in hex,
167
/// a hyphen then the size in bytes
168
///
169
/// Previously, only the string representation of the [`DigestInfo`] was
170
/// used with no prefix
171
#[inline]
172
366
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
173
366
    match key {
174
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
175
363
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
176
    }
177
366
    .into()
178
366
}
179
180
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
181
    /// Responsible for creating the underlying `FileEntry`.
182
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
183
184
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
185
    fn make_and_open_file(
186
        block_size: u64,
187
        encoded_file_path: EncodedFilePath,
188
    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
189
    where
190
        Self: Sized;
191
192
    /// Returns the underlying reference to the size of the data in bytes
193
    fn data_size_mut(&mut self) -> &mut u64;
194
195
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
196
    fn size_on_disk(&self) -> u64;
197
198
    /// Gets the underlying `EncodedfilePath`.
199
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
200
201
    /// Returns a reader that will read part of the underlying file.
202
    fn read_file_part(
203
        &self,
204
        offset: u64,
205
        length: u64,
206
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
207
208
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
209
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
210
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
211
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
212
    /// the callback the file is always guaranteed to exist and immutable.
213
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
214
    fn get_file_path_locked<
215
        T,
216
        Fut: Future<Output = Result<T, Error>> + Send,
217
        F: FnOnce(OsString) -> Fut + Send,
218
    >(
219
        &self,
220
        handler: F,
221
    ) -> impl Future<Output = Result<T, Error>> + Send;
222
}
223
224
pub struct FileEntryImpl {
225
    data_size: u64,
226
    block_size: u64,
227
    // We lock around this as it gets rewritten when we move between temp and content types
228
    encoded_file_path: RwLock<EncodedFilePath>,
229
}
230
231
impl FileEntryImpl {
232
0
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
233
0
        self.encoded_file_path.get_mut().shared_context.clone()
234
0
    }
235
}
236
237
impl FileEntry for FileEntryImpl {
238
102
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
239
102
        Self {
240
102
            data_size,
241
102
            block_size,
242
102
            encoded_file_path,
243
102
        }
244
102
    }
245
246
    /// This encapsulates the logic for the edge case of if the file fails to create
247
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
248
    /// try to cleanup the file as well during `drop()`.
249
91
    async fn make_and_open_file(
250
91
        block_size: u64,
251
91
        encoded_file_path: EncodedFilePath,
252
91
    ) -> Result<(Self, fs::FileSlot, OsString), Error> {
253
91
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
254
91
        let temp_file_result = fs::create_file(temp_full_path.clone())
255
91
            .or_else(|mut err| async 
{0
256
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
257
0
                    format!(
258
                        "Failed to remove file {} in filesystem store",
259
0
                        temp_full_path.display()
260
                    )
261
0
                });
262
0
                if let Err(remove_err) = remove_result {
263
0
                    err = err.merge(remove_err);
264
0
                }
265
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
266
0
                Err(err).err_tip(|| {
267
0
                    format!(
268
                        "Failed to create {} in filesystem store",
269
0
                        temp_full_path.display()
270
                    )
271
0
                })
272
0
            })
273
91
            .await
?0
;
274
275
91
        Ok((
276
91
            <Self as FileEntry>::create(
277
91
                0, /* Unknown yet, we will fill it in later */
278
91
                block_size,
279
91
                RwLock::new(encoded_file_path),
280
91
            ),
281
91
            temp_file_result,
282
91
            temp_full_path,
283
91
        ))
284
91
    }
285
286
91
    fn data_size_mut(&mut self) -> &mut u64 {
287
91
        &mut self.data_size
288
91
    }
289
290
171
    fn size_on_disk(&self) -> u64 {
291
171
        self.data_size.div_ceil(self.block_size) * self.block_size
292
171
    }
293
294
159
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
295
159
        &self.encoded_file_path
296
159
    }
297
298
54
    fn read_file_part(
299
54
        &self,
300
54
        offset: u64,
301
54
        length: u64,
302
54
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
303
54
        self.get_file_path_locked(move |full_content_path| async move {
304
54
            let 
file52
= fs::open_file(&full_content_path, offset, length)
305
54
                .await
306
54
                .err_tip(|| 
{2
307
2
                    format!(
308
                        "Failed to open file in filesystem store {}",
309
2
                        full_content_path.display()
310
                    )
311
2
                })?;
312
52
            Ok(file)
313
108
        })
314
54
    }
315
316
60
    async fn get_file_path_locked<
317
60
        T,
318
60
        Fut: Future<Output = Result<T, Error>> + Send,
319
60
        F: FnOnce(OsString) -> Fut + Send,
320
60
    >(
321
60
        &self,
322
60
        handler: F,
323
60
    ) -> Result<T, Error> {
324
60
        let encoded_file_path = self.get_encoded_file_path().read().await;
325
60
        handler(encoded_file_path.get_file_path().to_os_string()).await
326
60
    }
327
}
328
329
impl Debug for FileEntryImpl {
330
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
331
0
        f.debug_struct("FileEntryImpl")
332
0
            .field("data_size", &self.data_size)
333
0
            .field("encoded_file_path", &"<behind mutex>")
334
0
            .finish()
335
0
    }
336
}
337
338
100
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
339
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
340
100
    let mut hash = *digest.packed_hash();
341
100
    hash[24..].clone_from_slice(
342
100
        &DELETE_FILE_COUNTER
343
100
            .fetch_add(1, Ordering::Relaxed)
344
100
            .to_le_bytes(),
345
    );
346
100
    digest.set_packed_hash(*hash);
347
100
    digest
348
100
}
349
350
100
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
351
100
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
352
100
}
353
354
impl LenEntry for FileEntryImpl {
355
    #[inline]
356
169
    fn len(&self) -> u64 {
357
169
        self.size_on_disk()
358
169
    }
359
360
0
    fn is_empty(&self) -> bool {
361
0
        self.data_size == 0
362
0
    }
363
364
    // unref() only triggers when an item is removed from the eviction_map. It is possible
365
    // that another place in code has a reference to `FileEntryImpl` and may later read the
366
    // file. To support this edge case, we first move the file to a temp file and point
367
    // target file location to the new temp file. `unref()` should only ever be called once.
368
    #[inline]
369
11
    async fn unref(&self) {
370
11
        let mut encoded_file_path = self.encoded_file_path.write().await;
371
11
        if encoded_file_path.path_type == PathType::Temp {
372
            // We are already a temp file that is now marked for deletion on drop.
373
            // This is very rare, but most likely the rename into the content path failed.
374
2
            warn!(
375
2
                key = ?encoded_file_path.key,
376
                "File is already a temp file",
377
            );
378
2
            return;
379
9
        }
380
9
        let from_path = encoded_file_path.get_file_path();
381
9
        let new_key = make_temp_key(&encoded_file_path.key);
382
383
9
        let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
384
385
9
        if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
386
            // ENOENT here means the file we expected at `from_path`
387
            // was already gone — typically because another thread's
388
            // eviction beat us to the unref, or because the entry
389
            // ended up in our map without its file ever landing on
390
            // disk (the "phantom-map" case the runtime recovers from
391
            // via `FastSlowStore::get_part`'s slow-store fallback).
392
            // It is benign at this site — there is no file to move
393
            // — and historically dominates the log volume of this
394
            // store under heavy write+evict concurrency, fast enough
395
            // to drown the runtime under sustained pressure. Demote
396
            // to `debug` and drop the per-emission path fields so it
397
            // stops costing serialization in the hot path.
398
            //
399
            // Other rename failures (EACCES, EXDEV, EBUSY, …) are
400
            // genuinely unexpected and stay at `warn` with full
401
            // context.
402
2
            if err.code == Code::NotFound {
403
2
                debug!(
404
2
                    key = ?encoded_file_path.key,
405
                    "Failed to rename file (already gone, treating as benign)",
406
                );
407
            } else {
408
0
                warn!(
409
0
                    key = ?encoded_file_path.key,
410
                    ?from_path,
411
                    ?to_path,
412
                    ?err,
413
                    "Failed to rename file",
414
                );
415
            }
416
        } else {
417
7
            debug!(
418
7
                key = ?encoded_file_path.key,
419
                ?from_path,
420
                ?to_path,
421
                "Renamed file (unref)",
422
            );
423
7
            encoded_file_path.path_type = PathType::Temp;
424
7
            encoded_file_path.key = new_key;
425
        }
426
11
    }
427
}
428
429
#[inline]
430
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
431
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
432
2
    let size = size.parse::<i64>()
?0
;
433
2
    DigestInfo::try_new(hash, size)
434
2
}
435
436
3
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
437
3
    match file_type {
438
1
        FileType::String => Ok(StoreKey::new_str(file_name)),
439
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
440
    }
441
3
}
442
443
/// The number of files to read the metadata for at the same time when running
444
/// `add_files_to_cache`.
445
const SIMULTANEOUS_METADATA_READS: usize = 200;
446
447
type FsEvictingMap<'a, Fe> =
448
    EvictingMap<StoreKeyBorrow, StoreKey<'a>, Arc<Fe>, SystemTime, RemoveItemCallbackHolder>;
449
450
55
async fn add_files_to_cache<Fe: FileEntry>(
451
55
    evicting_map: &FsEvictingMap<'_, Fe>,
452
55
    anchor_time: &SystemTime,
453
55
    shared_context: &Arc<SharedContext>,
454
55
    block_size: u64,
455
55
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
456
55
) -> Result<(), Error> {
457
    #[expect(clippy::too_many_arguments)]
458
2
    async fn process_entry<Fe: FileEntry>(
459
2
        evicting_map: &FsEvictingMap<'_, Fe>,
460
2
        file_name: &str,
461
2
        file_type: FileType,
462
2
        atime: SystemTime,
463
2
        data_size: u64,
464
2
        block_size: u64,
465
2
        anchor_time: &SystemTime,
466
2
        shared_context: &Arc<SharedContext>,
467
2
    ) -> Result<(), Error> {
468
2
        let key = key_from_file(file_name, file_type)
?0
;
469
470
2
        let file_entry = Fe::create(
471
2
            data_size,
472
2
            block_size,
473
2
            RwLock::new(EncodedFilePath {
474
2
                shared_context: shared_context.clone(),
475
2
                path_type: PathType::Content,
476
2
                key: key.borrow().into_owned(),
477
2
            }),
478
        );
479
2
        let time_since_anchor = if let Ok(
d1
) = anchor_time.duration_since(atime) {
480
1
            d
481
        } else {
482
1
            warn!(
483
                %file_name,
484
1
                atime = %humantime::format_rfc3339(atime),
485
1
                anchor_time = %humantime::format_rfc3339(*anchor_time),
486
                "File access time newer than FilesystemStore start time",
487
            );
488
1
            Duration::ZERO
489
        };
490
2
        evicting_map
491
2
            .insert_with_time(
492
2
                key.into_owned().into(),
493
2
                Arc::new(file_entry),
494
2
                i32::try_from(time_since_anchor.as_secs()).unwrap_or(i32::MAX),
495
2
            )
496
2
            .await;
497
2
        Ok(())
498
2
    }
499
500
165
    async fn read_files(
501
165
        folder: Option<&str>,
502
165
        shared_context: &SharedContext,
503
165
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
504
        // Note: In Dec 2024 this is for backwards compatibility with the old
505
        // way files were stored on disk. Previously all files were in a single
506
        // folder regardless of the StoreKey type. This allows old versions of
507
        // nativelink file layout to be upgraded at startup time.
508
        // This logic can be removed once more time has passed.
509
165
        let read_dir = folder.map_or_else(
510
55
            || format!("{}/", shared_context.content_path),
511
110
            |folder| format!("{}/{folder}/", shared_context.content_path),
512
        );
513
514
165
        let (_permit, dir_handle) = fs::read_dir(read_dir)
515
165
            .await
516
165
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
517
165
            .into_inner();
518
519
165
        let read_dir_stream = ReadDirStream::new(dir_handle);
520
165
        read_dir_stream
521
165
            .map(|dir_entry| async move 
{112
522
112
                let dir_entry = dir_entry.unwrap();
523
112
                let file_name = dir_entry.file_name().into_string().unwrap();
524
112
                let metadata = dir_entry
525
112
                    .metadata()
526
112
                    .await
527
112
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
528
                // We need to filter out folders - we do not want to try to cache the s and d folders.
529
112
                let is_file =
530
112
                    metadata.is_file() || !(
file_name == STR_FOLDER110
||
file_name == DIGEST_FOLDER55
);
531
                // Using access time is not perfect, but better than random. We do not update the
532
                // atime when a file is actually "touched", we rely on whatever the filesystem does
533
                // when we read the file (usually update on read).
534
112
                let atime = metadata
535
112
                    .accessed()
536
112
                    .or_else(|_| 
metadata0
.
modified0
())
537
112
                    .unwrap_or(SystemTime::UNIX_EPOCH);
538
112
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
539
112
                    file_name,
540
112
                    atime,
541
112
                    metadata.len(),
542
112
                    is_file,
543
112
                ))
544
224
            })
545
165
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
546
165
            .try_collect()
547
165
            .await
548
165
    }
549
550
    /// Note: In Dec 2024 this is for backwards compatibility with the old
551
    /// way files were stored on disk. Previously all files were in a single
552
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
553
    /// location to the new cache location, under [`DIGEST_FOLDER`].
554
55
    async fn move_old_cache(
555
55
        shared_context: &Arc<SharedContext>,
556
55
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
557
55
    ) -> Result<(), Error> {
558
55
        let file_infos = read_files(None, shared_context).await
?0
;
559
560
55
        let from_path = &shared_context.content_path;
561
562
55
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
563
564
55
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
565
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
566
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
567
568
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
569
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
570
            } else {
571
0
                debug!(?from_file, ?to_file, "Renamed file (old cache)",);
572
            }
573
        }
574
55
        Ok(())
575
55
    }
576
577
110
    async fn add_files_to_cache<Fe: FileEntry>(
578
110
        evicting_map: &FsEvictingMap<'_, Fe>,
579
110
        anchor_time: &SystemTime,
580
110
        shared_context: &Arc<SharedContext>,
581
110
        block_size: u64,
582
110
        folder: &str,
583
110
    ) -> Result<(), Error> {
584
110
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
585
110
        let file_type = match folder {
586
110
            STR_FOLDER => 
FileType::String55
,
587
55
            DIGEST_FOLDER => FileType::Digest,
588
0
            _ => panic!("Invalid folder type"),
589
        };
590
591
110
        let path_root = format!("{}/{folder}", shared_context.content_path);
592
593
110
        for (
file_name2
,
atime2
,
data_size2
, _) in file_infos.into_iter().filter(|x| x.3) {
594
2
            let result = process_entry(
595
2
                evicting_map,
596
2
                &file_name,
597
2
                file_type,
598
2
                atime,
599
2
                data_size,
600
2
                block_size,
601
2
                anchor_time,
602
2
                shared_context,
603
2
            )
604
2
            .await;
605
2
            if let Err(
err0
) = result {
606
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
607
                // Ignore result.
608
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
609
2
            }
610
        }
611
110
        Ok(())
612
110
    }
613
614
55
    move_old_cache(shared_context, rename_fn).await
?0
;
615
616
55
    add_files_to_cache(
617
55
        evicting_map,
618
55
        anchor_time,
619
55
        shared_context,
620
55
        block_size,
621
55
        DIGEST_FOLDER,
622
55
    )
623
55
    .await
?0
;
624
625
55
    add_files_to_cache(
626
55
        evicting_map,
627
55
        anchor_time,
628
55
        shared_context,
629
55
        block_size,
630
55
        STR_FOLDER,
631
55
    )
632
55
    .await
?0
;
633
55
    Ok(())
634
55
}
635
636
55
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
637
110
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
638
110
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
639
110
            .await
640
110
            .err_tip(
641
                || "Failed opening temp directory to prune partial downloads in filesystem store",
642
0
            )?
643
110
            .into_inner();
644
645
110
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
646
110
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
647
0
            let path = dir_entry?.path();
648
0
            if let Err(err) = fs::remove_file(&path).await {
649
0
                warn!(?path, ?err, "Failed to delete file",);
650
0
            }
651
        }
652
110
        Ok(())
653
110
    }
654
655
55
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
656
55
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
657
55
    Ok(())
658
55
}
659
660
/// Store that keeps entries as files on the local filesystem and tracks them
/// in an evicting map.
///
/// Instances are built by `new` / `new_with_timeout_and_rename_fn`, which
/// return the store inside an `Arc`.
#[derive(Debug, MetricsComponent)]
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
    // Shared state holding the temp and content paths; cloned into every
    // `EncodedFilePath` this store creates.
    #[metric]
    shared_context: Arc<SharedContext>,
    // Maps store keys to file entries.
    #[metric(group = "evicting_map")]
    evicting_map: Arc<FsEvictingMap<'static, Fe>>,
    // Taken from the spec, or `DEFAULT_BLOCK_SIZE` when the spec value is 0.
    #[metric(help = "Block size of the configured filesystem")]
    block_size: u64,
    // Capacity of each buffer allocated while streaming a file in `get_part`.
    #[metric(help = "Size of the configured read buffer size")]
    read_buffer_size: usize,
    // Weak handle to the owning `Arc`, set through `Arc::new_cyclic`.
    weak_self: Weak<Self>,
    // Function used to move a file from its current path to its final content path.
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
    /// Limits concurrent write operations to prevent disk I/O saturation.
    write_semaphore: Option<Semaphore>,
}
675
676
impl<Fe: FileEntry> FilesystemStore<Fe> {
677
43
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
678
88
        
Self::new_with_timeout_and_rename_fn43
(
spec43
, |from, to| std::fs::rename(from, to)).
await43
679
43
    }
680
681
55
    pub async fn new_with_timeout_and_rename_fn(
682
55
        spec: &FilesystemSpec,
683
55
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
684
55
    ) -> Result<Arc<Self>, Error> {
685
110
        async fn create_subdirs(path: &str) -> Result<(), Error> {
686
110
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
687
110
                .await
688
110
                .err_tip(|| 
format!0
("Failed to create directory {path}/{STR_FOLDER}"))
?0
;
689
110
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
690
110
                .await
691
110
                .err_tip(|| 
format!0
("Failed to create directory {path}/{DIGEST_FOLDER}"))
692
110
        }
693
694
55
        let now = SystemTime::now();
695
696
55
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
697
55
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
698
55
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
699
700
        // Create temp and content directories and the s and d subdirectories.
701
702
55
        create_subdirs(&spec.temp_path).await
?0
;
703
55
        create_subdirs(&spec.content_path).await
?0
;
704
705
55
        let shared_context = Arc::new(SharedContext {
706
55
            active_drop_spawns: AtomicU64::new(0),
707
55
            temp_path: spec.temp_path.clone(),
708
55
            content_path: spec.content_path.clone(),
709
55
        });
710
711
55
        let block_size = if spec.block_size == 0 {
712
51
            DEFAULT_BLOCK_SIZE
713
        } else {
714
4
            spec.block_size
715
        };
716
55
        add_files_to_cache(
717
55
            evicting_map.as_ref(),
718
55
            &now,
719
55
            &shared_context,
720
55
            block_size,
721
55
            rename_fn,
722
55
        )
723
55
        .await
?0
;
724
55
        prune_temp_path(&shared_context.temp_path).await
?0
;
725
726
55
        let read_buffer_size = if spec.read_buffer_size == 0 {
727
40
            DEFAULT_BUFF_SIZE
728
        } else {
729
15
            spec.read_buffer_size as usize
730
        };
731
55
        let write_semaphore = if spec.max_concurrent_writes > 0 {
732
0
            Some(Semaphore::new(spec.max_concurrent_writes))
733
        } else {
734
55
            None
735
        };
736
55
        Ok(Arc::new_cyclic(|weak_self| Self {
737
55
            shared_context,
738
55
            evicting_map,
739
55
            block_size,
740
55
            read_buffer_size,
741
55
            weak_self: weak_self.clone(),
742
55
            rename_fn,
743
55
            write_semaphore,
744
55
        }))
745
55
    }
746
747
28
    pub fn get_arc(&self) -> Option<Arc<Self>> {
748
28
        self.weak_self.upgrade()
749
28
    }
750
751
10
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
752
10
        if is_zero_digest(digest) {
753
1
            return Ok(Arc::new(Fe::create(
754
1
                0,
755
1
                0,
756
1
                RwLock::new(EncodedFilePath {
757
1
                    shared_context: self.shared_context.clone(),
758
1
                    path_type: PathType::Content,
759
1
                    key: digest.into(),
760
1
                }),
761
1
            )));
762
9
        }
763
9
        self.evicting_map
764
9
            .get(&digest.into())
765
9
            .await
766
9
            .ok_or_else(|| 
make_err!0
(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
767
10
    }
768
769
69
    async fn update_file(
770
69
        self: Pin<&Self>,
771
69
        mut entry: Fe,
772
69
        mut temp_file: fs::FileSlot,
773
69
        final_key: StoreKey<'static>,
774
69
        mut reader: DropCloserReadHalf,
775
69
    ) -> Result<(), Error> {
776
69
        let mut data_size = 0;
777
        loop {
778
143
            let mut data = reader
779
143
                .recv()
780
143
                .await
781
143
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
782
143
            let data_len = data.len();
783
143
            if data_len == 0 {
784
69
                break; // EOF.
785
74
            }
786
74
            temp_file
787
74
                .write_all_buf(&mut data)
788
74
                .await
789
74
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
790
74
            data_size += data_len as u64;
791
        }
792
793
69
        let permit = if let Some(
sem0
) = &self.write_semaphore {
794
0
            Some(sem.acquire().await.map_err(|err| {
795
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
796
0
            })?)
797
        } else {
798
69
            None
799
        };
800
801
69
        temp_file
802
69
            .as_ref()
803
69
            .sync_all()
804
69
            .await
805
69
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
806
807
69
        drop(permit);
808
809
69
        temp_file.advise_dontneed();
810
69
        trace!(?temp_file, "Dropping file to update_file");
811
69
        drop(temp_file);
812
813
69
        *entry.data_size_mut() = data_size;
814
69
        self.emplace_file(final_key, Arc::new(entry)).await
815
69
    }
816
817
99
    /// Inserts `entry` into the evicting map under `key` and then renames its
    /// backing file to the content path for `key`.
    ///
    /// The work runs inside `background_spawn!` so it finishes even if the
    /// caller's future is dropped.
    ///
    /// Outcomes:
    /// * If the key is no longer in the map once the entry's path lock is
    ///   held, nothing is renamed and `Ok(())` is returned.
    /// * If the rename fails, the map entry is removed (only when it is still
    ///   this exact `Arc`) and the rename error is returned.
    /// * Otherwise the entry's path is updated to `PathType::Content` / `key`.
    ///
    /// # Errors
    /// Returns the rename error, or an error if the spawned task could not be
    /// joined.
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
        // This sequence of events is quite tricky to understand due to the amount of triggers that
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
        // 1. Here will hold a write lock on any file operations of this FileEntry.
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
        //    entries.
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
        //    during the rename.
        // 4. It should be impossible for items to be added while eviction is happening, so there
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
        //    item is not possible within the `insert()` call because the write lock inside the
        //    eviction map. If an eviction of new item happens after `insert()` but before
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
        //    will be blocked on us because we currently have the lock.
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
        //    contents until we release the lock.
        let evicting_map = self.evicting_map.clone();
        let rename_fn = self.rename_fn;

        // We need to guarantee that this will get to the end even if the parent future is dropped.
        // See: https://github.com/TraceMachina/nativelink/issues/495
        background_spawn!("filesystem_store_emplace_file", async move {
            evicting_map
                .insert(key.borrow().into_owned().into(), entry.clone())
                .await;

            // The insert might have resulted in an eviction/unref so we need to check
            // it still exists in there. But first, get the lock...
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
            // Then check it's still in there...
            if evicting_map.get(&key).await.is_none() {
                info!(%key, "Got eviction while emplacing, dropping");
                return Ok(());
            }

            // Destination: the content path derived from `key`.
            let final_path = get_file_path_raw(
                &PathType::Content,
                encoded_file_path.shared_context.as_ref(),
                &key,
            );

            // Source: wherever the entry currently says its file lives.
            let from_path = encoded_file_path.get_file_path();
            // Internally tokio spawns fs commands onto a blocking thread anyways.
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
            // an open-file permit (ensure we don't open too many files at once).
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| {
                format!(
                    "Failed to rename temp file to final path {}",
                    final_path.display()
                )
            });

            // In the event our move from temp file to final file fails we need to ensure we remove
            // the entry from our map.
            // Remember: At this point it is possible for another thread to have a reference to
            // `entry`, so we can't delete the file, only drop() should ever delete files.
            if let Err(err) = result {
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
                drop(encoded_file_path);
                // It is possible that the item in our map is no longer the item we inserted,
                // So, we need to conditionally remove it only if the pointers are the same.

                evicting_map
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                    .await;
                return Err(err);
            }
            // The rename succeeded: record the entry's new location while still holding the lock.
            encoded_file_path.path_type = PathType::Content;
            encoded_file_path.key = key;
            Ok(())
        })
        .await
        .err_tip(|| "Failed to create spawn in filesystem store update_file")?
    }
895
}
896
897
#[async_trait]
898
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
899
    async fn has_with_results(
900
        self: Pin<&Self>,
901
        keys: &[StoreKey<'_>],
902
        results: &mut [Option<u64>],
903
90
    ) -> Result<(), Error> {
904
        let own_keys = keys
905
            .iter()
906
90
            .map(|sk| sk.borrow().into_owned())
907
            .collect::<Vec<_>>();
908
        self.evicting_map
909
            .sizes_for_keys(own_keys.iter(), results, false /* peek */)
910
            .await;
911
        // We need to do a special pass to ensure our zero files exist.
912
        // If our results failed and the result was a zero file, we need to
913
        // create the file by spec.
914
        for (key, result) in keys.iter().zip(results.iter_mut()) {
915
            if result.is_some() || !is_zero_digest(key.borrow()) {
916
                continue;
917
            }
918
            let (mut tx, rx) = make_buf_channel_pair();
919
            let send_eof_result = tx.send_eof();
920
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
921
                .await
922
0
                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
923
                .merge(
924
                    send_eof_result
925
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
926
                )?;
927
928
            *result = Some(0);
929
        }
930
        Ok(())
931
90
    }
932
933
    async fn update(
934
        self: Pin<&Self>,
935
        key: StoreKey<'_>,
936
        mut reader: DropCloserReadHalf,
937
        _upload_size: UploadSizeInfo,
938
137
    ) -> Result<(), Error> {
939
        if is_zero_digest(key.borrow()) {
940
            // don't need to add, because zero length files are just assumed to exist
941
            return Ok(());
942
        }
943
944
        let temp_key = make_temp_key(&key);
945
946
        // There's a possibility of deadlock here where we take all of the
947
        // file semaphores with make_and_open_file and the semaphores for
948
        // whatever is populating reader is exhasted on the threads that
949
        // have the FileSlots and not on those which can't.  To work around
950
        // this we don't take the FileSlot until there's something on the
951
        // reader available to know that the populator is active.
952
        reader.peek().await?;
953
954
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
955
            self.block_size,
956
            EncodedFilePath {
957
                shared_context: self.shared_context.clone(),
958
                path_type: PathType::Temp,
959
                key: temp_key,
960
            },
961
        )
962
        .await?;
963
964
        self.update_file(entry, temp_file, key.into_owned(), reader)
965
            .await
966
1
            .err_tip(|| {
967
1
                format!(
968
                    "While processing with temp file {}",
969
1
                    temp_full_path.display()
970
                )
971
1
            })
972
137
    }
973
974
107
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
975
101
        matches!(
976
107
            optimization,
977
            StoreOptimizations::FileUpdates | StoreOptimizations::SubscribesToUpdateOneshot
978
        )
979
107
    }
980
981
23
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
982
        if is_zero_digest(key.borrow()) {
983
            return Ok(());
984
        }
985
986
        let temp_key = make_temp_key(&key);
987
        let (mut entry, mut temp_file, temp_full_path) = Fe::make_and_open_file(
988
            self.block_size,
989
            EncodedFilePath {
990
                shared_context: self.shared_context.clone(),
991
                path_type: PathType::Temp,
992
                key: temp_key,
993
            },
994
        )
995
        .await
996
        .err_tip(|| "Failed to create temp file in filesystem store update_oneshot")?;
997
998
        // Write directly without channel overhead
999
        if !data.is_empty() {
1000
            temp_file
1001
                .write_all(&data)
1002
                .await
1003
0
                .err_tip(|| format!("Failed to write data to {}", temp_full_path.display()))?;
1004
        }
1005
1006
        let _permit = if let Some(sem) = &self.write_semaphore {
1007
0
            Some(sem.acquire().await.map_err(|err| {
1008
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
1009
0
            })?)
1010
        } else {
1011
            None
1012
        };
1013
1014
        temp_file
1015
            .as_ref()
1016
            .sync_all()
1017
            .await
1018
            .err_tip(|| "Failed to sync_data in filesystem store update_oneshot")?;
1019
1020
        drop(_permit);
1021
1022
        temp_file.advise_dontneed();
1023
        drop(temp_file);
1024
1025
        *entry.data_size_mut() = data.len() as u64;
1026
        self.emplace_file(key.into_owned(), Arc::new(entry)).await
1027
23
    }
1028
1029
    async fn update_with_whole_file(
1030
        self: Pin<&Self>,
1031
        key: StoreKey<'_>,
1032
        path: OsString,
1033
        file: fs::FileSlot,
1034
        upload_size: UploadSizeInfo,
1035
9
    ) -> Result<Option<fs::FileSlot>, Error> {
1036
        let file_size = match upload_size {
1037
            UploadSizeInfo::ExactSize(size) => size,
1038
            UploadSizeInfo::MaxSize(_) => file
1039
                .as_ref()
1040
                .metadata()
1041
                .await
1042
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
1043
                .len(),
1044
        };
1045
        if file_size == 0 {
1046
            // don't need to add, because zero length files are just assumed to exist
1047
            return Ok(None);
1048
        }
1049
        let entry = Fe::create(
1050
            file_size,
1051
            self.block_size,
1052
            RwLock::new(EncodedFilePath {
1053
                shared_context: self.shared_context.clone(),
1054
                path_type: PathType::Custom(path),
1055
                key: key.borrow().into_owned(),
1056
            }),
1057
        );
1058
        // We are done with the file, if we hold a reference to the file here, it could
1059
        // result in a deadlock if `emplace_file()` also needs file descriptors.
1060
        trace!(?file, "Dropping file to to update_with_whole_file");
1061
        file.advise_dontneed();
1062
        drop(file);
1063
        self.emplace_file(key.into_owned(), Arc::new(entry))
1064
            .await
1065
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
1066
        return Ok(None);
1067
9
    }
1068
1069
    async fn get_part(
1070
        self: Pin<&Self>,
1071
        key: StoreKey<'_>,
1072
        writer: &mut DropCloserWriteHalf,
1073
        offset: u64,
1074
        length: Option<u64>,
1075
70
    ) -> Result<(), Error> {
1076
        if is_zero_digest(key.borrow()) {
1077
            self.has(key.borrow())
1078
                .await
1079
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
1080
            writer
1081
                .send_eof()
1082
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
1083
            return Ok(());
1084
        }
1085
        let owned_key = key.into_owned();
1086
2
        let entry = self.evicting_map.get(&owned_key).await.ok_or_else(|| {
1087
2
            make_err!(
1088
2
                Code::NotFound,
1089
                "{} not found in filesystem store here",
1090
2
                owned_key.as_str()
1091
            )
1092
2
        })?;
1093
        let read_limit = length.unwrap_or(u64::MAX);
1094
2
        let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
1095
            // If the file is not found, we need to remove it from the eviction map.
1096
2
            if err.code == Code::NotFound {
1097
2
                error!(
1098
                    ?err,
1099
                    key = ?owned_key,
1100
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
1101
                );
1102
2
                self.evicting_map.remove(&owned_key).await;
1103
0
            }
1104
2
            Err(err)
1105
4
        }).await?;
1106
1107
        loop {
1108
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
1109
            temp_file
1110
                .read_buf(&mut buf)
1111
                .await
1112
                .err_tip(|| "Failed to read data in filesystem store")?;
1113
            if buf.is_empty() {
1114
                break; // EOF.
1115
            }
1116
            writer
1117
                .send(buf.freeze())
1118
                .await
1119
                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
1120
        }
1121
        temp_file.get_ref().advise_dontneed();
1122
        writer
1123
            .send_eof()
1124
            .err_tip(|| "Filed to send EOF in filesystem store get_part")?;
1125
1126
        Ok(())
1127
70
    }
1128
1129
129
    /// Returns this store itself; the `_digest` argument is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
1132
1133
28
    /// Exposes this store as a `dyn Any` reference.
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
        self
    }
1136
1137
0
    /// Converts an owning `Arc` of this store into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
        self
    }
1140
1141
0
    /// Registers this store as an indicator with `registry`.
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
        registry.register_indicator(self);
    }
1144
1145
0
    fn register_remove_callback(
1146
0
        self: Arc<Self>,
1147
0
        callback: Arc<dyn RemoveItemCallback>,
1148
0
    ) -> Result<(), Error> {
1149
0
        self.evicting_map
1150
0
            .add_remove_callback(RemoveItemCallbackHolder::new(callback));
1151
0
        Ok(())
1152
0
    }
1153
}
1154
1155
#[async_trait]
1156
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1157
0
    fn get_name(&self) -> &'static str {
1158
0
        "FilesystemStore"
1159
0
    }
1160
1161
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1162
        StoreDriver::check_health(Pin::new(self), namespace).await
1163
0
    }
1164
}