Coverage Report

Created: 2025-03-08 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ffi::{OsStr, OsString};
17
use std::fmt::{Debug, Formatter};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicU64, Ordering};
20
use std::sync::{Arc, Weak};
21
use std::time::SystemTime;
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use futures::stream::{StreamExt, TryStreamExt};
27
use futures::{Future, TryFutureExt};
28
use nativelink_config::stores::FilesystemSpec;
29
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::background_spawn;
32
use nativelink_util::buf_channel::{
33
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
34
};
35
use nativelink_util::common::{fs, DigestInfo};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{
39
    StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
40
};
41
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
42
use tokio_stream::wrappers::ReadDirStream;
43
use tracing::{event, Level};
44
45
use crate::cas_utils::is_zero_digest;
46
47
// Default size to allocate memory of the buffer when reading files.
48
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
49
// Default block size of all major filesystems is 4KB
50
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
51
52
pub const STR_FOLDER: &str = "s";
53
pub const DIGEST_FOLDER: &str = "d";
54
55
#[derive(Clone, Copy, Debug)]
56
pub enum FileType {
57
    Digest,
58
    String,
59
}
60
61
#[derive(Debug, MetricsComponent)]
62
pub struct SharedContext {
63
    // Used in testing to know how many active drop() spawns are running.
64
    // TODO(allada) It is probably a good idea to use a spin lock during
65
    // destruction of the store to ensure that all files are actually
66
    // deleted (similar to how it is done in tests).
67
    #[metric(help = "Number of active drop spawns")]
68
    pub active_drop_spawns: AtomicU64,
69
    #[metric(help = "Path to the configured temp path")]
70
    temp_path: String,
71
    #[metric(help = "Path to the configured content path")]
72
    content_path: String,
73
}
74
75
#[derive(Eq, PartialEq, Debug)]
76
enum PathType {
77
    Content,
78
    Temp,
79
    Custom(OsString),
80
}
81
82
/// [`EncodedFilePath`] stores the path to the file
83
/// including the context, path type and key to the file.
84
/// The whole [`StoreKey`] is stored as opposed to solely
85
/// the [`DigestInfo`] so that it is more usable for things
86
/// such as BEP -see Issue #1108
87
pub struct EncodedFilePath {
88
    shared_context: Arc<SharedContext>,
89
    path_type: PathType,
90
    key: StoreKey<'static>,
91
}
92
93
impl EncodedFilePath {
94
    #[inline]
95
330
    fn get_file_path(&self) -> Cow<'_, OsStr> {
96
330
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
97
330
    }
98
}
99
100
#[inline]
101
447
fn get_file_path_raw<'a>(
102
447
    path_type: &'a PathType,
103
447
    shared_context: &SharedContext,
104
447
    key: &StoreKey<'a>,
105
447
) -> Cow<'a, OsStr> {
106
447
    let 
folder438
= match path_type {
107
195
        PathType::Content => &shared_context.content_path,
108
243
        PathType::Temp => &shared_context.temp_path,
109
9
        PathType::Custom(path) => return Cow::Borrowed(path),
110
    };
111
438
    Cow::Owned(to_full_path_from_key(folder, key))
112
447
}
113
114
impl Drop for EncodedFilePath {
115
118
    fn drop(&mut self) {
116
118
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
117
118
        // file actually needs to be deleted.
118
118
        if self.path_type == PathType::Content {
  Branch (118:12): [True: 92, False: 26]
  Branch (118:12): [Folded - Ignored]
119
92
            return;
120
26
        }
121
26
122
26
        let file_path = self.get_file_path().to_os_string();
123
26
        let shared_context = self.shared_context.clone();
124
26
        shared_context
125
26
            .active_drop_spawns
126
26
            .fetch_add(1, Ordering::Relaxed);
127
26
        background_spawn!("filesystem_delete_file", async move {
128
25
            event!(Level::INFO, ?file_path, 
"File deleted"0
,);
129
25
            let 
result23
= fs::remove_file(&file_path)
130
25
                .await
131
23
                .err_tip(|| 
format!("Failed to remove file {file_path:?}")0
);
132
23
            if let Err(
err0
) = result {
  Branch (132:20): [True: 0, False: 23]
  Branch (132:20): [Folded - Ignored]
133
0
                event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",);
134
23
            }
135
23
            shared_context
136
23
                .active_drop_spawns
137
23
                .fetch_sub(1, Ordering::Relaxed);
138
26
        });
139
118
    }
140
}
141
142
/// This creates the file path from the [`StoreKey`]. If
143
/// it is a string, the string, prefixed with [`STR_PREFIX`]
144
/// for backwards compatibility, is stored.
145
///
146
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
147
/// followed by the string representation of a digest - the hash in hex,
148
/// a hyphen then the size in bytes
149
///
150
/// Previously, only the string representation of the [`DigestInfo`] was
151
/// used with no prefix
152
#[inline]
153
465
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
154
465
    match key {
155
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
156
462
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
157
    }
158
465
    .into()
159
465
}
160
161
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
162
    /// Responsible for creating the underlying `FileEntry`.
