Coverage Report

Created: 2026-06-25 16:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use core::time::Duration;
19
use std::borrow::Cow;
20
#[cfg(unix)]
21
use std::collections::HashMap;
22
use std::ffi::{OsStr, OsString};
23
use std::sync::{Arc, Weak};
24
use std::time::SystemTime;
25
26
#[cfg(unix)]
27
use async_lock::Mutex;
28
use async_lock::RwLock;
29
use async_trait::async_trait;
30
use bytes::{Bytes, BytesMut};
31
use futures::stream::{StreamExt, TryStreamExt};
32
use futures::{Future, TryFutureExt};
33
use nativelink_config::stores::FilesystemSpec;
34
use nativelink_error::{Code, Error, ResultExt, make_err};
35
use nativelink_metric::MetricsComponent;
36
use nativelink_util::background_spawn;
37
use nativelink_util::buf_channel::{
38
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
39
};
40
use nativelink_util::common::{DigestInfo, fs};
41
use nativelink_util::evicting_map::{EvictingMap, EvictionSnapshot, LenEntry};
42
use nativelink_util::fs::FileSlot;
43
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
44
#[cfg(unix)]
45
use nativelink_util::spawn_blocking;
46
use nativelink_util::store_trait::{
47
    RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
48
};
49
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
50
use tokio::sync::Semaphore;
51
use tokio::time::timeout;
52
use tokio_stream::wrappers::ReadDirStream;
53
use tracing::{debug, error, info, trace, warn};
54
55
use crate::callback_utils::RemoveItemCallbackHolder;
56
use crate::cas_utils::is_zero_digest;
57
58
// Default size to allocate memory of the buffer when reading files.
59
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
60
// Default block size of all major filesystems is 4KB
61
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
62
63
pub const STR_FOLDER: &str = "s";
64
pub const DIGEST_FOLDER: &str = "d";
65
66
/// Suffix for the sibling directory that holds per-digest read-only
67
/// **executable** (0o555) variants of CAS blobs (see
68
/// [`FilesystemStore::get_executable_hardlink_source`]). It is a sibling of
69
/// `content_path` rather than a child so the normal content/temp scan and prune
70
/// logic never touches it. Cleared on startup; entries are regenerable.
71
#[cfg(unix)]
72
const EXECUTABLE_DIR_SUFFIX: &str = ".exec";
73
74
#[derive(Clone, Copy, Debug)]
75
pub enum FileType {
76
    Digest,
77
    String,
78
}
79
80
#[derive(Debug, MetricsComponent)]
81
pub struct SharedContext {
82
    // Used in testing to know how many active drop() spawns are running.
83
    // TODO(palfrey) It is probably a good idea to use a spin lock during
84
    // destruction of the store to ensure that all files are actually
85
    // deleted (similar to how it is done in tests).
86
    #[metric(help = "Number of active drop spawns")]
87
    pub active_drop_spawns: AtomicU64,
88
    #[metric(help = "Path to the configured temp path")]
89
    temp_path: String,
90
    #[metric(help = "Path to the configured content path")]
91
    content_path: String,
92
}
93
94
#[derive(Eq, PartialEq, Debug)]
95
enum PathType {
96
    Content,
97
    Temp,
98
    Custom(OsString),
99
}
100
101
/// [`EncodedFilePath`] stores the path to the file
102
/// including the context, path type and key to the file.
103
/// The whole [`StoreKey`] is stored as opposed to solely
104
/// the [`DigestInfo`] so that it is more usable for things
105
/// such as BEP -see Issue #1108
106
#[derive(Debug)]
107
pub struct EncodedFilePath {
108
    shared_context: Arc<SharedContext>,
109
    path_type: PathType,
110
    key: StoreKey<'static>,
111
}
112
113
impl EncodedFilePath {
114
    #[inline]
115
595
    fn get_file_path(&self) -> Cow<'_, OsStr> {
116
595
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
117
595
    }
118
}
119
120
#[inline]
121
751
fn get_file_path_raw<'a>(
122
751
    path_type: &'a PathType,
123
751
    shared_context: &SharedContext,
124
751
    key: &StoreKey<'a>,
125
751
) -> Cow<'a, OsStr> {
126
751
    let 
folder727
= match path_type {
127
269
        PathType::Content => &shared_context.content_path,
128
458
        PathType::Temp => &shared_context.temp_path,
129
24
        PathType::Custom(path) => return Cow::Borrowed(path),
130
    };
131
727
    Cow::Owned(to_full_path_from_key(folder, key))
132
751
}
133
134
impl Drop for EncodedFilePath {
135
164
    fn drop(&mut self) {
136
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
137
        // file actually needs to be deleted.
138
164
        if self.path_type == PathType::Content {
139
147
            return;
140
17
        }
141
142
17
        let file_path = self.get_file_path().to_os_string();
143
17
        let shared_context = self.shared_context.clone();
144
        // .fetch_add returns previous value, so we add one to get approximate current value
145
17
        let current_active_drop_spawns = shared_context
146
17
            .active_drop_spawns
147
17
            .fetch_add(1, Ordering::Relaxed)
148
17
            + 1;
149
17
        debug!(
150
            %current_active_drop_spawns,
151
            ?file_path,
152
            "Spawned a filesystem_delete_file"
153
        );
154
17
        background_spawn!("filesystem_delete_file", async move 
{11
155
11
            match fs::remove_file(&file_path).await {
156
5
                Ok(()) => debug!(?file_path, "File deleted"),
157
                // The file already being gone is the desired end state of a
158
                // delete, not a failure — e.g. an entry marked Temp after an
159
                // already-gone unref points at a path that was never created.
160
1
                Err(err) if err.code == Code::NotFound => {
161
1
                    debug!(?file_path, "File already gone, nothing to delete");
162
                }
163
0
                Err(err) => error!(?file_path, ?err, "Failed to delete file"),
164
            }
165
            // .fetch_sub returns previous value, so we subtract one to get approximate current value
166
6
            let current_active_drop_spawns = shared_context
167
6
                .active_drop_spawns
168
6
                .fetch_sub(1, Ordering::Relaxed)
169
6
                - 1;
170
6
            debug!(
171
                ?current_active_drop_spawns,
172
                ?file_path,
173
                "Dropped a filesystem_delete_file"
174
            );
175
6
        });
176
164
    }
177
}
178
179
/// This creates the file path from the [`StoreKey`]. If
180
/// it is a string, the string, prefixed with [`STR_PREFIX`]
181
/// for backwards compatibility, is stored.
182
///
183
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
184
/// followed by the string representation of a digest - the hash in hex,
185
/// a hyphen then the size in bytes
186
///
187
/// Previously, only the string representation of the [`DigestInfo`] was
188
/// used with no prefix
189
#[inline]
190
738
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
191
738
    match key {
192
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
193
735
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
194
    }
195
738
    .into()
196
738
}
197
198
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
199
    /// Responsible for creating the underlying `FileEntry`.
200
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
201
202
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
203
    fn make_and_open_file(
204
        block_size: u64,
205
        encoded_file_path: EncodedFilePath,
206
    ) -> impl Future<Output = Result<(Self, FileSlot, OsString), Error>> + Send
207
    where
208
        Self: Sized;
209
210
    /// Returns the underlying size of the data in bytes
211
    fn data_size(&self) -> u64;
212
213
    /// Returns the underlying reference to the size of the data in bytes
214
    fn data_size_mut(&mut self) -> &mut u64;
215
216
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
217
    fn size_on_disk(&self) -> u64;
