Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use std::borrow::Cow;
19
use std::ffi::{OsStr, OsString};
20
use std::sync::{Arc, Weak};
21
use std::time::SystemTime;
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::{Bytes, BytesMut};
26
use futures::stream::{StreamExt, TryStreamExt};
27
use futures::{Future, TryFutureExt};
28
use nativelink_config::stores::FilesystemSpec;
29
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::background_spawn;
32
use nativelink_util::buf_channel::{
33
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
34
};
35
use nativelink_util::common::{DigestInfo, fs};
36
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
37
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
38
use nativelink_util::store_trait::{
39
    RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
40
};
41
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
42
use tokio_stream::wrappers::ReadDirStream;
43
use tracing::{debug, error, info, warn};
44
45
use crate::callback_utils::RemoveItemCallbackHolder;
46
use crate::cas_utils::is_zero_digest;
47
48
// Default size of the buffer allocated when reading files.
49
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
50
// Default block size of all major filesystems is 4KB
51
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
52
53
pub const STR_FOLDER: &str = "s";
54
pub const DIGEST_FOLDER: &str = "d";
55
56
#[derive(Clone, Copy, Debug)]
57
pub enum FileType {
58
    Digest,
59
    String,
60
}
61
62
#[derive(Debug, MetricsComponent)]
63
pub struct SharedContext {
64
    // Used in testing to know how many active drop() spawns are running.
65
    // TODO(palfrey) It is probably a good idea to use a spin lock during
66
    // destruction of the store to ensure that all files are actually
67
    // deleted (similar to how it is done in tests).
68
    #[metric(help = "Number of active drop spawns")]
69
    pub active_drop_spawns: AtomicU64,
70
    #[metric(help = "Path to the configured temp path")]
71
    temp_path: String,
72
    #[metric(help = "Path to the configured content path")]
73
    content_path: String,
74
}
75
76
#[derive(Eq, PartialEq, Debug)]
77
enum PathType {
78
    Content,
79
    Temp,
80
    Custom(OsString),
81
}
82
83
/// [`EncodedFilePath`] stores the path to the file
84
/// including the context, path type and key to the file.
85
/// The whole [`StoreKey`] is stored as opposed to solely
86
/// the [`DigestInfo`] so that it is more usable for things
87
/// such as BEP - see Issue #1108
88
#[derive(Debug)]
89
pub struct EncodedFilePath {
90
    shared_context: Arc<SharedContext>,
91
    path_type: PathType,
92
    key: StoreKey<'static>,
93
}
94
95
impl EncodedFilePath {
96
    #[inline]
97
250
    fn get_file_path(&self) -> Cow<'_, OsStr> {
98
250
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
99
250
    }
100
}
101
102
#[inline]
103
341
fn get_file_path_raw<'a>(
104
341
    path_type: &'a PathType,
105
341
    shared_context: &SharedContext,
106
341
    key: &StoreKey<'a>,
107
341
) -> Cow<'a, OsStr> {
108
341
    let 
folder334
= match path_type {
109
155
        PathType::Content => &shared_context.content_path,
110
179
        PathType::Temp => &shared_context.temp_path,
111
7
        PathType::Custom(path) => return Cow::Borrowed(path),
112
    };
113
334
    Cow::Owned(to_full_path_from_key(folder, key))