163
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
164
165
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
166
    fn make_and_open_file(
167
        block_size: u64,
168
        encoded_file_path: EncodedFilePath,
169
    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
170
    where
171
        Self: Sized;
172
173
    /// Returns the underlying reference to the size of the data in bytes
174
    fn data_size_mut(&mut self) -> &mut u64;
175
176
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
177
    fn size_on_disk(&self) -> u64;
178
179
    /// Gets the underlying `EncodedfilePath`.
180
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
181
182
    /// Returns a reader that will read part of the underlying file.
183
    fn read_file_part(
184
        &self,
185
        offset: u64,
186
        length: u64,
187
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
188
189
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
190
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
191
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
192
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
193
    /// the callback the file is always guaranteed to exist and immutable.
194
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
195
    fn get_file_path_locked<
196
        T,
197
        Fut: Future<Output = Result<T, Error>> + Send,
198
        F: FnOnce(OsString) -> Fut + Send,
199
    >(
200
        &self,
201
        handler: F,
202
    ) -> impl Future<Output = Result<T, Error>> + Send;
203
}
204
205
pub struct FileEntryImpl {
206
    data_size: u64,
207
    block_size: u64,
208
    encoded_file_path: RwLock<EncodedFilePath>,
209
}
210
211
impl FileEntryImpl {
212
9
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
213
9
        self.encoded_file_path.get_mut().shared_context.clone()
214
9
    }
215
}
216
217
impl FileEntry for FileEntryImpl {
218
118
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
219
118
        Self {
220
118
            data_size,
221
118
            block_size,
222
118
            encoded_file_path,
223
118
        }
224
118
    }
225
226
    /// This encapsulates the logic for the edge case of if the file fails to create
227
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
228
    /// try to cleanup the file as well during `drop()`.
229
108
    async fn make_and_open_file(
230
108
        block_size: u64,
231
108
        encoded_file_path: EncodedFilePath,
232
108
    ) -> Result<(FileEntryImpl, fs::FileSlot, OsString), Error> {
233
108
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
234
108
        let temp_file_result = fs::create_file(temp_full_path.clone())
235
108
            .or_else(|mut err| async 
{0
236
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
237
0
                    format!("Failed to remove file {temp_full_path:?} in filesystem store")
238
0
                });
239
0
                if let Err(remove_err) = remove_result {
  Branch (239:24): [True: 0, False: 0]
  Branch (239:24): [True: 0, False: 0]
  Branch (239:24): [True: 0, False: 0]
  Branch (239:24): [Folded - Ignored]
  Branch (239:24): [True: 0, False: 0]
240
0
                    err = err.merge(remove_err);
241
0
                }
242
0
                event!(
243
0
                    Level::WARN,
244
                    ?err,
245
                    ?block_size,
246
                    ?temp_full_path,
247
0
                    "Failed to create file",
248
                );
249
0
                Err(err)
250
0
                    .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store"))
251
108
            
}0
)
252
108
            .await
?0
;
253
254
108
        Ok((
255
108
            <FileEntryImpl as FileEntry>::create(
256
108
                0, /* Unknown yet, we will fill it in later */
257
108
                block_size,
258
108
                RwLock::new(encoded_file_path),
259
108
            ),
260
108
            temp_file_result,
261
108
            temp_full_path,
262
108
        ))
263
108
    }
264
265
108
    fn data_size_mut(&mut self) -> &mut u64 {
266
108
        &mut self.data_size
267
108
    }
268
269
245
    fn size_on_disk(&self) -> u64 {
270
245
        self.data_size.div_ceil(self.block_size) * self.block_size
271
245
    }
272
273
169
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
274
169
        &self.encoded_file_path
275
169
    }
276
277
46
    fn read_file_part(
278
46
        &self,
279
46
        offset: u64,
280
46
        length: u64,
281
46
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
282
46
        self.get_file_path_locked(move |full_content_path| async move {
283
46
            let 
file44
= fs::open_file(&full_content_path, offset, length)
284
46
                .await
285
46
                .err_tip(|| {
286
2
                    format!("Failed to open file in filesystem store {full_content_path:?}")
287
46
                })
?2
;
288
44
            Ok(file)
289
92
        })
290
46
    }
291
292
52
    async fn get_file_path_locked<
293
52
        T,
294
52
        Fut: Future<Output = Result<T, Error>> + Send,
295
52
        F: FnOnce(OsString) -> Fut + Send,
296
52
    >(
297
52
        &self,
298
52
        handler: F,
299
52
    ) -> Result<T, Error> {
300
52
        let encoded_file_path = self.get_encoded_file_path().read().await;
301
52
        handler(encoded_file_path.get_file_path().to_os_string()).await
302
52
    }
303
}
304
305
impl Debug for FileEntryImpl {
306
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
307
0
        f.debug_struct("FileEntryImpl")
308
0
            .field("data_size", &self.data_size)
309
0
            .field("encoded_file_path", &"<behind mutex>")
310
0
            .finish()
311
0
    }
312
}
313
314
135
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
315
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
316
135
    let mut hash = *digest.packed_hash();