218
219
    /// Gets the underlying `EncodedfilePath`.
220
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
221
222
    /// Returns a reader that will read part of the underlying file.
223
    fn read_file_part(
224
        &self,
225
        offset: u64,
226
        length: u64,
227
    ) -> impl Future<Output = Result<Take<FileSlot>, Error>> + Send;
228
229
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
230
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
231
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
232
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
233
    /// the callback the file is always guaranteed to exist and immutable.
234
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
235
    fn get_file_path_locked<
236
        T,
237
        Fut: Future<Output = Result<T, Error>> + Send,
238
        F: FnOnce(OsString) -> Fut + Send,
239
    >(
240
        &self,
241
        handler: F,
242
    ) -> impl Future<Output = Result<T, Error>> + Send;
243
}
244
245
pub struct FileEntryImpl {
246
    data_size: u64,
247
    block_size: u64,
248
    // We lock around this as it gets rewritten when we move between temp and content types
249
    encoded_file_path: RwLock<EncodedFilePath>,
250
}
251
252
impl FileEntryImpl {
253
11
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
254
11
        self.encoded_file_path.get_mut().shared_context.clone()
255
11
    }
256
}
257
258
impl FileEntry for FileEntryImpl {
259
164
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
260
164
        Self {
261
164
            data_size,
262
164
            block_size,
263
164
            encoded_file_path,
264
164
        }
265
164
    }
266
267
    /// This encapsulates the logic for the edge case of if the file fails to create
268
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
269
    /// try to cleanup the file as well during `drop()`.
270
150
    async fn make_and_open_file(
271
150
        block_size: u64,
272
150
        encoded_file_path: EncodedFilePath,
273
150
    ) -> Result<(Self, FileSlot, OsString), Error> {
274
150
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
275
150
        let temp_file_result = fs::create_file(temp_full_path.clone())
276
150
            .or_else(|mut err| async 
{0
277
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
278
0
                    format!(
279
                        "Failed to remove file {} in filesystem store",
280
0
                        temp_full_path.display()
281
                    )
282
0
                });
283
0
                if let Err(remove_err) = remove_result {
284
0
                    err = err.merge(remove_err);
285
0
                }
286
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
287
0
                Err(err).err_tip(|| {
288
0
                    format!(
289
                        "Failed to create {} in filesystem store",
290
0
                        temp_full_path.display()
291
                    )
292
0
                })
293
0
            })
294
150
            .await
?0
;
295
296
150
        Ok((
297
150
            <Self as FileEntry>::create(
298
150
                0, /* Unknown yet, we will fill it in later */
299
150
                block_size,
300
150
                RwLock::new(encoded_file_path),
301
150
            ),
302
150
            temp_file_result,
303
150
            temp_full_path,
304
150
        ))
305
150
    }
306
307
25
    fn data_size(&self) -> u64 {
308
25
        self.data_size
309
25
    }
310
311
148
    fn data_size_mut(&mut self) -> &mut u64 {
312
148
        &mut self.data_size
313
148
    }
314
315
250
    fn size_on_disk(&self) -> u64 {
316
250
        self.data_size.div_ceil(self.block_size) * self.block_size
317
250
    }
318
319
422
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
320
422
        &self.encoded_file_path
321
422
    }
322
323
69
    fn read_file_part(
324
69
        &self,
325
69
        offset: u64,
326
69
        length: u64,
327
69
    ) -> impl Future<Output = Result<Take<FileSlot>, Error>> + Send {
328
69
        self.get_file_path_locked(move |full_content_path| async move {
329
69
            let 
file66
= fs::open_file(&full_content_path, offset, length)
330
69
                .await
331
69
                .err_tip(|| 
{3
332
3
                    format!(
333
                        "Failed to open file in filesystem store {}",
334
3
                        full_content_path.display()
335
                    )
336
3
                })?;
337
66
            Ok(file)
338
138
        })
339
69
    }
340
341
95
    async fn get_file_path_locked<
342
95
        T,
343
95
        Fut: Future<Output = Result<T, Error>> + Send,
344
95
        F: FnOnce(OsString) -> Fut + Send,
345
95
    >(
346
95
        &self,
347
95
        handler: F,
348
95
    ) -> Result<T, Error> {
349
95
        let encoded_file_path = self.get_encoded_file_path().read().await;
350
95
        handler(encoded_file_path.get_file_path().to_os_string()).await
351
95
    }
352
}
353
354
impl Debug for FileEntryImpl {
355
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
356
0
        f.debug_struct("FileEntryImpl")
357
0
            .field("data_size", &self.data_size)
358
0
            .field("encoded_file_path", &"<behind mutex>")
359
0
            .finish()
360
0
    }
361
}
362
363
161
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
364
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
365
161
    let mut hash = *digest.packed_hash();
366
161
    hash[24..].clone_from_slice(
367
161
        &DELETE_FILE_COUNTER
368
161
            .fetch_add(1, Ordering::Relaxed)
369
161
            .to_le_bytes(),
370
    );
371
161
    digest.set_packed_hash(*hash);
372
161
    digest
373
161
}
374
375
161
pub fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
376
161
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
377
161
}
378
379
impl LenEntry for FileEntryImpl {
380
    #[inline]
381
248
    fn len(&self) -> u64 {
382
248
        self.size_on_disk()
383
248
    }
384
385
0
    fn is_empty(&self) -> bool {
386
0
        self.data_size == 0
387
0
    }
388
389
    // unref() only triggers when an item is removed from the eviction_map. It is possible
390
    // that another place in code has a reference to `FileEntryImpl` and may later read the
391
    // file. To support this edge case, we first move the file to a temp file and point
392
    // target file location to the new temp file. `unref()` should only ever be called once.
393
    #[inline]
394
14
    async fn unref(&self) {
395
14
        let mut encoded_file_path = self.encoded_file_path.write().await;
396
14
        if encoded_file_path.path_type == PathType::Temp {
397
            // We are already a temp file that is now marked for deletion on drop.
398
            // This is very rare, but most likely the rename into the content path failed.
399
3
            warn!(
400
3
                key = ?encoded_file_path.key,
401
                "File is already a temp file",
402
            );
403
3
            return;
404
11
        }
405
11
        let from_path = encoded_file_path.get_file_path();
406
11
        let new_key = make_temp_key(&encoded_file_path.key);
407
408
11
        let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
409
410
11
        if let Err(
err5
) = fs::rename(&from_path, &to_path).await {
411
            // ENOENT from rename is ambiguous: the source may be gone, or
412
            // a directory component of the destination (the temp dir) may
413
            // be missing. Confirm the source is genuinely gone before
414
            // treating it as benign — otherwise a removed temp dir would
415
            // flip an intact content file to Temp and orphan it on disk.
416
5
            let source_gone = err.code == Code::NotFound
417
4
                && matches!(
418
5
                    fs::metadata(&from_path).await,
419
4
                    Err(meta_err) if meta_err.code == Code::NotFound
420
                );
421
5
            if source_gone {
422
                // The file is already gone — typically another thread's
423
                // eviction beat us, or the entry never got its file on
424
                // disk. Benign here, and it dominates log volume under
425
                // heavy write+evict concurrency, so keep it at `debug`.
426
                // Mark the entry Temp (as a successful rename would) so a
427
                // repeat unref is a no-op and drop stops claiming the
428
                // content path.
429
4
                debug!(
430
4
                    key = ?encoded_file_path.key,
431
                    "Failed to rename file (already gone, treating as benign)",
432
                );
433
4
                encoded_file_path.path_type = PathType::Temp;
434
4
                encoded_file_path.key = new_key;
435
            } else {
436
                // Either a non-ENOENT failure (EACCES, EXDEV, EBUSY, …) or
437
                // ENOENT with the source still present (missing temp dir).
438
                // The content file is intact; leave the entry as Content.
439
1
                warn!(
440
1
                    key = ?encoded_file_path.key,
441
                    ?from_path,
442
                    ?to_path,
443
                    ?err,
444
                    "Failed to rename file",
445
                );
446
            }
447
        } else {
448
6
            debug!(
449
6
                key = ?encoded_file_path.key,
450
                ?from_path,
451
                ?to_path,
452
                "Renamed file (unref)",
453
            );
454
6
            encoded_file_path.path_type = PathType::Temp;
455
6
            encoded_file_path.key = new_key;
456
        }
457
14
    }
458
}
459
460
#[inline]
461
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
462
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
463
2
    let size = size.parse::<i64>()