114
341
}
115
116
impl Drop for EncodedFilePath {
117
94
    fn drop(&mut self) {
118
        // `drop()` can be called during shutdown, so we use the `path_type` flag to know if the
119
        // file actually needs to be deleted.
120
94
        if self.path_type == PathType::Content {
  Branch (120:12): [True: 85, False: 9]
  Branch (120:12): [Folded - Ignored]
121
85
            return;
122
9
        }
123
124
9
        let file_path = self.get_file_path().to_os_string();
125
9
        let shared_context = self.shared_context.clone();
126
        // .fetch_add returns previous value, so we add one to get approximate current value
127
9
        let current_active_drop_spawns = shared_context
128
9
            .active_drop_spawns
129
9
            .fetch_add(1, Ordering::Relaxed)
130
9
            + 1;
131
9
        debug!(
132
            %current_active_drop_spawns,
133
            ?file_path,
134
9
            "Spawned a filesystem_delete_file"
135
        );
136
9
        background_spawn!("filesystem_delete_file", async move 
{8
137
8
            let 
result5
= fs::remove_file(&file_path)
138
8
                .await
139
5
                .err_tip(|| format!(
"Failed to remove file {}"0
,
file_path.display()0
));
140
5
            if let Err(
err0
) = result {
  Branch (140:20): [True: 0, False: 5]
  Branch (140:20): [Folded - Ignored]
141
0
                error!(?file_path, ?err, "Failed to delete file",);
142
            } else {
143
5
                debug!(?file_path, "File deleted",);
144
            }
145
            // .fetch_sub returns previous value, so we subtract one to get approximate current value
146
5
            let current_active_drop_spawns = shared_context
147
5
                .active_drop_spawns
148
5
                .fetch_sub(1, Ordering::Relaxed)
149
5
                - 1;
150
5
            debug!(
151
                ?current_active_drop_spawns,
152
                ?file_path,
153
5
                "Dropped a filesystem_delete_file"
154
            );
155
5
        });
156
94
    }
157
}
158
159
/// This creates the file path from the [`StoreKey`]. If
160
/// it is a string, the string, prefixed with [`STR_FOLDER`]
161
/// for backwards compatibility, is stored.
162
///
163
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_FOLDER`]
164
/// followed by the string representation of a digest - the hash in hex,
165
/// a hyphen then the size in bytes
166
///
167
/// Previously, only the string representation of the [`DigestInfo`] was
168
/// used with no prefix
169
#[inline]
170
343
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
171
343
    match key {
172
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
173
340
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
174
    }
175
343
    .into()
176
343
}
177
178
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
179
    /// Responsible for creating the underlying `FileEntry`.
180
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
181
182
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
183
    fn make_and_open_file(
184
        block_size: u64,
185
        encoded_file_path: EncodedFilePath,
186
    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
187
    where
188
        Self: Sized;
189
190
    /// Returns the underlying reference to the size of the data in bytes
191
    fn data_size_mut(&mut self) -> &mut u64;
192
193
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
194
    fn size_on_disk(&self) -> u64;
195
196
    /// Gets the underlying `EncodedFilePath`.
197
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
198
199
    /// Returns a reader that will read part of the underlying file.
200
    fn read_file_part(
201
        &self,
202
        offset: u64,
203
        length: u64,
204
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
205
206
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
207
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
208
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
209
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
210
    /// the callback the file is always guaranteed to exist and be immutable.
211
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
212
    fn get_file_path_locked<
213
        T,
214
        Fut: Future<Output = Result<T, Error>> + Send,
215
        F: FnOnce(OsString) -> Fut + Send,
216
    >(
217
        &self,
218
        handler: F,
219
    ) -> impl Future<Output = Result<T, Error>> + Send;
220
}
221
222
pub struct FileEntryImpl {
223
    data_size: u64,
224
    block_size: u64,
225
    // We lock around this as it gets rewritten when we move between temp and content types
226
    encoded_file_path: RwLock<EncodedFilePath>,
227
}
228
229
impl FileEntryImpl {
230
11
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
231
11
        self.encoded_file_path.get_mut().shared_context.clone()
232
11
    }
233
}
234
235
impl FileEntry for FileEntryImpl {
236
94
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
237
94
        Self {
238
94
            data_size,
239
94
            block_size,
240
94
            encoded_file_path,
241
94
        }
242
94
    }
243
244
    /// This encapsulates the logic for the edge case where the file fails to be created:
245
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
246
    /// try to clean up the file as well during `drop()`.
247
85
    async fn make_and_open_file(
248
85
        block_size: u64,
249
85
        encoded_file_path: EncodedFilePath,
250
85
    ) -> Result<(Self, fs::FileSlot, OsString), Error> {
251
85
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
252
85
        let temp_file_result = fs::create_file(temp_full_path.clone())
253
85
            .or_else(|mut err| async 
{0
254
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
255
0
                    format!(
256
0
                        "Failed to remove file {} in filesystem store",
257
0
                        temp_full_path.display()
258
                    )
259
0
                });
260
0
                if let Err(remove_err) = remove_result {
  Branch (260:24): [True: 0, False: 0]
  Branch (260:24): [True: 0, False: 0]
  Branch (260:24): [True: 0, False: 0]
  Branch (260:24): [True: 0, False: 0]
  Branch (260:24): [Folded - Ignored]
  Branch (260:24): [True: 0, False: 0]
261
0
                    err = err.merge(remove_err);
262
0
                }
263
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
264
0
                Err(err).err_tip(|| {
265
0
                    format!(
266
0
                        "Failed to create {} in filesystem store",
267
0
                        temp_full_path.display()
268
                    )
269
0
                })
270
0
            })
271
85
            .await
?0
;
272
273
85
        Ok((
274
85
            <Self as FileEntry>::create(
275
85
                0, /* Unknown yet, we will fill it in later */
276
85
                block_size,
277
85
                RwLock::new(encoded_file_path),
278
85
            ),
279
85
            temp_file_result,
280
85
            temp_full_path,
281
85
        ))
282
85
    }
283
284
85
    fn data_size_mut(&mut self) -> &mut u64 {
285
85
        &mut self.data_size
286
85
    }
287
288
159
    fn size_on_disk(&self) -> u64 {
289
159
        self.data_size.div_ceil(self.block_size) * self.block_size
290
159
    }
291
292
148
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
293
148
        &self.encoded_file_path
294
148
    }
295
296
50
    fn read_file_part(
297
50
        &self,
298
50
        offset: u64,
299
50
        length: u64,
300
50
    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
301
50
        self.get_file_path_locked(move |full_content_path| async move {
302
50
            let 
file48
= fs::open_file(&full_content_path, offset, length)
303
50
                .await
304
50
                .err_tip(|| 
{2
305
2
                    format!(
306
2
                        "Failed to open file in filesystem store {}",
307
2
                        full_content_path.display()
308
                    )
309
2
                })?;
310
48
            Ok(file)
311
100
        })
312
50
    }
313
314
56
    async fn get_file_path_locked<
315
56
        T,
316
56
        Fut: Future<Output = Result<T, Error>> + Send,
317
56
        F: FnOnce(OsString) -> Fut + Send,
318
56
    >(
319
56
        &self,
320
56
        handler: F,
321
56
    ) -> Result<T, Error> {
322
56
        let encoded_file_path = self.get_encoded_file_path().read().await;
323
56
        handler(encoded_file_path.get_file_path().to_os_string()).await
324
56
    }
325
}
326
327
impl Debug for FileEntryImpl {
328
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
329
0
        f.debug_struct("FileEntryImpl")
330
0
            .field("data_size", &self.data_size)
331
0
            .field("encoded_file_path", &"<behind mutex>")
332
0
            .finish()
333
0
    }
334
}
335
336
94
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
337
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
338
94
    let mut hash = *digest.packed_hash();
339
94
    hash[24..].clone_from_slice(
340
94
        &DELETE_FILE_COUNTER
341
94
            .fetch_add(1, Ordering::Relaxed)
342
94
            .to_le_bytes(),
343
    );
344
94
    digest.set_packed_hash(*hash);
345
94
    digest
346
94
}
347
348
94
fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
349
94
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
350
94
}
351
352
impl LenEntry for FileEntryImpl {
353
    #[inline]
354
157
    fn len(&self) -> u64 {
355
157
        self.size_on_disk()
356
157
    }
357
358
1
    fn is_empty(&self) -> bool {
359
1
        self.data_size == 0
360
1
    }
361
362
    // unref() only triggers when an item is removed from the eviction_map. It is possible
363
    // that another place in code has a reference to `FileEntryImpl` and may later read the
364
    // file. To support this edge case, we first move the file to a temp file and point
365
    // target file location to the new temp file. `unref()` should only ever be called once.
366
    #[inline]
367
11
    async fn unref(&self) {
368
11
        let mut encoded_file_path = self.encoded_file_path.write().await;
369
11
        if encoded_file_path.path_type == PathType::Temp {
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 2, False: 9]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [True: 0, False: 0]
  Branch (369:12): [Folded - Ignored]
  Branch (369:12): [True: 0, False: 0]
370
            // We are already a temp file that is now marked for deletion on drop.
371
            // This is very rare, but most likely the rename into the content path failed.
372
2
            warn!(
373
2
                key = ?encoded_file_path.key,
374
2
                "File is already a temp file",
375
            );
376
2
            return;
377
9
        }
378
9
        let from_path = encoded_file_path.get_file_path();
379
9
        let new_key = make_temp_key(&encoded_file_path.key);
380
381
9
        let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
382
383
9
        if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 2, False: 7]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [True: 0, False: 0]
  Branch (383:16): [Folded - Ignored]
  Branch (383:16): [True: 0, False: 0]
384
2
            warn!(
385
2
                key = ?encoded_file_path.key,
386
                ?from_path,
387
                ?to_path,
388
                ?err,
389
2
                "Failed to rename file",
390
            );
391
        } else {
392
7
            debug!(
393
7
                key = ?encoded_file_path.key,
394
                ?from_path,
395
                ?to_path,
396
7
                "Renamed file (unref)",
397
            );
398
7
            encoded_file_path.path_type = PathType::Temp;
399
7
            encoded_file_path.key = new_key;
400
        }
401
11
    }
402
}
403
404
#[inline]
405
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
406
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
407
2
    let size = size.parse::<i64>()
?0
;
408
2
    DigestInfo::try_new(hash, size)
409
2
}
410
411
2
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
412
2
    match file_type {
413
0
        FileType::String => Ok(StoreKey::new_str(file_name)),
414
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
415
    }
416
2
}
417
418
/// The number of files to read the metadata for at the same time when running
419
/// `add_files_to_cache`.
420
const SIMULTANEOUS_METADATA_READS: usize = 200;
421
422
type FsEvictingMap<'a, Fe> =
423
    EvictingMap<StoreKeyBorrow, StoreKey<'a>, Arc<Fe>, SystemTime, RemoveItemCallbackHolder>;
424
425
52
async fn add_files_to_cache<Fe: FileEntry>(
426
52
    evicting_map: &FsEvictingMap<'_, Fe>,
427
52
    anchor_time: &SystemTime,
428
52
    shared_context: &Arc<SharedContext>,
429
52
    block_size: u64,
430
52
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
431
52
) -> Result<(), Error> {
432
    #[expect(clippy::too_many_arguments)]
433
1
    async fn process_entry<Fe: FileEntry>(
434
1
        evicting_map: &FsEvictingMap<'_, Fe>,
435
1
        file_name: &str,
436
1
        file_type: FileType,
437
1
        atime: SystemTime,
438
1
        data_size: u64,
439
1
        block_size: u64,
440
1
        anchor_time: &SystemTime,
441
1
        shared_context: &Arc<SharedContext>,
442
1
    ) -> Result<(), Error> {
443
1
        let key = key_from_file(file_name, file_type)
?0
;
444
445
1
        let file_entry = Fe::create(
446
1
            data_size,
447
1
            block_size,
448
1
            RwLock::new(EncodedFilePath {
449
1
                shared_context: shared_context.clone(),
450
1
                path_type: PathType::Content,
451
1
                key: key.borrow().into_owned(),
452
1
            }),
453
        );
454
1
        let time_since_anchor = anchor_time
455
1
            .duration_since(atime)
456
1
            .map_err(|_| make_input_err!("File access time newer than now"))
?0
;
457
1
        evicting_map
458
1
            .insert_with_time(
459
1
                key.into_owned().into(),
460
1
                Arc::new(file_entry),
461
1
                i32::try_from(time_since_anchor.as_secs()).unwrap_or(i32::MAX),
462
1
            )
463
1
            .await;
464
1
        Ok(())
465
1
    }
466
467
156
    async fn read_files(
468
156
        folder: Option<&str>,
469
156
        shared_context: &SharedContext,
470
156
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
471
        // Note: In Dec 2024 this is for backwards compatibility with the old
472
        // way files were stored on disk. Previously all files were in a single
473
        // folder regardless of the StoreKey type. This allows old versions of
474
        // nativelink file layout to be upgraded at startup time.
475
        // This logic can be removed once more time has passed.
476
156
        let read_dir = folder.map_or_else(
477
52
            || format!("{}/", shared_context.content_path),
478
104
            |folder| format!("{}/{folder}/", shared_context.content_path),
479
        );
480
481
156
        let (_permit, dir_handle) = fs::read_dir(read_dir)
482
156
            .await
483
156
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
484
156
            .into_inner();
485
486
156
        let read_dir_stream = ReadDirStream::new(dir_handle);
487
156
        read_dir_stream
488
156
            .map(|dir_entry| async move 
{105
489
105
                let dir_entry = dir_entry.unwrap();
490
105
                let file_name = dir_entry.file_name().into_string().unwrap();
491
105
                let metadata = dir_entry
492
105
                    .metadata()
493
105
                    .await
494
105
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
495
                // We need to filter out folders - we do not want to try to cache the s and d folders.
496
105
                let is_file =
497
105
                    metadata.is_file() || !(
file_name == STR_FOLDER104
||
file_name == DIGEST_FOLDER52
);
  Branch (497:21): [True: 0, False: 0]
  Branch (497:45): [True: 0, False: 0]
  Branch (497:21): [True: 1, False: 44]
  Branch (497:45): [True: 22, False: 22]
  Branch (497:21): [True: 0, False: 4]
  Branch (497:45): [True: 2, False: 2]
  Branch (497:21): [True: 0, False: 6]
  Branch (497:45): [True: 3, False: 3]
  Branch (497:21): [Folded - Ignored]
  Branch (497:45): [Folded - Ignored]
  Branch (497:21): [True: 0, False: 50]
  Branch (497:45): [True: 25, False: 25]
498
                // Using access time is not perfect, but better than random. We do not update the
499
                // atime when a file is actually "touched"; we rely on whatever the filesystem does
500
                // when we read the file (usually update on read).
501
105
                let atime = metadata
502
105
                    .accessed()
503
105
                    .or_else(|_| 
metadata0
.
modified0
())
504
105
                    .unwrap_or(SystemTime::UNIX_EPOCH);
505
105
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
506
105
                    file_name,
507
105
                    atime,
508
105
                    metadata.len(),
509
105
                    is_file,
510
105
                ))
511
210
            })
512
156
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
513
156
            .try_collect()
514
156
            .await
515
156
    }
516
517
    /// Note: In Dec 2024 this is for backwards compatibility with the old
518
    /// way files were stored on disk. Previously all files were in a single
519
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
520
    /// location to the new cache location, under [`DIGEST_FOLDER`].
521
52
    async fn move_old_cache(
522
52
        shared_context: &Arc<SharedContext>,
523
52
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
524
52
    ) -> Result<(), Error> {
525
52
        let file_infos = read_files(None, shared_context).await
?0
;
526
527
52
        let from_path = shared_context.content_path.to_string();
528
529
52
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
530
531
52
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
532
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
533
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
534
535
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
  Branch (535:20): [True: 0, False: 0]
  Branch (535:20): [True: 0, False: 0]
  Branch (535:20): [True: 0, False: 0]
  Branch (535:20): [True: 0, False: 0]
  Branch (535:20): [Folded - Ignored]
  Branch (535:20): [True: 0, False: 0]
536
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
537
            } else {
538
0
                debug!(?from_file, ?to_file, "Renamed file (old cache)",);
539
            }
540
        }
541
52
        Ok(())
542
52
    }
543
544
104
    async fn add_files_to_cache<Fe: FileEntry>(
545
104
        evicting_map: &FsEvictingMap<'_, Fe>,
546
104
        anchor_time: &SystemTime,
547
104
        shared_context: &Arc<SharedContext>,
548
104
        block_size: u64,
549
104
        folder: &str,
550
104
    ) -> Result<(), Error> {
551
104
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
552
104
        let file_type = match folder {
553
104
            STR_FOLDER => 
FileType::String52
,
554
52
            DIGEST_FOLDER => FileType::Digest,
555
0
            _ => panic!("Invalid folder type"),
556
        };
557
558
104
        let path_root = format!("{}/{folder}", shared_context.content_path);
559
560
104
        for (
file_name1
,
atime1
,
data_size1
, _) in file_infos.into_iter().filter(|x| x.3) {
561
1
            let result = process_entry(
562
1
                evicting_map,
563
1
                &file_name,
564
1
                file_type,
565
1
                atime,
566
1
                data_size,
567
1
                block_size,
568
1
                anchor_time,
569
1
                shared_context,
570
1
            )
571
1
            .await;
572
1
            if let Err(
err0
) = result {
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 1]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [True: 0, False: 0]
  Branch (572:20): [Folded - Ignored]
  Branch (572:20): [True: 0, False: 0]
573
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
574
                // Ignore result.
575
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
576
1
            }
577
        }
578
104
        Ok(())
579
104
    }
580
581
52
    move_old_cache(shared_context, rename_fn).await
?0
;
582
583
52
    add_files_to_cache(
584
52
        evicting_map,
585
52
        anchor_time,
586
52
        shared_context,
587
52
        block_size,
588
52
        DIGEST_FOLDER,
589
52
    )
590
52
    .await
?0
;
591
592
52
    add_files_to_cache(
593
52
        evicting_map,
594
52
        anchor_time,
595
52
        shared_context,
596
52
        block_size,
597
52
        STR_FOLDER,
598
52
    )
599
52
    .await
?0
;
600
52
    Ok(())
601
52
}
602
603
52
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
604
104
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
605
104
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
606
104
            .await
607
104
            .err_tip(
608
                || "Failed opening temp directory to prune partial downloads in filesystem store",
609
0
            )?
610
104
            .into_inner();
611
612
104
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
613
104
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
  Branch (613:19): [True: 0, False: 0]
  Branch (613:19): [True: 0, False: 44]
  Branch (613:19): [True: 0, False: 4]
  Branch (613:19): [True: 0, False: 6]
  Branch (613:19): [Folded - Ignored]
  Branch (613:19): [True: 0, False: 50]
614
0
            let path = dir_entry?.path();
615
0
            if let Err(err) = fs::remove_file(&path).await {
  Branch (615:20): [True: 0, False: 0]
  Branch (615:20): [True: 0, False: 0]
  Branch (615:20): [True: 0, False: 0]
  Branch (615:20): [True: 0, False: 0]
  Branch (615:20): [Folded - Ignored]
  Branch (615:20): [True: 0, False: 0]
616
0
                warn!(?path, ?err, "Failed to delete file",);
617
0
            }
618
        }
619
104
        Ok(())
620
104
    }
621
622
52
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
623
52
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
624
52
    Ok(())
625
52
}
626
627
#[derive(Debug, MetricsComponent)]
628
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
629
    #[metric]
630
    shared_context: Arc<SharedContext>,
631
    #[metric(group = "evicting_map")]
632
    evicting_map: Arc<FsEvictingMap<'static, Fe>>,
633
    #[metric(help = "Block size of the configured filesystem")]
634
    block_size: u64,
635
    #[metric(help = "Size of the configured read buffer size")]
636
    read_buffer_size: usize,
637
    weak_self: Weak<Self>,
638
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
639
}
640
641
impl<Fe: FileEntry> FilesystemStore<Fe> {
642
40
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
643
81
        
Self::new_with_timeout_and_rename_fn40
(
spec40
, |from, to| std::fs::rename(from, to)).
await40
644
40
    }
645
646
52
    pub async fn new_with_timeout_and_rename_fn(
647
52
        spec: &FilesystemSpec,
648
52
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
649
52
    ) -> Result<Arc<Self>, Error> {
650
104
        async fn create_subdirs(path: &str) -> Result<(), Error> {
651
104
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
652
104
                .await
653
104
                .err_tip(|| format!(
"Failed to create directory {path}/{STR_FOLDER}"0
))
?0
;
654
104
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
655
104
                .await
656
104
                .err_tip(|| format!(
"Failed to create directory {path}/{DIGEST_FOLDER}"0
))
657
104
        }
658
659
52
        let now = SystemTime::now();
660
661
52
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
662
52
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
663
52
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
664
665
        // Create temp and content directories and the s and d subdirectories.
666
667
52
        create_subdirs(&spec.temp_path).await
?0
;
668
52
        create_subdirs(&spec.content_path).await
?0
;
669
670
52
        let shared_context = Arc::new(SharedContext {
671
52
            active_drop_spawns: AtomicU64::new(0),
672
52
            temp_path: spec.temp_path.clone(),
673
52
            content_path: spec.content_path.clone(),
674
52
        });
675
676
52
        let block_size = if spec.block_size == 0 {
  Branch (676:29): [True: 0, False: 0]
  Branch (676:29): [True: 1, False: 0]
  Branch (676:29): [True: 0, False: 1]
  Branch (676:29): [True: 0, False: 1]
  Branch (676:29): [True: 1, False: 0]
  Branch (676:29): [True: 0, False: 1]
  Branch (676:29): [True: 1, False: 0]
  Branch (676:29): [True: 15, False: 1]
  Branch (676:29): [True: 2, False: 0]
  Branch (676:29): [True: 3, False: 0]
  Branch (676:29): [Folded - Ignored]
  Branch (676:29): [True: 25, False: 0]
677
48
            DEFAULT_BLOCK_SIZE
678
        } else {
679
4
            spec.block_size
680
        };
681
52
        add_files_to_cache(
682
52
            evicting_map.as_ref(),
683
52
            &now,
684
52
            &shared_context,
685
52
            block_size,
686
52
            rename_fn,
687
52
        )
688
52
        .await
?0
;
689
52
        prune_temp_path(&shared_context.temp_path).await
?0
;
690
691
52
        let read_buffer_size = if spec.read_buffer_size == 0 {
  Branch (691:35): [True: 0, False: 0]
  Branch (691:35): [True: 0, False: 1]
  Branch (691:35): [True: 1, False: 0]
  Branch (691:35): [True: 0, False: 1]
  Branch (691:35): [True: 1, False: 0]
  Branch (691:35): [True: 0, False: 1]
  Branch (691:35): [True: 1, False: 0]
  Branch (691:35): [True: 5, False: 11]
  Branch (691:35): [True: 2, False: 0]
  Branch (691:35): [True: 3, False: 0]
  Branch (691:35): [Folded - Ignored]
  Branch (691:35): [True: 25, False: 0]
692
38
            DEFAULT_BUFF_SIZE
693
        } else {
694
14
            spec.read_buffer_size as usize
695
        };
696
52
        Ok(Arc::new_cyclic(|weak_self| Self {
697
52
            shared_context,
698
52
            evicting_map,
699
52
            block_size,
700
52
            read_buffer_size,
701
52
            weak_self: weak_self.clone(),
702
52
            rename_fn,
703
52
        }))
704
52
    }
705
706
26
    pub fn get_arc(&self) -> Option<Arc<Self>> {
707
26
        self.weak_self.upgrade()
708
26
    }
709
710
10
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
711
10
        if is_zero_digest(digest) {
  Branch (711:12): [Folded - Ignored]
  Branch (711:12): [True: 1, False: 5]
  Branch (711:12): [True: 0, False: 4]
  Branch (711:12): [Folded - Ignored]
712
1
            return Ok(Arc::new(Fe::create(
713
1
                0,
714
1
                0,
715
1
                RwLock::new(EncodedFilePath {
716
1
                    shared_context: self.shared_context.clone(),
717
1
                    path_type: PathType::Content,
718
1
                    key: digest.into(),
719
1
                }),
720
1
            )));
721
9
        }
722
9
        self.evicting_map
723
9
            .get(&digest.into())
724
9
            .await
725
9
            .ok_or_else(|| make_err!(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
726
10
    }
727
728
63
    async fn update_file(
729
63
        self: Pin<&Self>,
730
63
        mut entry: Fe,
731
63
        mut temp_file: fs::FileSlot,
732
63
        final_key: StoreKey<'static>,
733
63
        mut reader: DropCloserReadHalf,
734
63
    ) -> Result<(), Error> {
735
63
        let mut data_size = 0;
736
        loop {
737
131
            let mut data = reader
738
131
                .recv()
739
131
                .await
740
131
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
741
131
            let data_len = data.len();
742
131
            if data_len == 0 {
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 2, False: 8]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 1, False: 1]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [True: 0, False: 0]
  Branch (742:16): [Folded - Ignored]
  Branch (742:16): [True: 60, False: 59]
743
63
                break; // EOF.
744
68
            }
745
68
            temp_file
746
68
                .write_all_buf(&mut data)
747
68
                .await
748
68
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
749
68
            data_size += data_len as u64;
750
        }
751
752
63
        temp_file
753
63
            .as_ref()
754
63
            .sync_all()
755
63
            .await
756
63
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
757
758
63
        debug!(?temp_file, "Dropping file to update_file");
759
63
        drop(temp_file);
760
761
63
        *entry.data_size_mut() = data_size;
762
63
        self.emplace_file(final_key, Arc::new(entry)).await
763
63
    }
764
765
92
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
766
        // This sequence of events is quite tricky to understand due to the amount of triggers that
767
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
768
        // 1. Here will hold a write lock on any file operations of this FileEntry.
769
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
770
        //    entries.
771
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
772
        //    during the rename.
773
        // 4. It should be impossible for items to be added while eviction is happening, so there
774
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
775
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
776
        //    item is not possible within the `insert()` call because the write lock inside the
777
        //    eviction map. If an eviction of new item happens after `insert()` but before
778
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
779
        //    will be blocked on us because we currently have the lock.
780
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
781
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
782
        //    contents until we release the lock.
783
92
        let evicting_map = self.evicting_map.clone();
784
92
        let rename_fn = self.rename_fn;
785
786
        // We need to guarantee that this will get to the end even if the parent future is dropped.
787
        // See: https://github.com/TraceMachina/nativelink/issues/495
788
92
        background_spawn!("filesystem_store_emplace_file", async move {
789
92
            evicting_map
790
92
                .insert(key.borrow().into_owned().into(), entry.clone())
791
92
                .await;
792
793
            // The insert might have resulted in an eviction/unref so we need to check
794
            // it still exists in there. But first, get the lock...
795
92
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
796
            // Then check it's still in there...
797
92
            if evicting_map.get(&key).await.is_none() {
  Branch (797:16): [True: 0, False: 0]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [True: 0, False: 1]
  Branch (797:16): [True: 1, False: 13]
  Branch (797:16): [True: 0, False: 0]
  Branch (797:16): [True: 0, False: 2]
  Branch (797:16): [Folded - Ignored]
  Branch (797:16): [True: 0, False: 65]
798
1
                info!(%key, "Got eviction while emplacing, dropping");
799
1
                return Ok(());
800
91
            }
801
802
91
            let final_path = get_file_path_raw(
803
91
                &PathType::Content,
804
91
                encoded_file_path.shared_context.as_ref(),
805
91
                &key,
806
            );
807
808
91
            let from_path = encoded_file_path.get_file_path();
809
            // Internally tokio spawns fs commands onto a blocking thread anyways.
810
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
811
            // an open-file permit (ensure we don't open too many files at once).
812
91
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| 
{1
813
1
                format!(
814
1
                    "Failed to rename temp file to final path {}",
815
1
                    final_path.display()
816
                )
817
1
            });
818
819
            // In the event our move from temp file to final file fails we need to ensure we remove
820
            // the entry from our map.
821
            // Remember: At this point it is possible for another thread to have a reference to
822
            // `entry`, so we can't delete the file, only drop() should ever delete files.
823
91
            if let Err(
err1
) = result {
  Branch (823:20): [True: 0, False: 0]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [True: 1, False: 0]
  Branch (823:20): [True: 0, False: 13]
  Branch (823:20): [True: 0, False: 0]
  Branch (823:20): [True: 0, False: 2]
  Branch (823:20): [Folded - Ignored]
  Branch (823:20): [True: 0, False: 65]
824
1
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
825
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
826
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
827
1
                drop(encoded_file_path);
828
                // It is possible that the item in our map is no longer the item we inserted,
829
                // So, we need to conditionally remove it only if the pointers are the same.
830
831
1
                evicting_map
832
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
833
1
                    .await;
834
1
                return Err(err);
835
90
            }
836
90
            encoded_file_path.path_type = PathType::Content;
837
90
            encoded_file_path.key = key;
838
90
            Ok(())
839
92
        })
840
92
        .await
841
91
        .err_tip(|| "Failed to create spawn in filesystem store update_file")
?0
842
91
    }
843
}
844
845
#[async_trait]
846
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
847
    async fn has_with_results(
848
        self: Pin<&Self>,
849
        keys: &[StoreKey<'_>],
850
        results: &mut [Option<u64>],
851
86
    ) -> Result<(), Error> {
852
        let own_keys = keys
853
            .iter()
854
86
            .map(|sk| sk.borrow().into_owned())
855
            .collect::<Vec<_>>();
856
        self.evicting_map
857
            .sizes_for_keys(own_keys.iter(), results, false /* peek */)
858
            .await;
859
        // We need to do a special pass to ensure our zero files exist.
860
        // If our results failed and the result was a zero file, we need to
861
        // create the file by spec.
862
        for (key, result) in keys.iter().zip(results.iter_mut()) {
863
            if result.is_some() || !is_zero_digest(key.borrow()) {
864
                continue;
865
            }
866
            let (mut tx, rx) = make_buf_channel_pair();
867
            let send_eof_result = tx.send_eof();
868
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
869
                .await
870
0
                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
871
                .merge(
872
                    send_eof_result
873
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
874
                )?;
875
876
            *result = Some(0);
877
        }
878
        Ok(())
879
86
    }
880
881
    async fn update(
882
        self: Pin<&Self>,
883
        key: StoreKey<'_>,
884
        mut reader: DropCloserReadHalf,
885
        _upload_size: UploadSizeInfo,
886
128
    ) -> Result<(), Error> {
887
        if is_zero_digest(key.borrow()) {
888
            // don't need to add, because zero length files are just assumed to exist
889
            return Ok(());
890
        }
891
892
        let temp_key = make_temp_key(&key);
893
894
        // There's a possibility of deadlock here where we take all of the
895
        // file semaphores with make_and_open_file and the semaphores for
896
        // whatever is populating reader is exhasted on the threads that
897
        // have the FileSlots and not on those which can't.  To work around
898
        // this we don't take the FileSlot until there's something on the
899
        // reader available to know that the populator is active.
900
        reader.peek().await?;
901
902
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
903
            self.block_size,
904
            EncodedFilePath {
905
                shared_context: self.shared_context.clone(),
906
                path_type: PathType::Temp,
907
                key: temp_key,
908
            },
909
        )
910
        .await?;
911
912
        self.update_file(entry, temp_file, key.into_owned(), reader)
913
            .await
914
1
            .err_tip(|| {
915
1
                format!(
916
1
                    "While processing with temp file {}",
917
1
                    temp_full_path.display()
918
                )
919
1
            })
920
128
    }
921
922
97
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
923
92
        matches!(
924
97
            optimization,
925
            StoreOptimizations::FileUpdates | StoreOptimizations::SubscribesToUpdateOneshot
926
        )
927
97
    }
928
929
23
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
930
        if is_zero_digest(key.borrow()) {
931
            return Ok(());
932
        }
933
934
        let temp_key = make_temp_key(&key);
935
        let (mut entry, mut temp_file, temp_full_path) = Fe::make_and_open_file(
936
            self.block_size,
937
            EncodedFilePath {
938
                shared_context: self.shared_context.clone(),
939
                path_type: PathType::Temp,
940
                key: temp_key,
941
            },
942
        )
943
        .await
944
        .err_tip(|| "Failed to create temp file in filesystem store update_oneshot")?;
945
946
        // Write directly without channel overhead
947
        if !data.is_empty() {
948
            temp_file
949
                .write_all(&data)
950
                .await
951
0
                .err_tip(|| format!("Failed to write data to {}", temp_full_path.display()))?;
952
        }
953
954
        temp_file
955
            .as_ref()
956
            .sync_all()
957
            .await
958
            .err_tip(|| "Failed to sync_data in filesystem store update_oneshot")?;
959
960
        drop(temp_file);
961
962
        *entry.data_size_mut() = data.len() as u64;
963
        self.emplace_file(key.into_owned(), Arc::new(entry)).await
964
23
    }
965
966
    async fn update_with_whole_file(
967
        self: Pin<&Self>,
968
        key: StoreKey<'_>,
969
        path: OsString,
970
        file: fs::FileSlot,
971
        upload_size: UploadSizeInfo,
972
8
    ) -> Result<Option<fs::FileSlot>, Error> {
973
        let file_size = match upload_size {
974
            UploadSizeInfo::ExactSize(size) => size,
975
            UploadSizeInfo::MaxSize(_) => file
976
                .as_ref()
977
                .metadata()
978
                .await
979
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
980
                .len(),
981
        };
982
        if file_size == 0 {
983
            // don't need to add, because zero length files are just assumed to exist
984
            return Ok(None);
985
        }
986
        let entry = Fe::create(
987
            file_size,
988
            self.block_size,
989
            RwLock::new(EncodedFilePath {
990
                shared_context: self.shared_context.clone(),
991
                path_type: PathType::Custom(path),
992
                key: key.borrow().into_owned(),
993
            }),
994
        );
995
        // We are done with the file, if we hold a reference to the file here, it could
996
        // result in a deadlock if `emplace_file()` also needs file descriptors.
997
        debug!(?file, "Dropping file to to update_with_whole_file");
998
        drop(file);
999
        self.emplace_file(key.into_owned(), Arc::new(entry))
1000
            .await
1001
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
1002
        return Ok(None);
1003
8
    }
1004
1005
    async fn get_part(
1006
        self: Pin<&Self>,
1007
        key: StoreKey<'_>,
1008
        writer: &mut DropCloserWriteHalf,
1009
        offset: u64,
1010
        length: Option<u64>,
1011
66
    ) -> Result<(), Error> {
1012
        if is_zero_digest(key.borrow()) {
1013
            self.has(key.borrow())
1014
                .await
1015
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
1016
            writer
1017
                .send_eof()
1018
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
1019
            return Ok(());
1020
        }
1021
        let owned_key = key.into_owned();
1022
2
        let entry = self.evicting_map.get(&owned_key).await.ok_or_else(|| {
1023
2
            make_err!(
1024
2
                Code::NotFound,
1025
                "{} not found in filesystem store here",
1026
2
                owned_key.as_str()
1027
            )
1028
2
        })?;
1029
        let read_limit = length.unwrap_or(u64::MAX);
1030
2
        let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
1031
            // If the file is not found, we need to remove it from the eviction map.
1032
2
            if err.code == Code::NotFound {
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 2, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [True: 0, False: 0]
  Branch (1032:16): [Folded - Ignored]
  Branch (1032:16): [True: 0, False: 0]
1033
2
                error!(
1034
                    ?err,
1035
                    key = ?owned_key,
1036
2
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
1037
                );
1038
2
                self.evicting_map.remove(&owned_key).await;
1039
0
            }
1040
2
            Err(err)
1041
4
        }).await?;
1042
1043
        loop {
1044
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
1045
            temp_file
1046
                .read_buf(&mut buf)
1047
                .await
1048
                .err_tip(|| "Failed to read data in filesystem store")?;
1049
            if buf.is_empty() {
1050
                break; // EOF.
1051
            }
1052
            writer
1053
                .send(buf.freeze())
1054
                .await
1055
                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
1056
        }
1057
        writer
1058
            .send_eof()
1059
            .err_tip(|| "Filed to send EOF in filesystem store get_part")?;
1060
1061
        Ok(())
1062
66
    }
1063
1064
118
    /// Returns this store itself, whatever `_digest` is.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
1067
1068
26
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
1069
26
        self
1070
26
    }
1071
1072
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
1073
0
        self
1074
0
    }
1075
1076
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1077
0
        registry.register_indicator(self);
1078
0
    }
1079
1080
0
    fn register_remove_callback(
1081
0
        self: Arc<Self>,
1082
0
        callback: Arc<dyn RemoveItemCallback>,
1083
0
    ) -> Result<(), Error> {
1084
0
        self.evicting_map
1085
0
            .add_remove_callback(RemoveItemCallbackHolder::new(callback));
1086
0
        Ok(())
1087
0
    }
1088
}
1089
1090
#[async_trait]
1091
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1092
0
    fn get_name(&self) -> &'static str {
1093
0
        "FilesystemStore"
1094
0
    }
1095
1096
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
1097
        StoreDriver::check_health(Pin::new(self), namespace).await
1098
0
    }
1099
}