317
135
    hash[24..].clone_from_slice(
318
135
        &DELETE_FILE_COUNTER
319
135
            .fetch_add(1, Ordering::Relaxed)
320
135
            .to_le_bytes(),
321
135
    );
322
135
    digest.set_packed_hash(*hash);
323
135
    digest
324
135
}
325
326
135
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
327
135
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
328
135
}
329
330
impl LenEntry for FileEntryImpl {
331
    #[inline]
332
243
    fn len(&self) -> u64 {
333
243
        self.size_on_disk()
334
243
    }
335
336
0
    fn is_empty(&self) -> bool {
337
0
        self.data_size == 0
338
0
    }
339
340
    // unref() only triggers when an item is removed from the eviction_map. It is possible
341
    // that another place in code has a reference to `FileEntryImpl` and may later read the
342
    // file. To support this edge case, we first move the file to a temp file and point
343
    // target file location to the new temp file. `unref()` should only ever be called once.
344
    #[inline]
345
28
    async fn unref(&self) {
346
        {
347
28
            let mut encoded_file_path = self.encoded_file_path.write().await;
348
28
            if encoded_file_path.path_type == PathType::Temp {
  Branch (348:16): [True: 0, False: 0]
  Branch (348:16): [True: 1, False: 9]
  Branch (348:16): [True: 0, False: 0]
  Branch (348:16): [True: 0, False: 0]
  Branch (348:16): [Folded - Ignored]
  Branch (348:16): [True: 0, False: 18]
349
                // We are already a temp file that is now marked for deletion on drop.
350
                // This is very rare, but most likely the rename into the content path failed.
351
1
                return;
352
27
            }
353
27
            let from_path = encoded_file_path.get_file_path();
354
27
            let new_key = make_temp_key(&encoded_file_path.key);
355
27
356
27
            let to_path =
357
27
                to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
358
359
27
            if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
  Branch (359:20): [True: 0, False: 0]
  Branch (359:20): [True: 2, False: 7]
  Branch (359:20): [True: 0, False: 0]
  Branch (359:20): [True: 0, False: 0]
  Branch (359:20): [Folded - Ignored]
  Branch (359:20): [True: 0, False: 18]
360
2
                event!(
361
2
                    Level::WARN,
362
2
                    key = ?encoded_file_path.key,
363
2
                    ?from_path,
364
2
                    ?to_path,
365
2
                    ?err,
366
2
                    "Failed to rename file",
367
                );
368
            } else {
369
25
                event!(
370
25
                    Level::INFO,
371
0
                    key = ?encoded_file_path.key,
372
0
                    ?from_path,
373
0
                    ?to_path,
374
0
                    "Renamed file",
375
                );
376
25
                encoded_file_path.path_type = PathType::Temp;
377
25
                encoded_file_path.key = new_key;
378
            }
379
        }
380
28
    }
381
}
382
383
#[inline]
384
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
385
2
    let (hash, size) = file_name.split_once('-').err_tip(|| 
""0
)
?0
;
386
2
    let size = size.parse::<i64>()
?0
;
387
2
    DigestInfo::try_new(hash, size)
388
2
}
389
390
2
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
391
2
    match file_type {
392
0
        FileType::String => Ok(StoreKey::new_str(file_name)),
393
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
394
    }