?0
;
464
2
    DigestInfo::try_new(hash, size)
465
2
}
466
467
3
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
468
3
    match file_type {
469
1
        FileType::String => Ok(StoreKey::new_str(file_name)),
470
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
471
    }
472
3
}
473
474
/// The number of files to read the metadata for at the same time when running
475
/// `add_files_to_cache`.
476
const SIMULTANEOUS_METADATA_READS: usize = 200;
477
478
type FsEvictingMap<'a, Fe> =
479
    EvictingMap<StoreKeyBorrow, StoreKey<'a>, Arc<Fe>, SystemTime, RemoveItemCallbackHolder>;
480
481
86
async fn add_files_to_cache<Fe: FileEntry>(
482
86
    evicting_map: &FsEvictingMap<'_, Fe>,
483
86
    anchor_time: &SystemTime,
484
86
    shared_context: &Arc<SharedContext>,
485
86
    block_size: u64,
486
86
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
487
86
) -> Result<(), Error> {
488
    #[expect(clippy::too_many_arguments)]
489
2
    async fn process_entry<Fe: FileEntry>(
490
2
        evicting_map: &FsEvictingMap<'_, Fe>,
491
2
        file_name: &str,
492
2
        file_type: FileType,
493
2
        atime: SystemTime,
494
2
        data_size: u64,
495
2
        block_size: u64,
496
2
        anchor_time: &SystemTime,
497
2
        shared_context: &Arc<SharedContext>,
498
2
    ) -> Result<(), Error> {
499
2
        let key = key_from_file(file_name, file_type)
?0
;
500
501
2
        let file_entry = Fe::create(
502
2
            data_size,
503
2
            block_size,
504
2
            RwLock::new(EncodedFilePath {
505
2
                shared_context: shared_context.clone(),
506
2
                path_type: PathType::Content,
507
2
                key: key.borrow().into_owned(),
508
2
            }),
509
        );
510
2
        let time_since_anchor = if let Ok(
d1
) = anchor_time.duration_since(atime) {
511
1
            d
512
        } else {
513
1
            warn!(
514
                %file_name,
515
1
                atime = %humantime::format_rfc3339(atime),
516
1
                anchor_time = %humantime::format_rfc3339(*anchor_time),
517
                "File access time newer than FilesystemStore start time",
518
            );
519
1
            Duration::ZERO
520
        };
521
2
        evicting_map
522
2
            .insert_with_time(
523
2
                key.into_owned().into(),
524
2
                Arc::new(file_entry),
525
2
                i32::try_from(time_since_anchor.as_secs()).unwrap_or(i32::MAX),
526
2
            )
527
2
            .await;
528
2
        Ok(())
529
2
    }
530
531
258
    async fn read_files(
532
258
        folder: Option<&str>,
533
258
        shared_context: &SharedContext,
534
258
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
535
        // Note: In Dec 2024 this is for backwards compatibility with the old
536
        // way files were stored on disk. Previously all files were in a single
537
        // folder regardless of the StoreKey type. This allows old versions of
538
        // nativelink file layout to be upgraded at startup time.
539
        // This logic can be removed once more time has passed.
540
258
        let read_dir = folder.map_or_else(
541
86
            || format!("{}/", shared_context.content_path),
542
172
            |folder| format!("{}/{folder}/", shared_context.content_path),
543
        );
544
545
258
        let (_permit, dir_handle) = fs::read_dir(read_dir)
546
258
            .await
547
258
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
548
258
            .into_inner();
549
550
258
        let read_dir_stream = ReadDirStream::new(dir_handle);
551
258
        read_dir_stream
552
258
            .map(|dir_entry| async move 
{174
553
174
                let dir_entry = dir_entry.unwrap();
554
174
                let file_name = dir_entry.file_name().into_string().unwrap();
555
174
                let metadata = dir_entry
556
174
                    .metadata()
557
174
                    .await
558
174
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
559
                // We need to filter out folders - we do not want to try to cache the s and d folders.
560
174
                let is_file =
561
174
                    metadata.is_file() || !(
file_name == STR_FOLDER172
||
file_name == DIGEST_FOLDER86
);
562
                // Using access time is not perfect, but better than random. We do not update the
563
                // atime when a file is actually "touched", we rely on whatever the filesystem does
564
                // when we read the file (usually update on read).
565
174
                let atime = metadata
566
174
                    .accessed()
567
174
                    .or_else(|_| 
metadata0
.
modified0
())
568
174
                    .unwrap_or(SystemTime::UNIX_EPOCH);
569
174
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
570
174
                    file_name,
571
174
                    atime,
572
174
                    metadata.len(),
573
174
                    is_file,
574
174
                ))
575
348
            })
576
258
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
577
258
            .try_collect()
578
258
            .await
579
258
    }
580
581
    /// Note: In Dec 2024 this is for backwards compatibility with the old
582
    /// way files were stored on disk. Previously all files were in a single
583
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
584
    /// location to the new cache location, under [`DIGEST_FOLDER`].
585
86
    async fn move_old_cache(
586
86
        shared_context: &Arc<SharedContext>,
587
86
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
588
86
    ) -> Result<(), Error> {
589
86
        let file_infos = read_files(None, shared_context).await
?0
;
590
591
86
        let from_path = &shared_context.content_path;
592
593
86
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
594
595
86
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
596
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
597
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
598
599
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
600
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
601
            } else {
602
0
                debug!(?from_file, ?to_file, "Renamed file (old cache)",);
603
            }
604
        }
605
86
        Ok(())
606
86
    }
607
608
172
    async fn add_files_to_cache<Fe: FileEntry>(
609
172
        evicting_map: &FsEvictingMap<'_, Fe>,
610
172
        anchor_time: &SystemTime,
611
172
        shared_context: &Arc<SharedContext>,
612
172
        block_size: u64,
613
172
        folder: &str,
614
172
    ) -> Result<(), Error> {
615
172
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
616
172
        let file_type = match folder {
617
172
            STR_FOLDER => 
FileType::String86
,
618
86
            DIGEST_FOLDER => FileType::Digest,
619
0
            _ => panic!("Invalid folder type"),
620
        };
621
622
172
        let path_root = format!("{}/{folder}", shared_context.content_path);
623
624
172
        for (
file_name2
,
atime2
,
data_size2
, _) in file_infos.into_iter().filter(|x| x.3) {
625
2
            let result = process_entry(
626
2
                evicting_map,
627
2
                &file_name,
628
2
                file_type,
629
2
                atime,
630
2
                data_size,
631
2
                block_size,
632
2
                anchor_time,
633
2
                shared_context,
634
2
            )
635
2
            .await;
636
2
            if let Err(
err0
) = result {
637
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
638
                // Ignore result.
639
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
640
2
            }
641
        }
642
172
        Ok(())
643
172
    }