395
2
}
396
397
/// The number of files to read the metadata for at the same time when running
398
/// `add_files_to_cache`.
399
const SIMULTANEOUS_METADATA_READS: usize = 200;
400
401
41
async fn add_files_to_cache<Fe: FileEntry>(
402
41
    evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
403
41
    anchor_time: &SystemTime,
404
41
    shared_context: &Arc<SharedContext>,
405
41
    block_size: u64,
406
41
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
407
41
) -> Result<(), Error> {
408
    #[expect(clippy::too_many_arguments)]
409
1
    async fn process_entry<Fe: FileEntry>(
410
1
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
411
1
        file_name: &str,
412
1
        file_type: FileType,
413
1
        atime: SystemTime,
414
1
        data_size: u64,
415
1
        block_size: u64,
416
1
        anchor_time: &SystemTime,
417
1
        shared_context: &Arc<SharedContext>,
418
1
    ) -> Result<(), Error> {
419
1
        let key = key_from_file(file_name, file_type)
?0
;
420
421
1
        let file_entry = Fe::create(
422
1
            data_size,
423
1
            block_size,
424
1
            RwLock::new(EncodedFilePath {
425
1
                shared_context: shared_context.clone(),
426
1
                path_type: PathType::Content,
427
1
                key: key.borrow().into_owned(),
428
1
            }),
429
1
        );
430
1
        let time_since_anchor = anchor_time
431
1
            .duration_since(atime)
432
1
            .map_err(|_| 
make_input_err!("File access time newer than now")0
)
?0
;
433
1
        evicting_map
434
1
            .insert_with_time(
435
1
                key.into_owned().into(),
436
1
                Arc::new(file_entry),
437
1
                time_since_anchor.as_secs() as i32,
438
1
            )
439
1
            .await;
440
1
        Ok(())
441
1
    }
442
443
123
    async fn read_files(
444
123
        folder: Option<&str>,
445
123
        shared_context: &SharedContext,
446
123
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
447
        // Note: In Dec 2024 this is for backwards compatibility with the old
448
        // way files were stored on disk. Previously all files were in a single
449
        // folder regardless of the StoreKey type. This allows old versions of
450
        // nativelink file layout to be upgraded at startup time.
451
        // This logic can be removed once more time has passed.
452
123
        let read_dir = if let Some(
folder82
) = folder {
  Branch (452:31): [True: 0, False: 0]
  Branch (452:31): [True: 32, False: 16]
  Branch (452:31): [True: 4, False: 2]
  Branch (452:31): [Folded - Ignored]
  Branch (452:31): [True: 46, False: 23]
453
82
            format!("{}/{folder}/", shared_context.content_path)
454
        } else {
455
41
            format!("{}/", shared_context.content_path)
456
        };
457
123
        let (_permit, dir_handle) = fs::read_dir(read_dir)
458
123
            .await
459
123
            .err_tip(|| 
"Failed opening content directory for iterating in filesystem store"0
)
?0
460
123
            .into_inner();
461
123
462
123
        let read_dir_stream = ReadDirStream::new(dir_handle);
463
123
        read_dir_stream
464
123
            .map(|dir_entry| async move {
465
83
                let dir_entry = dir_entry.unwrap();
466
83
                let file_name = dir_entry.file_name().into_string().unwrap();
467
83
                let metadata = dir_entry
468
83
                    .metadata()
469
83
                    .await
470
83
                    .err_tip(|| 
"Failed to get metadata in filesystem store"0
)
?0
;
471
                // We need to filter out folders - we do not want to try to cache the s and d folders.
472
83
                let is_file =
473
83
                    metadata.is_file() || !(
file_name == STR_FOLDER82
||
file_name == DIGEST_FOLDER41
);
  Branch (473:21): [True: 0, False: 0]
  Branch (473:45): [True: 0, False: 0]
  Branch (473:21): [True: 1, False: 32]
  Branch (473:45): [True: 16, False: 16]
  Branch (473:21): [True: 0, False: 4]
  Branch (473:45): [True: 2, False: 2]
  Branch (473:21): [Folded - Ignored]
  Branch (473:45): [Folded - Ignored]
  Branch (473:21): [True: 0, False: 46]
  Branch (473:45): [True: 23, False: 23]
474
                // Using access time is not perfect, but better than random. We do not update the
475
                // atime when a file is actually "touched", we rely on whatever the filesystem does
476
                // when we read the file (usually update on read).
477
83
                let atime = metadata
478
83
                    .accessed()
479
83
                    .or_else(|_| 
metadata.modified()0
)
480
83
                    .unwrap_or(SystemTime::UNIX_EPOCH);
481
83
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
482
83
                    file_name,
483
83
                    atime,
484
83
                    metadata.len(),
485
83
                    is_file,
486
83
                ))
487
166
            })
488
123
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
489
123
            .try_collect()
490
123
            .await
491
123
    }
492
493
    /// Note: In Dec 2024 this is for backwards compatibility with the old
494
    /// way files were stored on disk. Previously all files were in a single
495
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
496
    /// location to the new cache location, under [`DIGEST_FOLDER`].
497
41
    async fn move_old_cache(
498
41
        shared_context: &Arc<SharedContext>,
499
41
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
500
41
    ) -> Result<(), Error> {
501
41
        let file_infos = read_files(None, shared_context).await
?0
;
502
503
41
        let from_path = shared_context.content_path.to_string();
504
41
505
41
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
506
507
82
        for (
file_name0
, _, _, _) in
file_infos.into_iter().filter(41
|x| x.3
)41
{
508
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
509
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
510
511
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [True: 0, False: 0]
  Branch (511:20): [Folded - Ignored]
  Branch (511:20): [True: 0, False: 0]
512
0
                event!(
513
0
                    Level::WARN,
514
                    ?from_file,
515
                    ?to_file,
516
                    ?err,
517
0
                    "Failed to rename file",
518
                );
519
            } else {
520
0
                event!(Level::INFO, ?from_file, ?to_file, "Renamed file",);
521
            }
522
        }
523
41
        Ok(())
524
41
    }
525
526
82
    async fn add_files_to_cache<Fe: FileEntry>(
527
82
        evicting_map: &EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>,
528
82
        anchor_time: &SystemTime,
529
82
        shared_context: &Arc<SharedContext>,
530
82
        block_size: u64,
531
82
        folder: &str,
532
82
    ) -> Result<(), Error> {
533
82
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
534
82
        let file_type = match folder {
535
82
            STR_FOLDER => 
FileType::String41
,
536
41
            DIGEST_FOLDER => FileType::Digest,
537
0
            _ => panic!("Invalid folder type"),
538
        };
539
540
82
        let path_root = format!("{}/{folder}", shared_context.content_path);
541
542
82
        for (
file_name, atime, data_size1
, _) in file_infos.into_iter().filter(|x|
x.31
) {
543
1
            let result = process_entry(
544
1
                evicting_map,
545
1
                &file_name,
546
1
                file_type,
547
1
                atime,
548
1
                data_size,
549
1
                block_size,
550
1
                anchor_time,
551
1
                shared_context,
552
1
            )
553
1
            .await;
554
1
            if let Err(
err0
) = result {
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [True: 0, False: 1]
  Branch (554:20): [True: 0, False: 0]
  Branch (554:20): [Folded - Ignored]
  Branch (554:20): [True: 0, False: 0]
555
0
                event!(
556
0
                    Level::WARN,
557
                    ?file_name,
558
                    ?err,
559
0
                    "Failed to add file to eviction cache",
560
                );
561
                // Ignore result.
562
0
                let _ = fs::remove_file(format!("{path_root}/{file_name}")).await;
563
1
            }
564
        }
565
82
        Ok(())
566
82
    }