644
645
86
    move_old_cache(shared_context, rename_fn).await
?0
;
646
647
86
    add_files_to_cache(
648
86
        evicting_map,
649
86
        anchor_time,
650
86
        shared_context,
651
86
        block_size,
652
86
        DIGEST_FOLDER,
653
86
    )
654
86
    .await
?0
;
655
656
86
    add_files_to_cache(
657
86
        evicting_map,
658
86
        anchor_time,
659
86
        shared_context,
660
86
        block_size,
661
86
        STR_FOLDER,
662
86
    )
663
86
    .await
?0
;
664
86
    Ok(())
665
86
}
666
667
86
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
668
172
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
669
172
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
670
172
            .await
671
172
            .err_tip(
672
                || "Failed opening temp directory to prune partial downloads in filesystem store",
673
0
            )?
674
172
            .into_inner();
675
676
172
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
677
172
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
678
0
            let path = dir_entry?.path();
679
0
            if let Err(err) = fs::remove_file(&path).await {
680
0
                warn!(?path, ?err, "Failed to delete file",);
681
0
            }
682
        }
683
172
        Ok(())
684
172
    }
685
686
86
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
687
86
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
688
86
    Ok(())
689
86
}
690
691
// Sometimes we get files to emplace that are identical to the existing files
692
// Due to the evict/remove/replace cycle taking some amount of time, we actually
693
// want to drop these
694
// Return value is "is duplicate"
695
162
pub async fn check_duplicate_files<Fe>(
696
162
    evicting_map: &Arc<FsEvictingMap<'_, Fe>>,
697
162
    key: &StoreKey<'static>,
698
162
    entry: &Arc<Fe>,
699
162
) -> Result<bool, Error>
700
162
where
701
162
    Fe: FileEntry,
702
162
{
703
162
    let temp_file_encoded_file_path = entry.get_encoded_file_path().write().await;
704
162
    let maybe_existing_item = evicting_map.get(&key.borrow().into_owned()).await;
705
162
    if let Some(
existing_item9
) = maybe_existing_item {
706
9
        if Arc::ptr_eq(entry, &existing_item) {
707
1
            warn!("Tried to check duplicate of an entry we already have!");
708
1
            return Ok(true);
709
8
        }
710
8
        let existing_item_encoded_file_path = existing_item.get_encoded_file_path().write().await;
711
8
        if entry.data_size() == existing_item.data_size() {
712
            const CHUNK_SIZE: usize = 16 * 1024; // 16kb chunks, kinda picked out of the air
713
7
            let file_length = entry.data_size();
714
7
            let existing_path = existing_item_encoded_file_path.get_file_path();
715
7
            let temp_path = temp_file_encoded_file_path.get_file_path();
716
7
            trace!(?existing_path, ?temp_path, "Checking duplicate files");
717
7
            let mut temp_file = fs::open_file(&temp_path, 0, file_length).await
?0
;
718
7
            let mut existing_file = fs::open_file(&existing_path, 0, file_length).await
?0
;
719
720
7
            let mut temp_buffer: [u8; CHUNK_SIZE] = [0; CHUNK_SIZE];
721
7
            let mut existing_buffer: [u8; CHUNK_SIZE] = [0; CHUNK_SIZE];
722
            // in a file_length file, there are 0 to file_length-1 entries
723
            // not file_length. It's counting all the bytes, starting from 0
724
7
            for offset in (0..file_length - 1).step_by(CHUNK_SIZE) {
725
7
                let buffer_size = if offset + (CHUNK_SIZE as u64) <= file_length {
726
0
                    CHUNK_SIZE
727
7
                } else if file_length < CHUNK_SIZE as u64 {
728
7
                    usize::try_from(file_length)
729
7
                        .expect("Always succeeds because file_length < 16384")
730
                } else {
731
0
                    usize::try_from(file_length - offset).expect("Always succeeds because offset < file_length, and offset-file_length must be < 16384")
732
                };
733
7
                if let Err(
err0
) = temp_file.read_exact(&mut temp_buffer[0..buffer_size]).await {
734
0
                    warn!(
735
                        ?err,
736
                        ?temp_path,
737
                        file_length,
738
                        offset,
739
                        buffer_size,
740
                        "Failed to read temp file, skipping duplicate check"
741
                    );
742
0
                    return Ok(false);
743
7
                }
744
7
                if let Err(
err0
) = existing_file
745
7
                    .read_exact(&mut existing_buffer[0..buffer_size])
746
7
                    .await
747
                {
748
0
                    warn!(
749
                        ?err,
750
                        ?existing_path,
751
                        file_length,
752
                        offset,
753
                        buffer_size,
754
                        "Failed to read existing, skipping duplicate check"
755
                    );
756
0
                    return Ok(false);
757
7
                }
758
7
                if temp_buffer.ne(&existing_buffer) {
759
5
                    trace!(
760
                        ?existing_path,
761
                        ?temp_path,
762
                        "Files are different, so non-duplicate"
763
                    );
764
5
                    return Ok(false);
765
2
                }
766
            }
767
2
            trace!(
768
                ?existing_path,
769
                ?temp_path,
770
                "Identical files, so don't need to edit, skipping emplace"
771
            );
772
2
            return Ok(true);
773
1
        }
774
1
        trace!(
775
1
            entry_data_size = entry.data_size(),
776
1
            existing_data_size = existing_item.data_size(),
777
1
            existing_path = ?existing_item_encoded_file_path.get_file_path(),
778
            "Different data sizes, so non-duplicate"
779
        );
780
    } else {
781
153
        trace!(
782
151
            temp_file = ?temp_file_encoded_file_path.get_file_path(),
783
            "No existing entry, so not duplicate"
784
        );
785
    }
786
154
    Ok(false)
787
162
}
788
789
#[derive(Debug, MetricsComponent)]
790
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
791
    #[metric]
792
    shared_context: Arc<SharedContext>,
793
    #[metric(group = "evicting_map")]
794
    evicting_map: Arc<FsEvictingMap<'static, Fe>>,
795
    #[metric(help = "Block size of the configured filesystem")]
796
    block_size: u64,
797
    #[metric(help = "Size of the configured read buffer size")]
798
    read_buffer_size: usize,
799
    weak_self: Weak<Self>,
800
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
801
    /// Limits concurrent write operations to prevent disk I/O saturation.
802
    write_semaphore: Option<Semaphore>,
803
    /// Per-digest single-flight locks guarding creation of the executable
804
    /// variant in `{content_path}.exec`, so each variant's writable fd is
805
    /// opened exactly once. The outer lock is sync and only ever held to
806
    /// get/insert/remove the per-digest async lock — never across I/O.
807
    #[cfg(unix)]
808
    executable_locks: std::sync::Mutex<HashMap<DigestInfo, Arc<Mutex<()>>>>,
809
}
810
811
impl<Fe: FileEntry> FilesystemStore<Fe> {
812
74
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
813
148
        
Self::new_with_timeout_and_rename_fn74
(
spec74
, |from, to| std::fs::rename(from, to)).
await74
814
74
    }
815
816
86
    pub async fn new_with_timeout_and_rename_fn(
817
86
        spec: &FilesystemSpec,
818
86
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
819
86
    ) -> Result<Arc<Self>, Error> {
820
172
        async fn create_subdirs(path: &str) -> Result<(), Error> {
821
172
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
822
172
                .await
823
172
                .err_tip(|| 
format!0
("Failed to create directory {path}/{STR_FOLDER}"))
?0
;
824
172
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
825
172
                .await
826
172
                .err_tip(|| 
format!0
("Failed to create directory {path}/{DIGEST_FOLDER}"))
827
172
        }
828
829
86
        let now = SystemTime::now();
830
831
86
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
832
86
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
833
86
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
834
835
        // Create temp and content directories and the s and d subdirectories.
836
837
86
        create_subdirs(&spec.temp_path).await
?0
;
838
86
        create_subdirs(&spec.content_path).await
?0
;
839
840
        // Executable-variant directory: a sibling of `content_path` holding
841
        // per-digest 0o555 copies used as hardlink sources for executable
842
        // inputs (see `get_executable_hardlink_source`). Cleared on startup —
843
        // the variants are regenerable and we never want a stale one to leak
844
        // across runs. Unix-only: the executable bit (and the ETXTBSY race it
845
        // guards against) does not apply on Windows.
846
        #[cfg(unix)]
847
        {
848
86
            let executable_dir = format!("{}{EXECUTABLE_DIR_SUFFIX}", spec.content_path);
849
86
            drop(fs::remove_dir_all(&executable_dir).await);
850
86
            fs::create_dir_all(format!("{executable_dir}/{DIGEST_FOLDER}"))
851
86
                .await
852
86
                .err_tip(|| 
format!0
("Failed to create executable dir {executable_dir}"))
?0
;
853
        }
854
855
86
        let shared_context = Arc::new(SharedContext {
856
86
            active_drop_spawns: AtomicU64::new(0),
857
86
            temp_path: spec.temp_path.clone(),
858
86
            content_path: spec.content_path.clone(),
859
86
        });
860
861
86
        let block_size = if spec.block_size == 0 {
862
82
            DEFAULT_BLOCK_SIZE
863
        } else {
864
4
            spec.block_size
865
        };
866
86
        add_files_to_cache(
867
86
            evicting_map.as_ref(),
868
86
            &now,
869
86
            &shared_context,
870
86
            block_size,
871
86
            rename_fn,
872
86
        )
873
86
        .await
?0
;
874
86
        prune_temp_path(&shared_context.temp_path).await
?0
;
875
876
86
        let read_buffer_size = if spec.read_buffer_size == 0 {
877
71
            DEFAULT_BUFF_SIZE
878
        } else {
879
15
            spec.read_buffer_size as usize
880
        };
881
86
        let write_semaphore = if spec.max_concurrent_writes > 0 {
882
0
            Some(Semaphore::new(spec.max_concurrent_writes))
883
        } else {
884
86
            None
885
        };
886
86
        Ok(Arc::new_cyclic(|weak_self| Self {
887
86
            shared_context,
888
86
            evicting_map,
889
86
            block_size,
890
86
            read_buffer_size,
891
86
            weak_self: weak_self.clone(),
892
86
            rename_fn,
893
86
            write_semaphore,
894
            #[cfg(unix)]
895
86
            executable_locks: std::sync::Mutex::new(HashMap::new()),
896
86
        }))
897
86
    }
898
899
49
    pub fn get_arc(&self) -> Option<Arc<Self>> {
900
49
        self.weak_self.upgrade()
901
49
    }
902
903
    /// Path of the read-only executable (0o555) variant for `digest`.
904
    #[cfg(unix)]
905
4
    fn executable_variant_path(&self, digest: &DigestInfo) -> OsString {
906
4
        format!(
907
            "{}{EXECUTABLE_DIR_SUFFIX}/{DIGEST_FOLDER}/{digest}",
908
4
            self.shared_context.content_path
909
        )
910
4
        .into()
911
4
    }
912
913
    /// Returns the path to a private, read-only **executable** (0o555) copy of
914
    /// the blob for `digest`, creating it at most once. Callers **hardlink**
915
    /// the returned path into action input trees instead of copying the
916
    /// executable per action.
917
    ///
918
    /// Why this exists: a CAS blob is stored read-only **0o444** and shared
919
    /// across actions by hardlink, so it cannot carry the executable bit and
920
    /// must never be `chmod`'d (that mutates the shared inode — the #2347
921
    /// corruption class). Materializing an executable input therefore needs a
922
    /// separate 0o555 inode. Doing that copy *per action* opens a writable fd
923
    /// in the worker's hot path; under fork-heavy concurrency a child can
924
    /// inherit that fd and a concurrent `execve` of the executable then fails
925
    /// with `ETXTBSY` ("Text file busy", os error 26). Creating the 0o555 inode
926
    /// **once** — writer fd fsync'd and closed, then atomically renamed into
927
    /// place before the inode is ever hardlinked or executed — and hardlinking
928
    /// it thereafter keeps the per-action path hardlink-only.
929
    #[cfg(unix)]
930
4
    pub async fn get_executable_hardlink_source(
931
4
        &self,
932
4
        digest: &DigestInfo,
933
4
    ) -> Result<OsString, Error> {
934
4
        let variant_path = self.executable_variant_path(digest);
935
936
        // Fast path: the variant already exists, so the caller can hardlink it
937
        // with no writable fd anywhere in sight.
938
4
        if fs::metadata(&variant_path).await.is_ok() {
939
2
            return Ok(variant_path);
940
2
        }
941
942
        // Single-flight: exactly one task ever opens a writable fd for this
943
        // variant. Without this, a cold-cache burst of concurrent actions
944
        // would each open a writer for the same executable — the very window
945
        // ETXTBSY exploits.
946
2
        let lock = {
947
2
            let mut locks = self
948
2
                .executable_locks
949
2
                .lock()
950
2
                .expect("executable_locks poisoned");
951
2
            locks
952
2
                .entry(*digest)
953
2
                .or_insert_with(|| Arc::new(Mutex::new(())))
954
2
                .clone()
955
        };
956
2
        let _guard = lock.lock().await;
957
958
        // Re-check: another task may have constructed it while we waited.
959
2
        if fs::metadata(&variant_path).await.is_ok() {
960
0
            self.forget_executable_lock(digest);
961
0
            return Ok(variant_path);
962
2
        }
963
964
2
        let result = self.create_executable_variant(digest, &variant_path).await;
965
        // Drop the per-digest lock entry regardless of outcome so the map
966
        // cannot grow unbounded; a concurrent waiter already cloned the Arc.
967
2
        self.forget_executable_lock(digest);
968
2
        result.map(|()| variant_path)
969
4
    }
970
971
    /// Non-unix has no executable bit and no `ETXTBSY`, so just hardlink the
972
    /// CAS blob directly.
973
    #[cfg(not(unix))]
974
    pub async fn get_executable_hardlink_source(
975
        &self,
976
        digest: &DigestInfo,
977
    ) -> Result<OsString, Error> {
978
        let file_entry = self.get_file_entry_for_digest(digest).await?;
979
        file_entry
980
            .get_file_path_locked(|p| async move { Ok(p) })
981
            .await
982
    }
983
984
    #[cfg(unix)]
985
2
    fn forget_executable_lock(&self, digest: &DigestInfo) {
986
2
        self.executable_locks
987
2
            .lock()
988
2
            .expect("executable_locks poisoned")
989
2
            .remove(digest);
990
2
    }
991
992
    /// Materializes the 0o555 executable variant for `digest`. Must be called
993
    /// under the per-digest single-flight guard.