567
568
41
    move_old_cache(shared_context, rename_fn).await
?0
;
569
570
41
    add_files_to_cache(
571
41
        evicting_map,
572
41
        anchor_time,
573
41
        shared_context,
574
41
        block_size,
575
41
        DIGEST_FOLDER,
576
41
    )
577
41
    .await
?0
;
578
579
41
    add_files_to_cache(
580
41
        evicting_map,
581
41
        anchor_time,
582
41
        shared_context,
583
41
        block_size,
584
41
        STR_FOLDER,
585
41
    )
586
41
    .await
?0
;
587
41
    Ok(())
588
41
}
589
590
41
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
591
82
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
592
82
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
593
82
            .await
594
82
            .err_tip(|| {
595
0
                "Failed opening temp directory to prune partial downloads in filesystem store"
596
82
            })
?0
597
82
            .into_inner();
598
82
599
82
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
600
82
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
  Branch (600:19): [True: 0, False: 0]
  Branch (600:19): [True: 0, False: 32]
  Branch (600:19): [True: 0, False: 4]
  Branch (600:19): [Folded - Ignored]
  Branch (600:19): [True: 0, False: 46]
601
0
            let path = dir_entry?.path();
602
0
            if let Err(err) = fs::remove_file(&path).await {
  Branch (602:20): [True: 0, False: 0]
  Branch (602:20): [True: 0, False: 0]
  Branch (602:20): [True: 0, False: 0]
  Branch (602:20): [Folded - Ignored]
  Branch (602:20): [True: 0, False: 0]
603
0
                event!(Level::WARN, ?path, ?err, "Failed to delete file",);
604
0
            }
605
        }
606
82
        Ok(())
607
82
    }
608
609
41
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
610
41
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
611
41
    Ok(())
612
41
}
613
614
#[derive(MetricsComponent)]
615
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
616
    #[metric]
617
    shared_context: Arc<SharedContext>,
618
    #[metric(group = "evicting_map")]
619
    evicting_map: Arc<EvictingMap<StoreKeyBorrow, Arc<Fe>, SystemTime>>,
620
    #[metric(help = "Block size of the configured filesystem")]
621
    block_size: u64,
622
    #[metric(help = "Size of the configured read buffer size")]
623
    read_buffer_size: usize,
624
    weak_self: Weak<Self>,
625
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
626
}
627
628
impl<Fe: FileEntry> FilesystemStore<Fe> {
629
34
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
630
107
        Self::new_with_timeout_and_rename_fn(spec, |from, to| std::fs::rename(from, to)
).await34
631
34
    }
632
633
41
    pub async fn new_with_timeout_and_rename_fn(
634
41
        spec: &FilesystemSpec,
635
41
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
636
41
    ) -> Result<Arc<Self>, Error> {
637
82
        async fn create_subdirs(path: &str) -> Result<(), Error> {
638
82
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
639
82
                .await
640
82
                .err_tip(|| 
format!("Failed to create directory {path}/{STR_FOLDER}")0
)
?0
;
641
82
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
642
82
                .await
643
82
                .err_tip(|| 
format!("Failed to create directory {path}/{DIGEST_FOLDER}")0
)
644
82
        }
645
646
41
        let now = SystemTime::now();
647
41
648
41
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
649
41
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
650
41
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
651
41
652
41
        // Create temp and content directories and the s and d subdirectories.
653
41
654
41
        create_subdirs(&spec.temp_path).await
?0
;
655
41
        create_subdirs(&spec.content_path).await
?0
;
656
657
41
        let shared_context = Arc::new(SharedContext {
658
41
            active_drop_spawns: AtomicU64::new(0),
659
41
            temp_path: spec.temp_path.clone(),
660
41
            content_path: spec.content_path.clone(),
661
41
        });
662
663
41
        let block_size = if spec.block_size == 0 {
  Branch (663:29): [True: 0, False: 0]
  Branch (663:29): [True: 0, False: 1]
  Branch (663:29): [True: 0, False: 1]
  Branch (663:29): [True: 1, False: 0]
  Branch (663:29): [True: 0, False: 1]
  Branch (663:29): [True: 1, False: 0]
  Branch (663:29): [True: 10, False: 1]
  Branch (663:29): [True: 2, False: 0]
  Branch (663:29): [Folded - Ignored]
  Branch (663:29): [True: 23, False: 0]
664
37
            DEFAULT_BLOCK_SIZE
665
        } else {
666
4
            spec.block_size
667
        };
668
41
        add_files_to_cache(
669
41
            evicting_map.as_ref(),
670
41
            &now,
671
41
            &shared_context,
672
41
            block_size,
673
41
            rename_fn,
674
41
        )
675
41
        .await
?0
;
676
41
        prune_temp_path(&shared_context.temp_path).await
?0
;
677
678
41
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (678:35): [True: 0, False: 0]
  Branch (678:35): [True: 1, False: 0]
  Branch (678:35): [True: 0, False: 1]
  Branch (678:35): [True: 1, False: 0]
  Branch (678:35): [True: 0, False: 1]
  Branch (678:35): [True: 1, False: 0]
  Branch (678:35): [True: 4, False: 7]
  Branch (678:35): [True: 2, False: 0]
  Branch (678:35): [Folded - Ignored]
  Branch (678:35): [True: 23, False: 0]
679
32
            DEFAULT_BUFF_SIZE
680
        } else {
681
9
            spec.read_buffer_size as usize
682
        };
683
41
        Ok(Arc::new_cyclic(|weak_self| Self {
684
41
            shared_context,
685
41
            evicting_map,
686
41
            block_size,
687
41
            read_buffer_size,
688
41
            weak_self: weak_self.clone(),
689
41
            rename_fn,
690
41
        }))
691
41
    }
692
693
24
    pub fn get_arc(&self) -> Option<Arc<Self>> {
694
24
        self.weak_self.upgrade()
695
24
    }
696
697
9
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
698
9
        self.evicting_map
699
9
            .get::<StoreKey<'static>>(&digest.into())
700
9
            .await
701
9
            .ok_or_else(|| 
make_err!(Code::NotFound, "{digest} not found in filesystem store")0
)
702
9
    }
703
704
108
    async fn update_file<'a>(
705
108
        self: Pin<&'a Self>,
706
108
        mut entry: Fe,
707
108
        mut temp_file: fs::FileSlot,
708
108
        final_key: StoreKey<'static>,
709
108
        mut reader: DropCloserReadHalf,
710
108
    ) -> Result<(), Error> {
711
108
        let mut data_size = 0;
712
        loop {
713
183
            let mut data = reader
714
183
                .recv()
715
183
                .await
716
183
                .err_tip(|| 
"Failed to receive data in filesystem store"0
)
?0
;
717
183
            let data_len = data.len();
718
183
            if data_len == 0 {
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [True: 2, False: 2]
  Branch (718:16): [True: 2, False: 2]
  Branch (718:16): [True: 2, False: 2]
  Branch (718:16): [True: 2, False: 2]
  Branch (718:16): [True: 1, False: 1]
  Branch (718:16): [True: 13, False: 11]
  Branch (718:16): [True: 0, False: 0]
  Branch (718:16): [Folded - Ignored]
  Branch (718:16): [True: 86, False: 55]
719
108
                break; // EOF.
720
75
            }
721
75
            temp_file
722
75
                .write_all_buf(&mut data)
723
75
                .await
724
75
                .err_tip(|| 
"Failed to write data into filesystem store"0
)
?0
;
725
75
            data_size += data_len as u64;
726
        }
727
728
108
        temp_file
729
108
            .as_ref()
730
108
            .sync_all()
731
108
            .await
732
108
            .err_tip(|| 
"Failed to sync_data in filesystem store"0
)
?0
;
733
734
108
        drop(temp_file);
735
108
736
108
        *entry.data_size_mut() = data_size;
737
108
        self.emplace_file(final_key, Arc::new(entry)).await
738
107
    }
739
740
117
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
741
117
        // This sequence of events is quite ticky to understand due to the amount of triggers that
742
117
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
743
117
        // 1. Here will hold a write lock on any file operations of this FileEntry.
744
117
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
745
117
        //    entries.
746
117
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntrys
747
117
        //    during the rename.
748
117
        // 4. It should be impossible for items to be added while eviction is happening, so there
749
117
        //    should not be a deadlock possability. However, it is possible for the new FileEntry
750
117
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
751
117
        //    item is not possible within the `insert()` call because the write lock inside the
752
117
        //    eviction map. If an eviction of new item happens after `insert()` but before
753
117
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
754
117
        //    will be blocked on us because we currently have the lock.
755
117
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
756
117
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
757
117
        //    contents until we relese the lock.
758
117
        let evicting_map = self.evicting_map.clone();
759
117
        let rename_fn = self.rename_fn;
760
117
761
117
        // We need to guarantee that this will get to the end even if the parent future is dropped.
762
117
        // See: https://github.com/TraceMachina/nativelink/issues/495