994
    #[cfg(unix)]
995
2
    async fn create_executable_variant(
996
2
        &self,
997
2
        digest: &DigestInfo,
998
2
        variant_path: &OsStr,
999
2
    ) -> Result<(), Error> {
1000
        // Resolve the on-disk CAS blob (0o444) to copy from. Must be present in
1001
        // this tier; callers populate the fast store first.
1002
2
        let file_entry = self
1003
2
            .get_file_entry_for_digest(digest)
1004
2
            .await
1005
2
            .err_tip(|| "Resolving CAS blob for executable variant")
?0
;
1006
2
        let src_path = file_entry
1007
4
            .
get_file_path_locked2
(|p| async move
{2
Ok(p)2
})
1008
2
            .await
?0
;
1009
1010
2
        let variant_owned = variant_path.to_os_string();
1011
2
        let mut temp_owned = variant_path.to_os_string();
1012
2
        temp_owned.push(".tmp");
1013
2
        let rename_fn = self.rename_fn;
1014
1015
        // All of this is blocking std::fs; run it off the async runtime. The
1016
        // writable fd opened by `copy` is fully closed before the `rename`
1017
        // publishes the inode, so no reachable hardlink of the variant ever has
1018
        // an open writer.
1019
2
        spawn_blocking!(
1020
            "filesystem_store_executable_variant",
1021
2
            move || -> Result<(), Error> {
1022
                use std::os::unix::fs::PermissionsExt;
1023
2
                std::fs::copy(&src_path, &temp_owned).map_err(|e| 
{0
1024
0
                    make_err!(Code::Internal, "executable-variant copy failed: {e:?}")
1025
0
                })?;
1026
2
                std::fs::set_permissions(&temp_owned, std::fs::Permissions::from_mode(0o555))
1027
2
                    .map_err(|e| 
{0
1028
0
                        make_err!(
1029
0
                            Code::Internal,
1030
                            "executable-variant chmod 0o555 failed: {e:?}"
1031
                        )
1032
0
                    })?;
1033
                // Reopen read-only purely to fsync the bytes durable before publish.
1034
2
                let f = std::fs::File::open(&temp_owned)
1035
2
                    .map_err(|e| 
make_err!0
(
Code::Internal0
, "executable-variant reopen: {e:?}"))
?0
;
1036
2
                f.sync_all()
1037
2
                    .map_err(|e| 
make_err!0
(
Code::Internal0
, "executable-variant fsync: {e:?}"))
?0
;
1038
2
                drop(f);
1039
2
                rename_fn(temp_owned.as_os_str(), variant_owned.as_os_str()).map_err(|e| 
{0
1040
0
                    make_err!(Code::Internal, "executable-variant rename failed: {e:?}")
1041
0
                })?;
1042
2
                Ok(())
1043
2
            }
1044
        )
1045
2
        .await
1046
2
        .err_tip(|| "executable-variant spawn_blocking join failed")
?0
1047
2
    }
1048
1049
33
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
1050
        // Zero-digest blobs have no backing file on disk (FilesystemStore
1051
        // never persists zero-byte content). The previous implementation
1052
        // returned a synthetic FileEntry whose content_path did not exist,
1053
        // which downstream callers would then try to hard_link from,
1054
        // silently producing missing or empty output files in worker
1055
        // execution directories. Return NotFound so callers are forced to
1056
        // take the explicit zero-digest path (e.g. fs::create_file).
1057
33
        if is_zero_digest(digest) {
1058
1
            return Err(make_err!(
1059
1
                Code::NotFound,
1060
1
                "{digest} is a zero-digest; FilesystemStore does not persist zero-byte files. \
1061
1
                 Callers must materialise empty files directly rather than going through get_file_entry_for_digest."
1062
1
            ));
1063
32
        }
1064
32
        self.evicting_map
1065
32
            .get(&digest.into())
1066
32
            .await
1067
32
            .ok_or_else(|| 
make_err!0
(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
1068
33
    }
1069
1070
116
    async fn update_file(
1071
116
        self: Pin<&Self>,
1072
116
        mut entry: Fe,
1073
116
        mut temp_file: FileSlot,
1074
116
        final_key: StoreKey<'static>,
1075
116
        mut reader: DropCloserReadHalf,
1076
116
    ) -> Result<u64, Error> {
1077
116
        let mut data_size = 0;
1078
        loop {
1079
230
            let mut data = reader
1080
230
                .recv()
1081
230
                .await
1082
230
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
1083
230
            let data_len = data.len();
1084
230
            if data_len == 0 {
1085
116
                break; // EOF.
1086
114
            }
1087
114
            temp_file
1088
114
                .write_all_buf(&mut data)
1089
114
                .await
1090
114
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
1091
114
            data_size += data_len as u64;
1092
        }
1093
1094
116
        let permit = if let Some(
sem0
) = &self.write_semaphore {
1095
0
            Some(sem.acquire().await.map_err(|err| {
1096
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
1097
0
            })?)
1098
        } else {
1099
116
            None
1100
        };
1101
1102
        // tokio defers write errors to the next write or flush call, so without an explicit flush
1103
        // the final write's failure is silently swallowed by `sync_all`, and a truncated file would
1104
        // be renamed into the content path.
1105
116
        temp_file
1106
116
            .flush()
1107
116
            .await
1108
116
            .err_tip(|| "Failed to flush in filesystem store")
?0
;
1109
116
        temp_file
1110
116
            .as_ref()
1111
116
            .sync_all()
1112
116
            .await
1113
116
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
1114
1115
116
        drop(permit);
1116
1117
116
        temp_file.advise_dontneed();
1118
116
        trace!(?temp_file, "Dropping file to update_file");
1119
116
        drop(temp_file);
1120
1121
116
        *entry.data_size_mut() = data_size;
1122
116
        self.emplace_file(final_key, Arc::new(entry)).await
?1
;
1123
115
        Ok(data_size)
1124
116
    }
1125
1126
158
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
1127
        // This sequence of events is quite tricky to understand due to the amount of triggers that
1128
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
1129
        // 1. Here will hold a write lock on any file operations of this FileEntry.
1130
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
1131
        //    entries.
1132
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
1133
        //    during the rename.
1134
        // 4. It should be impossible for items to be added while eviction is happening, so there
1135
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
1136
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
1137
        //    item is not possible within the `insert()` call because the write lock inside the
1138
        //    eviction map. If an eviction of new item happens after `insert()` but before
1139
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
1140
        //    will be blocked on us because we currently have the lock.
1141
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
1142
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
1143
        //    contents until we release the lock.
1144
158
        let evicting_map = self.evicting_map.clone();
1145
158
        let rename_fn = self.rename_fn;
1146
1147
        // We need to guarantee that this will get to the end even if the parent future is dropped.
1148
        // See: https://github.com/TraceMachina/nativelink/issues/495
1149
158
        background_spawn!("filesystem_store_emplace_file", async move {
1150
            // Sometimes we get files to emplace that are identical to the existing files
1151
            // Due to the evict/remove/replace cycle taking some amount of time, we actually
1152
            // want to drop these
1153
158
            if check_duplicate_files(&evicting_map, &key, &entry).await
?0
{
1154
1
                return Ok(());
1155
157
            }
1156
1157
157
            evicting_map
1158
157
                .insert(key.borrow().into_owned().into(), entry.clone())
1159
157
                .await;
1160
1161
            // The insert might have resulted in an eviction/unref so we need to check
1162
            // it still exists in there. But first, get the lock...
1163
157
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
1164
            // Then check it's still in there...
1165
157
            if evicting_map.get(&key).await.is_none() {
1166
1
                info!(%key, "Got eviction while emplacing, dropping");
1167
1
                return Ok(());
1168
156
            }
1169
1170
156
            let final_path = get_file_path_raw(
1171
156
                &PathType::Content,
1172
156
                encoded_file_path.shared_context.as_ref(),
1173
156
                &key,
1174
            );
1175
1176
156
            let from_path = encoded_file_path.get_file_path();
1177
1178
            // Lock the blob down as read-only *before* it lands at its final
1179
            // content path, so every hardlink of it (the worker directory cache
1180
            // and `download_to_directory`) inherits an immutable, read-only
1181
            // inode. This is what preserves the input-tree hermeticity contract
1182
            // (actions cannot mutate their inputs) without a per-materialization
1183
            // chmod walk, and it means callers must never `chmod` a hardlinked
1184
            // blob — anything needing a different mode (e.g. an executable's +x
1185
            // bit) must take a private copy. Content-addressed blobs are
1186
            // write-once and replaced by `rename` rather than in-place writes,
1187
            // and unlink only needs the *parent directory* to be writable, so a
1188
            // read-only file mode is safe for both overwrite-by-rename and
1189
            // eviction. Best-effort: a failure here (e.g. the temp file was
1190
            // already evicted out from under us) must not abort the emplace.
1191
            #[cfg(unix)]
1192
            {
1193
                use std::os::unix::fs::PermissionsExt;
1194
0
                if let Err(err) =
1195
156
                    fs::set_permissions(&from_path, std::fs::Permissions::from_mode(0o444)).await
1196
                {
1197
0
                    warn!(?err, ?from_path, "Failed to set CAS blob read-only");
1198
156
                }
1199
            }
1200
            // Internally tokio spawns fs commands onto a blocking thread anyways.
1201
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
1202
            // an open-file permit (ensure we don't open too many files at once).
1203
156
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| 
{1
1204
1
                format!(
1205
                    "Failed to rename temp file to final path {}",
1206
1
                    final_path.display()
1207
                )
1208
1
            });
1209
1210
            // In the event our move from temp file to final file fails we need to ensure we remove
1211
            // the entry from our map.
1212
            // Remember: At this point it is possible for another thread to have a reference to
1213
            // `entry`, so we can't delete the file, only drop() should ever delete files.
1214
156
            if let Err(
err1
) = result {
1215
1
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
1216
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
1217
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
1218
1
                drop(encoded_file_path);
1219
                // It is possible that the item in our map is no longer the item we inserted,
1220
                // So, we need to conditionally remove it only if the pointers are the same.
1221
1222
1
                evicting_map
1223
1
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
1224
1
                    .await;
1225
1
                return Err(err);
1226
155
            }
1227
155
            trace!(?key, "Finished emplace file");
1228
155
            encoded_file_path.path_type = PathType::Content;
1229
155
            encoded_file_path.key = key;
1230
155
            Ok(())
1231
158
        })