763
117
        background_spawn!("filesystem_store_emplace_file", async move {
764
117
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
765
117
            let final_path = get_file_path_raw(
766
117
                &PathType::Content,
767
117
                encoded_file_path.shared_context.as_ref(),
768
117
                &key,
769
117
            );
770
117
771
117
            evicting_map
772
117
                .insert(key.borrow().into_owned().into(), entry.clone())
773
117
                .await;
774
775
117
            let from_path = encoded_file_path.get_file_path();
776
117
            // Internally tokio spawns fs commands onto a blocking thread anyways.
777
117
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
778
117
            // an open-file permit (ensure we don't open too many files at once).
779
117
            let result = (rename_fn)(&from_path, &final_path)
780
117
                .err_tip(|| 
format!("Failed to rename temp file to final path {final_path:?}")1
);
781
782
            // In the event our move from temp file to final file fails we need to ensure we remove
783
            // the entry from our map.
784
            // Remember: At this point it is possible for another thread to have a reference to
785
            // `entry`, so we can't delete the file, only drop() should ever delete files.
786
117
            if let Err(
err1
) = result {
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [True: 0, False: 2]
  Branch (786:20): [True: 0, False: 2]
  Branch (786:20): [True: 0, False: 2]
  Branch (786:20): [True: 0, False: 2]
  Branch (786:20): [True: 1, False: 0]
  Branch (786:20): [True: 0, False: 15]
  Branch (786:20): [True: 0, False: 0]
  Branch (786:20): [Folded - Ignored]
  Branch (786:20): [True: 0, False: 93]
787
1
                event!(
788
1
                    Level::ERROR,
789
                    ?err,
790
                    ?from_path,
791
                    ?final_path,
792
1
                    "Failed to rename file",
793
                );
794
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
795
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
796
1
                drop(encoded_file_path);
797
1
                // It is possible that the item in our map is no longer the item we inserted,
798
1
                // So, we need to conditionally remove it only if the pointers are the same.
799
1
800
1
                evicting_map
801
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
802
1
                    .await;
803
1
                return Err(err);
804
116
            }
805
116
            encoded_file_path.path_type = PathType::Content;
806
116
            encoded_file_path.key = key;
807
116
            Ok(())
808
117
        })
809
117
        .await
810
116
        .err_tip(|| 
"Failed to create spawn in filesystem store update_file"0
)
?0
811
116
    }
812
}
813
814
#[async_trait]
815
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
816
    async fn has_with_results(
817
        self: Pin<&Self>,
818
        keys: &[StoreKey<'_>],
819
        results: &mut [Option<u64>],
820
85
    ) -> Result<(), Error> {
821
85
        self.evicting_map
822
85
            .sizes_for_keys::<_, StoreKey<'_>, &StoreKey<'_>>(
823
85
                keys.iter(),
824
85
                results,
825
85
                false, /* peek */
826
85
            )
827
85
            .await;
828
        // We need to do a special pass to ensure our zero files exist.
829
        // If our results failed and the result was a zero file, we need to
830
        // create the file by spec.
831
85
        for (key, result) in keys.iter().zip(results.iter_mut()) {
832
85
            if result.is_some() || 
!is_zero_digest(key.borrow())16
{
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [True: 0, False: 1]
  Branch (832:36): [True: 1, False: 0]
  Branch (832:16): [True: 1, False: 2]
  Branch (832:36): [True: 0, False: 2]
  Branch (832:16): [True: 0, False: 0]
  Branch (832:36): [True: 0, False: 0]
  Branch (832:16): [Folded - Ignored]
  Branch (832:36): [Folded - Ignored]
  Branch (832:16): [True: 68, False: 13]
  Branch (832:36): [True: 13, False: 0]
833
83
                continue;
834
2
            }
835
2
            let (mut tx, rx) = make_buf_channel_pair();
836
2
            let send_eof_result = tx.send_eof();
837
2
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
838
2
                .await
839
2
                .err_tip(|| 
format!("Failed to create zero file for key {}", key.as_str())0
)
840
2
                .merge(
841
2
                    send_eof_result
842
2
                        .err_tip(|| 
"Failed to send zero file EOF in filesystem store has"0
),
843
2
                )
?0
;
844
845
2
            *result = Some(0);
846
        }
847
85
        Ok(())
848
170
    }
849
850
    async fn update(
851
        self: Pin<&Self>,
852
        key: StoreKey<'_>,
853
        reader: DropCloserReadHalf,
854
        _upload_size: UploadSizeInfo,
855
108
    ) -> Result<(), Error> {
856
108
        let temp_key = make_temp_key(&key);
857
108
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
858
108
            self.block_size,
859
108
            EncodedFilePath {
860
108
                shared_context: self.shared_context.clone(),
861
108
                path_type: PathType::Temp,
862
108
                key: temp_key,
863
108
            },
864
108
        )
865
108
        .await
?0
;
866
867
108
        self.update_file(entry, temp_file, key.into_owned(), reader)
868
108
            .await
869
107
            .err_tip(|| 
format!("While processing with temp file {temp_full_path:?}")1
)
870
215
    }
871
872
84
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
873
84
        optimization == StoreOptimizations::FileUpdates
874
84
    }
875
876
    async fn update_with_whole_file(
877
        self: Pin<&Self>,
878
        key: StoreKey<'_>,
879
        path: OsString,
880
        file: fs::FileSlot,
881
        upload_size: UploadSizeInfo,
882
9
    ) -> Result<Option<fs::FileSlot>, Error> {
883
9
        let file_size = match upload_size {
884
9
            UploadSizeInfo::ExactSize(size) => size,
885
0
            UploadSizeInfo::MaxSize(_) => file
886
0
                .as_ref()
887
0
                .metadata()
888
0
                .await
889
0
                .err_tip(|| format!("While reading metadata for {path:?}"))?
890
0
                .len(),
891
        };
892
9
        let entry = Fe::create(
893
9
            file_size,
894
9
            self.block_size,
895
9
            RwLock::new(EncodedFilePath {
896
9
                shared_context: self.shared_context.clone(),
897
9
                path_type: PathType::Custom(path),
898
9
                key: key.borrow().into_owned(),
899
9
            }),
900
9
        );
901
9
        // We are done with the file, if we hold a reference to the file here, it could
902
9
        // result in a deadlock if `emplace_file()` also needs file descriptors.
903
9
        drop(file);
904
9
        self.emplace_file(key.into_owned(), Arc::new(entry))
905
9
            .await
906
9
            .err_tip(|| 
"Could not move file into store in upload_file_to_store, maybe dest is on different volume?"0
)
?0
;
907
9
        return Ok(None);
908
18
    }
909
910
    async fn get_part(
911
        self: Pin<&Self>,
912
        key: StoreKey<'_>,
913
        writer: &mut DropCloserWriteHalf,
914
        offset: u64,
915
        length: Option<u64>,
916
60
    ) -> Result<(), Error> {
917
60
        if is_zero_digest(key.borrow()) {
  Branch (917:12): [True: 0, False: 0]
  Branch (917:12): [True: 0, False: 0]
  Branch (917:12): [True: 0, False: 1]
  Branch (917:12): [True: 0, False: 0]
  Branch (917:12): [True: 0, False: 1]
  Branch (917:12): [True: 0, False: 0]
  Branch (917:12): [True: 1, False: 4]
  Branch (917:12): [True: 0, False: 0]
  Branch (917:12): [Folded - Ignored]
  Branch (917:12): [True: 15, False: 38]
918
16
            self.has(key.borrow())
919
16
                .await
920
16
                .err_tip(|| 
"Failed to check if zero digest exists in filesystem store"0
)
?0
;
921
16
            writer
922
16
                .send_eof()
923
16
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
924
16
            return Ok(());
925
44
        }
926
44
        let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
927
0
            make_err!(
928
0
                Code::NotFound,
929
0
                "{} not found in filesystem store here",
930
0
                key.as_str()
931
0
            )
932
44
        })
?0
;
933
44
        let read_limit = length.unwrap_or(u64::MAX);
934
44
        let 
mut temp_file42
= entry.read_file_part(offset, read_limit).or_else(|err| async move {
935
2
            // If the file is not found, we need to remove it from the eviction map.
936
2
            if err.code == Code::NotFound {
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [True: 2, False: 0]
  Branch (936:16): [True: 0, False: 0]
  Branch (936:16): [Folded - Ignored]
  Branch (936:16): [True: 0, False: 0]
937
2
                event!(
938
2
                    Level::ERROR,
939
                    ?err,
940
                    ?key,
941
2
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
942
                );
943
2
                self.evicting_map.remove(&key).await;
944
0
            }
945
2
            Err(err)
946
44
        
}2
).await
?2
;
947
948
        loop {
949
1.12k
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
950
1.12k
            temp_file
951
1.12k
                .read_buf(&mut buf)
952
1.12k
                .await
953
1.12k
                .err_tip(|| 
"Failed to read data in filesystem store"0
)
?0
;
954
1.12k
            if buf.is_empty() {
  Branch (954:16): [True: 0, False: 0]
  Branch (954:16): [True: 0, False: 0]
  Branch (954:16): [True: 1, False: 10]
  Branch (954:16): [True: 0, False: 0]
  Branch (954:16): [True: 1, False: 10]
  Branch (954:16): [True: 0, False: 0]
  Branch (954:16): [True: 1, False: 1.02k]
  Branch (954:16): [True: 0, False: 0]
  Branch (954:16): [Folded - Ignored]
  Branch (954:16): [True: 38, False: 38]
955
41
                break; // EOF.
956
1.08k
            }
957
1.08k
            writer
958
1.08k
                .send(buf.freeze())
959
1.08k
                .await
960
1.08k
                .err_tip(|| 
"Failed to send chunk in filesystem store get_part"0
)
?0
;
961
        }
962
41
        writer
963
41
            .send_eof()
964
41
            .err_tip(|| 
"Filed to send EOF in filesystem store get_part"0
)
?0
;
965
966
41
        Ok(())
967
119
    }
968
969
101
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
970
101
        self
971
101
    }
972
973
24
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
974
24
        self
975
24
    }
976
977
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
978
0
        self
979
0
    }
980
981
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
982
0
        registry.register_indicator(self);
983
0
    }
984
}
985
986
#[async_trait]
987
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
988
0
    fn get_name(&self) -> &'static str {
989
0
        "FilesystemStore"
990
0
    }
991
992
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
993
0
        StoreDriver::check_health(Pin::new(self), namespace).await
994
0
    }
995
}