1232
158
        .await
1233
157
        .err_tip(|| "Failed to create spawn in filesystem store update_file")
?0
1234
157
    }
1235
1236
0
    pub fn get_eviction_snapshot(&self) -> EvictionSnapshot {
1237
0
        self.evicting_map.get_snapshot()
1238
0
    }
1239
1240
    // Only for tests, so we can run check_duplicate_files
1241
4
    pub fn get_evicting_map(&self) -> Arc<FsEvictingMap<'static, Fe>> {
1242
4
        self.evicting_map.clone()
1243
4
    }
1244
1245
    // Separated out so tests can use this
1246
150
    pub async fn make_temp_file(
1247
150
        &self,
1248
150
        temp_key: StoreKey<'static>,
1249
150
    ) -> Result<(Fe, FileSlot, OsString), Error> {
1250
150
        Fe::make_and_open_file(
1251
150
            self.block_size,
1252
150
            EncodedFilePath {
1253
150
                shared_context: self.shared_context.clone(),
1254
150
                path_type: PathType::Temp,
1255
150
                key: temp_key,
1256
150
            },
1257
150
        )
1258
150
        .await
1259
150
    }
1260
}
1261
1262
#[async_trait]
1263
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
1264
0
    async fn post_init(self: Arc<Self>) -> Result<(), Error> {
1265
        Ok(())
1266
0
    }
1267
1268
    async fn has_with_results(
1269
        self: Pin<&Self>,
1270
        keys: &[StoreKey<'_>],
1271
        results: &mut [Option<u64>],
1272
155
    ) -> Result<(), Error> {
1273
        let own_keys = keys
1274
            .iter()
1275
155
            .map(|sk| sk.borrow().into_owned())
1276
            .collect::<Vec<_>>();
1277
        self.evicting_map
1278
            .sizes_for_keys(own_keys.iter(), results, false /* peek */)
1279
            .await;
1280
        // We need to do a special pass to ensure our zero files exist.
1281
        // If our results failed and the result was a zero file, we need to
1282
        // create the file by spec.
1283
        for (key, result) in keys.iter().zip(results.iter_mut()) {
1284
            if result.is_some() || !is_zero_digest(key.borrow()) {
1285
                continue;
1286
            }
1287
            let (mut tx, rx) = make_buf_channel_pair();
1288
            let send_eof_result = tx.send_eof();
1289
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
1290
                .await
1291
0
                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
1292
                .merge(
1293
                    send_eof_result
1294
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
1295
                )?;
1296
1297
            *result = Some(0);
1298
        }
1299
        Ok(())
1300
155
    }
1301
1302
    async fn update(
1303
        self: Pin<&Self>,
1304
        key: StoreKey<'_>,
1305
        mut reader: DropCloserReadHalf,
1306
        _upload_size: UploadSizeInfo,
1307
202
    ) -> Result<u64, Error> {
1308
        if is_zero_digest(key.borrow()) {
1309
            // don't need to add, because zero length files are just assumed to exist.
1310
            return Ok(0);
1311
        }
1312
1313
        let temp_key = make_temp_key(&key);
1314
1315
        // There's a possibility of deadlock here where we take all of the
1316
        // file semaphores with make_and_open_file and the semaphores for
1317
        // whatever is populating reader is exhasted on the threads that
1318
        // have the FileSlots and not on those which can't.  To work around
1319
        // this we don't take the FileSlot until there's something on the
1320
        // reader available to know that the populator is active.
1321
        reader.peek().await?;
1322
1323
        let (entry, temp_file, temp_full_path) = self.make_temp_file(temp_key).await?;
1324
1325
        self.update_file(entry, temp_file, key.into_owned(), reader)
1326
            .await
1327
1
            .err_tip(|| {
1328
1
                format!(
1329
                    "While processing with temp file {}",
1330
1
                    temp_full_path.display()
1331
                )
1332
1
            })
1333
202
    }
1334
1335
168
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
1336
158
        matches!(
1337
168
            optimization,
1338
            StoreOptimizations::FileUpdates | StoreOptimizations::SubscribesToUpdateOneshot
1339
        )
1340
168
    }
1341
1342
32
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
1343
        if is_zero_digest(key.borrow()) {
1344
            return Ok(());
1345
        }
1346
1347
        let temp_key = make_temp_key(&key);
1348
        let (mut entry, mut temp_file, temp_full_path) = self
1349
            .make_temp_file(temp_key)
1350
            .await
1351
            .err_tip(|| "Failed to create temp file in filesystem store update_oneshot")?;
1352
1353
        // Write directly without channel overhead
1354
        if !data.is_empty() {
1355
            temp_file
1356
                .write_all(&data)
1357
                .await
1358
0
                .err_tip(|| format!("Failed to write data to {}", temp_full_path.display()))?;
1359
        }
1360
1361
        let _permit = if let Some(sem) = &self.write_semaphore {
1362
0
            Some(sem.acquire().await.map_err(|err| {
1363
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
1364
0
            })?)
1365
        } else {
1366
            None
1367
        };
1368
1369
        // See comment in `update_file` above.
1370
        temp_file
1371
            .flush()
1372
            .await
1373
            .err_tip(|| "Failed to flush in filesystem store update_oneshot")?;
1374
        temp_file
1375
            .as_ref()
1376
            .sync_all()
1377
            .await
1378
            .err_tip(|| "Failed to sync_data in filesystem store update_oneshot")?;
1379
1380
        drop(_permit);
1381
1382
        temp_file.advise_dontneed();
1383
        drop(temp_file);
1384
1385
        *entry.data_size_mut() = data.len() as u64;
1386
        self.emplace_file(key.into_owned(), Arc::new(entry)).await
1387
32
    }
1388
1389
    async fn update_with_whole_file(
1390
        self: Pin<&Self>,
1391
        key: StoreKey<'_>,
1392
        path: OsString,
1393
        file: FileSlot,
1394
        upload_size: UploadSizeInfo,
1395
13
    ) -> Result<(u64, Option<FileSlot>), Error> {
1396
        let file_size = match upload_size {
1397
            UploadSizeInfo::ExactSize(size) => size,
1398
            UploadSizeInfo::MaxSize(_) => file
1399
                .as_ref()
1400
                .metadata()
1401
                .await
1402
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
1403
                .len(),
1404
        };
1405
        if file_size == 0 {
1406
            // don't need to add, because zero length files are just assumed to exist
1407
            return Ok((0, None));
1408
        }
1409
        let entry = Fe::create(
1410
            file_size,
1411
            self.block_size,
1412
            RwLock::new(EncodedFilePath {
1413
                shared_context: self.shared_context.clone(),
1414
                path_type: PathType::Custom(path),
1415
                key: key.borrow().into_owned(),
1416
            }),
1417
        );
1418
        // We are done with the file, if we hold a reference to the file here, it could
1419
        // result in a deadlock if `emplace_file()` also needs file descriptors.
1420
        trace!(?file, "Dropping file to to update_with_whole_file");
1421
        file.advise_dontneed();
1422
        drop(file);
1423
        self.emplace_file(key.into_owned(), Arc::new(entry))
1424
            .await
1425
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
1426
        return Ok((file_size, None));
1427
13
    }
1428
1429
    async fn get_part(
1430
        self: Pin<&Self>,
1431
        key: StoreKey<'_>,
1432
        writer: &mut DropCloserWriteHalf,
1433
        offset: u64,
1434
        length: Option<u64>,
1435
88
    ) -> Result<(), Error> {
1436
        if is_zero_digest(key.borrow()) {
1437
            self.has(key.borrow())
1438
                .await
1439
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
1440
            writer
1441
                .send_eof()
1442
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
1443
            return Ok(());
1444
        }
1445
        let owned_key = key.into_owned();
1446
2
        let entry = self.evicting_map.get(&owned_key).await.ok_or_else(|| {
1447
2
            make_err!(
1448
2
                Code::NotFound,
1449
                "{} not found in filesystem store here",
1450
2
                owned_key.as_str()
1451
            )
1452
2
        })?;
1453
        let read_limit = length.unwrap_or(u64::MAX);
1454
3
        let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
1455
            // If the file is not found, we need to remove it from the eviction map.
1456
3
            if err.code == Code::NotFound {
1457
                // Map said the file was present but `open()` hit ENOENT.
1458
                // Self-heals: we remove the stale entry below and a
1459
                // fast/slow caller re-populates from the slow store, so
1460
                // this is a recoverable warn, not a fatal error.
1461
3
                warn!(
1462
                    ?err,
1463
                    key = ?owned_key,
1464
                    "Filesystem store map/disk divergence: removing entry; reader will fall through to slow store",
1465
                );
1466
3
                self.evicting_map.remove(&owned_key).await;
1467
0
            }
1468
3
            Err(err)
1469
6
        }).await?;
1470
1471
        loop {
1472
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
1473
            temp_file
1474
                .read_buf(&mut buf)
1475
                .await
1476
                .err_tip(|| "Failed to read data in filesystem store")?;
1477
            if buf.is_empty() {
1478
                break; // EOF.
1479
            }
1480
            writer
1481
                .send(buf.freeze())
1482
                .await
1483
                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
1484
        }
1485
        temp_file.get_ref().advise_dontneed();
1486
        writer
1487
            .send_eof()
1488
            .err_tip(|| "Filed to send EOF in filesystem store get_part")?;
1489
1490
        Ok(())
1491
88
    }
1492
1493
207
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
1494
207
        self
1495
207
    }
1496
1497
49
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
1498
49
        self
1499
49
    }
1500
1501
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
1502
0
        self
1503
0
    }
1504
1505
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1506
0
        registry.register_indicator(self);
1507
0
    }
1508
1509
0
    fn register_remove_callback(
1510
0
        self: Arc<Self>,
1511
0
        callback: Arc<dyn RemoveItemCallback>,
1512
0
    ) -> Result<(), Error> {
1513
0
        self.evicting_map
1514
0
            .add_remove_callback(RemoveItemCallbackHolder::new(callback));
1515
0
        Ok(())
1516
0
    }
1517
}
1518
1519
#[async_trait]
1520
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1521
0
    fn get_name(&self) -> &'static str {
1522
0
        "FilesystemStore"
1523
0
    }
1524
1525
    /// Lightweight probe: `stat()` the `content_path` directory. No
1526
    /// write-semaphore / eviction-map contention with production
1527
    /// traffic, and bounded so a hung NFS / EBS mount can't wedge the
1528
    /// indicator.
1529
2
    async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
1530
        const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2);
1531
1532
        let content_path = &self.shared_context.content_path;
1533
        let stat = tokio::fs::metadata(&content_path);
1534
        match timeout(HEALTH_PROBE_TIMEOUT, stat).await {
1535
            Ok(Ok(meta)) if meta.is_dir() => {
1536
                HealthStatus::new_ok(self, "FilesystemStore::check_health: ok".into())
1537
            }
1538
            Ok(Ok(_)) => HealthStatus::new_failed(
1539
                self,
1540
                format!(
1541
                    "FilesystemStore::check_health: content_path {content_path} is not a directory"
1542
                )
1543
                .into(),
1544
            ),
1545
            Ok(Err(e)) => {
1546
                warn!(
1547
                    ?e,
1548
                    %content_path,
1549
                    "FilesystemStore::check_health: stat errored",
1550
                );
1551
                HealthStatus::new_failed(
1552
                    self,
1553
                    format!("FilesystemStore::check_health: stat errored: {e}").into(),
1554
                )
1555
            }
1556
            Err(_) => {
1557
                warn!(
1558
                    %content_path,
1559
                    timeout_secs = HEALTH_PROBE_TIMEOUT.as_secs(),
1560
                    "FilesystemStore::check_health: stat timed out",
1561
                );
1562
                HealthStatus::Timeout {
1563
                    struct_name: self.struct_name(),
1564
                }
1565
            }
1566
        }
1567
2
    }
1568
}