Coverage Report

Created: 2026-06-12 02:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::fmt::{Debug, Formatter};
16
use core::pin::Pin;
17
use core::sync::atomic::{AtomicU64, Ordering};
18
use core::time::Duration;
19
use std::borrow::Cow;
20
#[cfg(unix)]
21
use std::collections::HashMap;
22
use std::ffi::{OsStr, OsString};
23
use std::sync::{Arc, Weak};
24
use std::time::SystemTime;
25
26
#[cfg(unix)]
27
use async_lock::Mutex;
28
use async_lock::RwLock;
29
use async_trait::async_trait;
30
use bytes::{Bytes, BytesMut};
31
use futures::stream::{StreamExt, TryStreamExt};
32
use futures::{Future, TryFutureExt};
33
use nativelink_config::stores::FilesystemSpec;
34
use nativelink_error::{Code, Error, ResultExt, make_err};
35
use nativelink_metric::MetricsComponent;
36
use nativelink_util::background_spawn;
37
use nativelink_util::buf_channel::{
38
    DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair,
39
};
40
use nativelink_util::common::{DigestInfo, fs};
41
use nativelink_util::evicting_map::{EvictingMap, EvictionSnapshot, LenEntry};
42
use nativelink_util::fs::FileSlot;
43
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
44
#[cfg(unix)]
45
use nativelink_util::spawn_blocking;
46
use nativelink_util::store_trait::{
47
    RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
48
};
49
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
50
use tokio::sync::Semaphore;
51
use tokio::time::timeout;
52
use tokio_stream::wrappers::ReadDirStream;
53
use tracing::{debug, error, info, trace, warn};
54
55
use crate::callback_utils::RemoveItemCallbackHolder;
56
use crate::cas_utils::is_zero_digest;
57
58
// Default size to allocate memory of the buffer when reading files.
59
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
60
// Default block size of all major filesystems is 4KB
61
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
62
63
pub const STR_FOLDER: &str = "s";
64
pub const DIGEST_FOLDER: &str = "d";
65
66
/// Suffix for the sibling directory that holds per-digest read-only
67
/// **executable** (0o555) variants of CAS blobs (see
68
/// [`FilesystemStore::get_executable_hardlink_source`]). It is a sibling of
69
/// `content_path` rather than a child so the normal content/temp scan and prune
70
/// logic never touches it. Cleared on startup; entries are regenerable.
71
#[cfg(unix)]
72
const EXECUTABLE_DIR_SUFFIX: &str = ".exec";
73
74
/// Selects how a file name found on disk is turned back into a store key
/// (see [`key_from_file`]).
#[derive(Clone, Copy, Debug)]
pub enum FileType {
    /// The file name encodes a digest and is parsed as one.
    Digest,
    /// The file name is used verbatim as a string key.
    String,
}
79
80
#[derive(Debug, MetricsComponent)]
81
pub struct SharedContext {
82
    // Used in testing to know how many active drop() spawns are running.
83
    // TODO(palfrey) It is probably a good idea to use a spin lock during
84
    // destruction of the store to ensure that all files are actually
85
    // deleted (similar to how it is done in tests).
86
    #[metric(help = "Number of active drop spawns")]
87
    pub active_drop_spawns: AtomicU64,
88
    #[metric(help = "Path to the configured temp path")]
89
    temp_path: String,
90
    #[metric(help = "Path to the configured content path")]
91
    content_path: String,
92
}
93
94
/// Where the file behind an [`EncodedFilePath`] lives.
#[derive(Eq, PartialEq, Debug)]
enum PathType {
    /// Under `SharedContext::content_path`. Files of this type are left on
    /// disk when the `EncodedFilePath` is dropped.
    Content,
    /// Under `SharedContext::temp_path`. Deleted when dropped.
    Temp,
    /// An explicit path that is used verbatim. Deleted when dropped.
    Custom(OsString),
}
100
101
/// [`EncodedFilePath`] stores the path to the file
102
/// including the context, path type and key to the file.
103
/// The whole [`StoreKey`] is stored as opposed to solely
104
/// the [`DigestInfo`] so that it is more usable for things
105
/// such as BEP -see Issue #1108
106
#[derive(Debug)]
107
pub struct EncodedFilePath {
108
    shared_context: Arc<SharedContext>,
109
    path_type: PathType,
110
    key: StoreKey<'static>,
111
}
112
113
impl EncodedFilePath {
114
    #[inline]
115
578
    fn get_file_path(&self) -> Cow<'_, OsStr> {
116
578
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.key)
117
578
    }
118
}
119
120
#[inline]
121
731
fn get_file_path_raw<'a>(
122
731
    path_type: &'a PathType,
123
731
    shared_context: &SharedContext,
124
731
    key: &StoreKey<'a>,
125
731
) -> Cow<'a, OsStr> {
126
731
    let 
folder707
= match path_type {
127
262
        PathType::Content => &shared_context.content_path,
128
445
        PathType::Temp => &shared_context.temp_path,
129
24
        PathType::Custom(path) => return Cow::Borrowed(path),
130
    };
131
707
    Cow::Owned(to_full_path_from_key(folder, key))
132
731
}
133
134
impl Drop for EncodedFilePath {
135
161
    fn drop(&mut self) {
136
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
137
        // file actually needs to be deleted.
138
161
        if self.path_type == PathType::Content {
139
148
            return;
140
13
        }
141
142
13
        let file_path = self.get_file_path().to_os_string();
143
13
        let shared_context = self.shared_context.clone();
144
        // .fetch_add returns previous value, so we add one to get approximate current value
145
13
        let current_active_drop_spawns = shared_context
146
13
            .active_drop_spawns
147
13
            .fetch_add(1, Ordering::Relaxed)
148
13
            + 1;
149
13
        debug!(
150
            %current_active_drop_spawns,
151
            ?file_path,
152
            "Spawned a filesystem_delete_file"
153
        );
154
13
        background_spawn!("filesystem_delete_file", async move 
{8
155
8
            let 
result5
= fs::remove_file(&file_path)
156
8
                .await
157
5
                .err_tip(|| 
format!0
("Failed to remove file {}",
file_path.display()0
));
158
5
            if let Err(
err0
) = result {
159
0
                error!(?file_path, ?err, "Failed to delete file",);
160
            } else {
161
5
                debug!(?file_path, "File deleted",);
162
            }
163
            // .fetch_sub returns previous value, so we subtract one to get approximate current value
164
5
            let current_active_drop_spawns = shared_context
165
5
                .active_drop_spawns
166
5
                .fetch_sub(1, Ordering::Relaxed)
167
5
                - 1;
168
5
            debug!(
169
                ?current_active_drop_spawns,
170
                ?file_path,
171
                "Dropped a filesystem_delete_file"
172
            );
173
5
        });
174
161
    }
175
}
176
177
/// This creates the file path from the [`StoreKey`]. If
178
/// it is a string, the string, prefixed with [`STR_PREFIX`]
179
/// for backwards compatibility, is stored.
180
///
181
/// If it is a [`DigestInfo`], it is prefixed by [`DIGEST_PREFIX`]
182
/// followed by the string representation of a digest - the hash in hex,
183
/// a hyphen then the size in bytes
184
///
185
/// Previously, only the string representation of the [`DigestInfo`] was
186
/// used with no prefix
187
#[inline]
188
715
fn to_full_path_from_key(folder: &str, key: &StoreKey<'_>) -> OsString {
189
715
    match key {
190
3
        StoreKey::Str(str) => format!("{folder}/{STR_FOLDER}/{str}"),
191
712
        StoreKey::Digest(digest_info) => format!("{folder}/{DIGEST_FOLDER}/{digest_info}"),
192
    }
193
715
    .into()
194
715
}
195
196
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
197
    /// Responsible for creating the underlying `FileEntry`.
198
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
199
200
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
201
    fn make_and_open_file(
202
        block_size: u64,
203
        encoded_file_path: EncodedFilePath,
204
    ) -> impl Future<Output = Result<(Self, FileSlot, OsString), Error>> + Send
205
    where
206
        Self: Sized;
207
208
    /// Returns the underlying size of the data in bytes
209
    fn data_size(&self) -> u64;
210
211
    /// Returns the underlying reference to the size of the data in bytes
212
    fn data_size_mut(&mut self) -> &mut u64;
213
214
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
215
    fn size_on_disk(&self) -> u64;
216
217
    /// Gets the underlying `EncodedfilePath`.
218
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
219
220
    /// Returns a reader that will read part of the underlying file.
221
    fn read_file_part(
222
        &self,
223
        offset: u64,
224
        length: u64,
225
    ) -> impl Future<Output = Result<Take<FileSlot>, Error>> + Send;
226
227
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
228
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
229
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
230
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
231
    /// the callback the file is always guaranteed to exist and immutable.
232
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
233
    fn get_file_path_locked<
234
        T,
235
        Fut: Future<Output = Result<T, Error>> + Send,
236
        F: FnOnce(OsString) -> Fut + Send,
237
    >(
238
        &self,
239
        handler: F,
240
    ) -> impl Future<Output = Result<T, Error>> + Send;
241
}
242
243
pub struct FileEntryImpl {
244
    data_size: u64,
245
    block_size: u64,
246
    // We lock around this as it gets rewritten when we move between temp and content types
247
    encoded_file_path: RwLock<EncodedFilePath>,
248
}
249
250
impl FileEntryImpl {
251
11
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
252
11
        self.encoded_file_path.get_mut().shared_context.clone()
253
11
    }
254
}
255
256
impl FileEntry for FileEntryImpl {
257
161
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
258
161
        Self {
259
161
            data_size,
260
161
            block_size,
261
161
            encoded_file_path,
262
161
        }
263
161
    }
264
265
    /// This encapsulates the logic for the edge case of if the file fails to create
266
    /// the cleanup of the file is handled without creating a `FileEntry`, which would
267
    /// try to cleanup the file as well during `drop()`.
268
147
    async fn make_and_open_file(
269
147
        block_size: u64,
270
147
        encoded_file_path: EncodedFilePath,
271
147
    ) -> Result<(Self, FileSlot, OsString), Error> {
272
147
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
273
147
        let temp_file_result = fs::create_file(temp_full_path.clone())
274
147
            .or_else(|mut err| async 
{0
275
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
276
0
                    format!(
277
                        "Failed to remove file {} in filesystem store",
278
0
                        temp_full_path.display()
279
                    )
280
0
                });
281
0
                if let Err(remove_err) = remove_result {
282
0
                    err = err.merge(remove_err);
283
0
                }
284
0
                warn!(?err, ?block_size, ?temp_full_path, "Failed to create file",);
285
0
                Err(err).err_tip(|| {
286
0
                    format!(
287
                        "Failed to create {} in filesystem store",
288
0
                        temp_full_path.display()
289
                    )
290
0
                })
291
0
            })
292
147
            .await
?0
;
293
294
147
        Ok((
295
147
            <Self as FileEntry>::create(
296
147
                0, /* Unknown yet, we will fill it in later */
297
147
                block_size,
298
147
                RwLock::new(encoded_file_path),
299
147
            ),
300
147
            temp_file_result,
301
147
            temp_full_path,
302
147
        ))
303
147
    }
304
305
25
    fn data_size(&self) -> u64 {
306
25
        self.data_size
307
25
    }
308
309
145
    fn data_size_mut(&mut self) -> &mut u64 {
310
145
        &mut self.data_size
311
145
    }
312
313
245
    fn size_on_disk(&self) -> u64 {
314
245
        self.data_size.div_ceil(self.block_size) * self.block_size
315
245
    }
316
317
415
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
318
415
        &self.encoded_file_path
319
415
    }
320
321
68
    fn read_file_part(
322
68
        &self,
323
68
        offset: u64,
324
68
        length: u64,
325
68
    ) -> impl Future<Output = Result<Take<FileSlot>, Error>> + Send {
326
68
        self.get_file_path_locked(move |full_content_path| async move {
327
68
            let 
file66
= fs::open_file(&full_content_path, offset, length)
328
68
                .await
329
68
                .err_tip(|| 
{2
330
2
                    format!(
331
                        "Failed to open file in filesystem store {}",
332
2
                        full_content_path.display()
333
                    )
334
2
                })?;
335
66
            Ok(file)
336
136
        })
337
68
    }
338
339
94
    async fn get_file_path_locked<
340
94
        T,
341
94
        Fut: Future<Output = Result<T, Error>> + Send,
342
94
        F: FnOnce(OsString) -> Fut + Send,
343
94
    >(
344
94
        &self,
345
94
        handler: F,
346
94
    ) -> Result<T, Error> {
347
94
        let encoded_file_path = self.get_encoded_file_path().read().await;
348
94
        handler(encoded_file_path.get_file_path().to_os_string()).await
349
94
    }
350
}
351
352
impl Debug for FileEntryImpl {
353
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
354
0
        f.debug_struct("FileEntryImpl")
355
0
            .field("data_size", &self.data_size)
356
0
            .field("encoded_file_path", &"<behind mutex>")
357
0
            .finish()
358
0
    }
359
}
360
361
155
fn make_temp_digest(mut digest: DigestInfo) -> DigestInfo {
362
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
363
155
    let mut hash = *digest.packed_hash();
364
155
    hash[24..].clone_from_slice(
365
155
        &DELETE_FILE_COUNTER
366
155
            .fetch_add(1, Ordering::Relaxed)
367
155
            .to_le_bytes(),
368
    );
369
155
    digest.set_packed_hash(*hash);
370
155
    digest
371
155
}
372
373
155
pub fn make_temp_key(key: &StoreKey) -> StoreKey<'static> {
374
155
    StoreKey::Digest(make_temp_digest(key.borrow().into_digest()))
375
155
}
376
377
impl LenEntry for FileEntryImpl {
378
    #[inline]
379
243
    fn len(&self) -> u64 {
380
243
        self.size_on_disk()
381
243
    }
382
383
0
    fn is_empty(&self) -> bool {
384
0
        self.data_size == 0
385
0
    }
386
387
    // unref() only triggers when an item is removed from the eviction_map. It is possible
388
    // that another place in code has a reference to `FileEntryImpl` and may later read the
389
    // file. To support this edge case, we first move the file to a temp file and point
390
    // target file location to the new temp file. `unref()` should only ever be called once.
391
    #[inline]
392
10
    async fn unref(&self) {
393
10
        let mut encoded_file_path = self.encoded_file_path.write().await;
394
10
        if encoded_file_path.path_type == PathType::Temp {
395
            // We are already a temp file that is now marked for deletion on drop.
396
            // This is very rare, but most likely the rename into the content path failed.
397
2
            warn!(
398
2
                key = ?encoded_file_path.key,
399
                "File is already a temp file",
400
            );
401
2
            return;
402
8
        }
403
8
        let from_path = encoded_file_path.get_file_path();
404
8
        let new_key = make_temp_key(&encoded_file_path.key);
405
406
8
        let to_path = to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);
407
408
8
        if let Err(
err2
) = fs::rename(&from_path, &to_path).await {
409
            // ENOENT here means the file we expected at `from_path`
410
            // was already gone — typically because another thread's
411
            // eviction beat us to the unref, or because the entry
412
            // ended up in our map without its file ever landing on
413
            // disk (the "phantom-map" case the runtime recovers from
414
            // via `FastSlowStore::get_part`'s slow-store fallback).
415
            // It is benign at this site — there is no file to move
416
            // — and historically dominates the log volume of this
417
            // store under heavy write+evict concurrency, fast enough
418
            // to drown the runtime under sustained pressure. Demote
419
            // to `debug` and drop the per-emission path fields so it
420
            // stops costing serialization in the hot path.
421
            //
422
            // Other rename failures (EACCES, EXDEV, EBUSY, …) are
423
            // genuinely unexpected and stay at `warn` with full
424
            // context.
425
2
            if err.code == Code::NotFound {
426
2
                debug!(
427
2
                    key = ?encoded_file_path.key,
428
                    "Failed to rename file (already gone, treating as benign)",
429
                );
430
            } else {
431
0
                warn!(
432
0
                    key = ?encoded_file_path.key,
433
                    ?from_path,
434
                    ?to_path,
435
                    ?err,
436
                    "Failed to rename file",
437
                );
438
            }
439
        } else {
440
6
            debug!(
441
6
                key = ?encoded_file_path.key,
442
                ?from_path,
443
                ?to_path,
444
                "Renamed file (unref)",
445
            );
446
6
            encoded_file_path.path_type = PathType::Temp;
447
6
            encoded_file_path.key = new_key;
448
        }
449
10
    }
450
}
451
452
#[inline]
453
2
fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
454
2
    let (hash, size) = file_name.split_once('-').err_tip(|| "")
?0
;
455
2
    let size = size.parse::<i64>()
?0
;
456
2
    DigestInfo::try_new(hash, size)
457
2
}
458
459
3
pub fn key_from_file(file_name: &str, file_type: FileType) -> Result<StoreKey<'_>, Error> {
460
3
    match file_type {
461
1
        FileType::String => Ok(StoreKey::new_str(file_name)),
462
2
        FileType::Digest => digest_from_filename(file_name).map(StoreKey::Digest),
463
    }
464
3
}
465
466
/// The number of files to read the metadata for at the same time when running
467
/// `add_files_to_cache`.
468
const SIMULTANEOUS_METADATA_READS: usize = 200;
469
470
type FsEvictingMap<'a, Fe> =
471
    EvictingMap<StoreKeyBorrow, StoreKey<'a>, Arc<Fe>, SystemTime, RemoveItemCallbackHolder>;
472
473
83
async fn add_files_to_cache<Fe: FileEntry>(
474
83
    evicting_map: &FsEvictingMap<'_, Fe>,
475
83
    anchor_time: &SystemTime,
476
83
    shared_context: &Arc<SharedContext>,
477
83
    block_size: u64,
478
83
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
479
83
) -> Result<(), Error> {
480
    #[expect(clippy::too_many_arguments)]
481
2
    async fn process_entry<Fe: FileEntry>(
482
2
        evicting_map: &FsEvictingMap<'_, Fe>,
483
2
        file_name: &str,
484
2
        file_type: FileType,
485
2
        atime: SystemTime,
486
2
        data_size: u64,
487
2
        block_size: u64,
488
2
        anchor_time: &SystemTime,
489
2
        shared_context: &Arc<SharedContext>,
490
2
    ) -> Result<(), Error> {
491
2
        let key = key_from_file(file_name, file_type)
?0
;
492
493
2
        let file_entry = Fe::create(
494
2
            data_size,
495
2
            block_size,
496
2
            RwLock::new(EncodedFilePath {
497
2
                shared_context: shared_context.clone(),
498
2
                path_type: PathType::Content,
499
2
                key: key.borrow().into_owned(),
500
2
            }),
501
        );
502
2
        let time_since_anchor = if let Ok(
d1
) = anchor_time.duration_since(atime) {
503
1
            d
504
        } else {
505
1
            warn!(
506
                %file_name,
507
1
                atime = %humantime::format_rfc3339(atime),
508
1
                anchor_time = %humantime::format_rfc3339(*anchor_time),
509
                "File access time newer than FilesystemStore start time",
510
            );
511
1
            Duration::ZERO
512
        };
513
2
        evicting_map
514
2
            .insert_with_time(
515
2
                key.into_owned().into(),
516
2
                Arc::new(file_entry),
517
2
                i32::try_from(time_since_anchor.as_secs()).unwrap_or(i32::MAX),
518
2
            )
519
2
            .await;
520
2
        Ok(())
521
2
    }
522
523
249
    async fn read_files(
524
249
        folder: Option<&str>,
525
249
        shared_context: &SharedContext,
526
249
    ) -> Result<Vec<(String, SystemTime, u64, bool)>, Error> {
527
        // Note: In Dec 2024 this is for backwards compatibility with the old
528
        // way files were stored on disk. Previously all files were in a single
529
        // folder regardless of the StoreKey type. This allows old versions of
530
        // nativelink file layout to be upgraded at startup time.
531
        // This logic can be removed once more time has passed.
532
249
        let read_dir = folder.map_or_else(
533
83
            || format!("{}/", shared_context.content_path),
534
166
            |folder| format!("{}/{folder}/", shared_context.content_path),
535
        );
536
537
249
        let (_permit, dir_handle) = fs::read_dir(read_dir)
538
249
            .await
539
249
            .err_tip(|| "Failed opening content directory for iterating in filesystem store")
?0
540
249
            .into_inner();
541
542
249
        let read_dir_stream = ReadDirStream::new(dir_handle);
543
249
        read_dir_stream
544
249
            .map(|dir_entry| async move 
{168
545
168
                let dir_entry = dir_entry.unwrap();
546
168
                let file_name = dir_entry.file_name().into_string().unwrap();
547
168
                let metadata = dir_entry
548
168
                    .metadata()
549
168
                    .await
550
168
                    .err_tip(|| "Failed to get metadata in filesystem store")
?0
;
551
                // We need to filter out folders - we do not want to try to cache the s and d folders.
552
168
                let is_file =
553
168
                    metadata.is_file() || !(
file_name == STR_FOLDER166
||
file_name == DIGEST_FOLDER83
);
554
                // Using access time is not perfect, but better than random. We do not update the
555
                // atime when a file is actually "touched", we rely on whatever the filesystem does
556
                // when we read the file (usually update on read).
557
168
                let atime = metadata
558
168
                    .accessed()
559
168
                    .or_else(|_| 
metadata0
.
modified0
())
560
168
                    .unwrap_or(SystemTime::UNIX_EPOCH);
561
168
                Result::<(String, SystemTime, u64, bool), Error>::Ok((
562
168
                    file_name,
563
168
                    atime,
564
168
                    metadata.len(),
565
168
                    is_file,
566
168
                ))
567
336
            })
568
249
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
569
249
            .try_collect()
570
249
            .await
571
249
    }
572
573
    /// Note: In Dec 2024 this is for backwards compatibility with the old
574
    /// way files were stored on disk. Previously all files were in a single
575
    /// folder regardless of the [`StoreKey`] type. This moves files from the old cache
576
    /// location to the new cache location, under [`DIGEST_FOLDER`].
577
83
    async fn move_old_cache(
578
83
        shared_context: &Arc<SharedContext>,
579
83
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
580
83
    ) -> Result<(), Error> {
581
83
        let file_infos = read_files(None, shared_context).await
?0
;
582
583
83
        let from_path = &shared_context.content_path;
584
585
83
        let to_path = format!("{}/{DIGEST_FOLDER}", shared_context.content_path);
586
587
83
        for (
file_name0
, _, _, _) in file_infos.into_iter().filter(|x| x.3) {
588
0
            let from_file: OsString = format!("{from_path}/{file_name}").into();
589
0
            let to_file: OsString = format!("{to_path}/{file_name}").into();
590
591
0
            if let Err(err) = rename_fn(&from_file, &to_file) {
592
0
                warn!(?from_file, ?to_file, ?err, "Failed to rename file",);
593
            } else {
594
0
                debug!(?from_file, ?to_file, "Renamed file (old cache)",);
595
            }
596
        }
597
83
        Ok(())
598
83
    }
599
600
166
    async fn add_files_to_cache<Fe: FileEntry>(
601
166
        evicting_map: &FsEvictingMap<'_, Fe>,
602
166
        anchor_time: &SystemTime,
603
166
        shared_context: &Arc<SharedContext>,
604
166
        block_size: u64,
605
166
        folder: &str,
606
166
    ) -> Result<(), Error> {
607
166
        let file_infos = read_files(Some(folder), shared_context).await
?0
;
608
166
        let file_type = match folder {
609
166
            STR_FOLDER => 
FileType::String83
,
610
83
            DIGEST_FOLDER => FileType::Digest,
611
0
            _ => panic!("Invalid folder type"),
612
        };
613
614
166
        let path_root = format!("{}/{folder}", shared_context.content_path);
615
616
166
        for (
file_name2
,
atime2
,
data_size2
, _) in file_infos.into_iter().filter(|x| x.3) {
617
2
            let result = process_entry(
618
2
                evicting_map,
619
2
                &file_name,
620
2
                file_type,
621
2
                atime,
622
2
                data_size,
623
2
                block_size,
624
2
                anchor_time,
625
2
                shared_context,
626
2
            )
627
2
            .await;
628
2
            if let Err(
err0
) = result {
629
0
                warn!(?file_name, ?err, "Failed to add file to eviction cache",);
630
                // Ignore result.
631
0
                drop(fs::remove_file(format!("{path_root}/{file_name}")).await);
632
2
            }
633
        }
634
166
        Ok(())
635
166
    }
636
637
83
    move_old_cache(shared_context, rename_fn).await
?0
;
638
639
83
    add_files_to_cache(
640
83
        evicting_map,
641
83
        anchor_time,
642
83
        shared_context,
643
83
        block_size,
644
83
        DIGEST_FOLDER,
645
83
    )
646
83
    .await
?0
;
647
648
83
    add_files_to_cache(
649
83
        evicting_map,
650
83
        anchor_time,
651
83
        shared_context,
652
83
        block_size,
653
83
        STR_FOLDER,
654
83
    )
655
83
    .await
?0
;
656
83
    Ok(())
657
83
}
658
659
83
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
660
166
    async fn prune_temp_inner(temp_path: &str, subpath: &str) -> Result<(), Error> {
661
166
        let (_permit, dir_handle) = fs::read_dir(format!("{temp_path}/{subpath}"))
662
166
            .await
663
166
            .err_tip(
664
                || "Failed opening temp directory to prune partial downloads in filesystem store",
665
0
            )?
666
166
            .into_inner();
667
668
166
        let mut read_dir_stream = ReadDirStream::new(dir_handle);
669
166
        while let Some(
dir_entry0
) = read_dir_stream.next().await {
670
0
            let path = dir_entry?.path();
671
0
            if let Err(err) = fs::remove_file(&path).await {
672
0
                warn!(?path, ?err, "Failed to delete file",);
673
0
            }
674
        }
675
166
        Ok(())
676
166
    }
677
678
83
    prune_temp_inner(temp_path, STR_FOLDER).await
?0
;
679
83
    prune_temp_inner(temp_path, DIGEST_FOLDER).await
?0
;
680
83
    Ok(())
681
83
}
682
683
// Sometimes we get files to emplace that are identical to the existing files
684
// Due to the evict/remove/replace cycle taking some amount of time, we actually
685
// want to drop these
686
// Return value is "is duplicate"
687
159
pub async fn check_duplicate_files<Fe>(
688
159
    evicting_map: &Arc<FsEvictingMap<'_, Fe>>,
689
159
    key: &StoreKey<'static>,
690
159
    entry: &Arc<Fe>,
691
159
) -> Result<bool, Error>
692
159
where
693
159
    Fe: FileEntry,
694
159
{
695
159
    let temp_file_encoded_file_path = entry.get_encoded_file_path().write().await;
696
159
    let maybe_existing_item = evicting_map.get(&key.borrow().into_owned()).await;
697
159
    if let Some(
existing_item9
) = maybe_existing_item {
698
9
        if Arc::ptr_eq(entry, &existing_item) {
699
1
            warn!("Tried to check duplicate of an entry we already have!");
700
1
            return Ok(true);
701
8
        }
702
8
        let existing_item_encoded_file_path = existing_item.get_encoded_file_path().write().await;
703
8
        if entry.data_size() == existing_item.data_size() {
704
            const CHUNK_SIZE: usize = 16 * 1024; // 16kb chunks, kinda picked out of the air
705
7
            let file_length = entry.data_size();
706
7
            let existing_path = existing_item_encoded_file_path.get_file_path();
707
7
            let temp_path = temp_file_encoded_file_path.get_file_path();
708
7
            trace!(?existing_path, ?temp_path, "Checking duplicate files");
709
7
            let mut temp_file = fs::open_file(&temp_path, 0, file_length).await
?0
;
710
7
            let mut existing_file = fs::open_file(&existing_path, 0, file_length).await
?0
;
711
712
7
            let mut temp_buffer: [u8; CHUNK_SIZE] = [0; CHUNK_SIZE];
713
7
            let mut existing_buffer: [u8; CHUNK_SIZE] = [0; CHUNK_SIZE];
714
            // in a file_length file, there are 0 to file_length-1 entries
715
            // not file_length. It's counting all the bytes, starting from 0
716
7
            for offset in (0..file_length - 1).step_by(CHUNK_SIZE) {
717
7
                let buffer_size = if offset + (CHUNK_SIZE as u64) <= file_length {
718
0
                    CHUNK_SIZE
719
7
                } else if file_length < CHUNK_SIZE as u64 {
720
7
                    usize::try_from(file_length)
721
7
                        .expect("Always succeeds because file_length < 16384")
722
                } else {
723
0
                    usize::try_from(file_length - offset).expect("Always succeeds because offset < file_length, and offset-file_length must be < 16384")
724
                };
725
7
                if let Err(
err0
) = temp_file.read_exact(&mut temp_buffer[0..buffer_size]).await {
726
0
                    warn!(
727
                        ?err,
728
                        ?temp_path,
729
                        file_length,
730
                        offset,
731
                        buffer_size,
732
                        "Failed to read temp file, skipping duplicate check"
733
                    );
734
0
                    return Ok(false);
735
7
                }
736
7
                if let Err(
err0
) = existing_file
737
7
                    .read_exact(&mut existing_buffer[0..buffer_size])
738
7
                    .await
739
                {
740
0
                    warn!(
741
                        ?err,
742
                        ?existing_path,
743
                        file_length,
744
                        offset,
745
                        buffer_size,
746
                        "Failed to read existing, skipping duplicate check"
747
                    );
748
0
                    return Ok(false);
749
7
                }
750
7
                if temp_buffer.ne(&existing_buffer) {
751
5
                    trace!(
752
                        ?existing_path,
753
                        ?temp_path,
754
                        "Files are different, so non-duplicate"
755
                    );
756
5
                    return Ok(false);
757
2
                }
758
            }
759
2
            trace!(
760
                ?existing_path,
761
                ?temp_path,
762
                "Identical files, so don't need to edit, skipping emplace"
763
            );
764
2
            return Ok(true);
765
1
        }
766
1
        trace!(
767
1
            entry_data_size = entry.data_size(),
768
1
            existing_data_size = existing_item.data_size(),
769
1
            existing_path = ?existing_item_encoded_file_path.get_file_path(),
770
            "Different data sizes, so non-duplicate"
771
        );
772
    } else {
773
150
        trace!(
774
148
            temp_file = ?temp_file_encoded_file_path.get_file_path(),
775
            "No existing entry, so not duplicate"
776
        );
777
    }
778
151
    Ok(false)
779
159
}
780
781
/// A store that keeps its entries as files on the local filesystem, tracked by
/// an evicting map.
///
/// Instances are created through `new` / `new_with_timeout_and_rename_fn`,
/// which always hand back an `Arc<Self>`.
#[derive(Debug, MetricsComponent)]
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
    // Temp and content root paths shared with every file entry.
    #[metric]
    shared_context: Arc<SharedContext>,
    // Index of the entries currently held by the store.
    #[metric(group = "evicting_map")]
    evicting_map: Arc<FsEvictingMap<'static, Fe>>,
    // Taken from the spec, or `DEFAULT_BLOCK_SIZE` when the spec says 0.
    #[metric(help = "Block size of the configured filesystem")]
    block_size: u64,
    // Taken from the spec, or `DEFAULT_BUFF_SIZE` when the spec says 0.
    #[metric(help = "Size of the configured read buffer size")]
    read_buffer_size: usize,
    // Back-reference to the owning `Arc`, upgraded by `get_arc`.
    weak_self: Weak<Self>,
    // Function used to move files into place; injectable so callers can
    // substitute their own rename behaviour.
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
    /// Limits concurrent write operations to prevent disk I/O saturation.
    write_semaphore: Option<Semaphore>,
    /// Per-digest single-flight locks guarding creation of the executable
    /// variant in `{content_path}.exec`, so each variant's writable fd is
    /// opened exactly once. The outer lock is sync and only ever held to
    /// get/insert/remove the per-digest async lock — never across I/O.
    #[cfg(unix)]
    executable_locks: std::sync::Mutex<HashMap<DigestInfo, Arc<Mutex<()>>>>,
}
802
803
impl<Fe: FileEntry> FilesystemStore<Fe> {
804
71
    pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
805
145
        
Self::new_with_timeout_and_rename_fn71
(
spec71
, |from, to| std::fs::rename(from, to)).
await71
806
71
    }
807
808
83
    pub async fn new_with_timeout_and_rename_fn(
809
83
        spec: &FilesystemSpec,
810
83
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
811
83
    ) -> Result<Arc<Self>, Error> {
812
166
        async fn create_subdirs(path: &str) -> Result<(), Error> {
813
166
            fs::create_dir_all(format!("{path}/{STR_FOLDER}"))
814
166
                .await
815
166
                .err_tip(|| 
format!0
("Failed to create directory {path}/{STR_FOLDER}"))
?0
;
816
166
            fs::create_dir_all(format!("{path}/{DIGEST_FOLDER}"))
817
166
                .await
818
166
                .err_tip(|| 
format!0
("Failed to create directory {path}/{DIGEST_FOLDER}"))
819
166
        }
820
821
83
        let now = SystemTime::now();
822
823
83
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
824
83
        let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy);
825
83
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
826
827
        // Create temp and content directories and the s and d subdirectories.
828
829
83
        create_subdirs(&spec.temp_path).await
?0
;
830
83
        create_subdirs(&spec.content_path).await
?0
;
831
832
        // Executable-variant directory: a sibling of `content_path` holding
833
        // per-digest 0o555 copies used as hardlink sources for executable
834
        // inputs (see `get_executable_hardlink_source`). Cleared on startup —
835
        // the variants are regenerable and we never want a stale one to leak
836
        // across runs. Unix-only: the executable bit (and the ETXTBSY race it
837
        // guards against) does not apply on Windows.
838
        #[cfg(unix)]
839
        {
840
83
            let executable_dir = format!("{}{EXECUTABLE_DIR_SUFFIX}", spec.content_path);
841
83
            drop(fs::remove_dir_all(&executable_dir).await);
842
83
            fs::create_dir_all(format!("{executable_dir}/{DIGEST_FOLDER}"))
843
83
                .await
844
83
                .err_tip(|| 
format!0
("Failed to create executable dir {executable_dir}"))
?0
;
845
        }
846
847
83
        let shared_context = Arc::new(SharedContext {
848
83
            active_drop_spawns: AtomicU64::new(0),
849
83
            temp_path: spec.temp_path.clone(),
850
83
            content_path: spec.content_path.clone(),
851
83
        });
852
853
83
        let block_size = if spec.block_size == 0 {
854
79
            DEFAULT_BLOCK_SIZE
855
        } else {
856
4
            spec.block_size
857
        };
858
83
        add_files_to_cache(
859
83
            evicting_map.as_ref(),
860
83
            &now,
861
83
            &shared_context,
862
83
            block_size,
863
83
            rename_fn,
864
83
        )
865
83
        .await
?0
;
866
83
        prune_temp_path(&shared_context.temp_path).await
?0
;
867
868
83
        let read_buffer_size = if spec.read_buffer_size == 0 {
869
68
            DEFAULT_BUFF_SIZE
870
        } else {
871
15
            spec.read_buffer_size as usize
872
        };
873
83
        let write_semaphore = if spec.max_concurrent_writes > 0 {
874
0
            Some(Semaphore::new(spec.max_concurrent_writes))
875
        } else {
876
83
            None
877
        };
878
83
        Ok(Arc::new_cyclic(|weak_self| Self {
879
83
            shared_context,
880
83
            evicting_map,
881
83
            block_size,
882
83
            read_buffer_size,
883
83
            weak_self: weak_self.clone(),
884
83
            rename_fn,
885
83
            write_semaphore,
886
            #[cfg(unix)]
887
83
            executable_locks: std::sync::Mutex::new(HashMap::new()),
888
83
        }))
889
83
    }
890
891
49
    pub fn get_arc(&self) -> Option<Arc<Self>> {
892
49
        self.weak_self.upgrade()
893
49
    }
894
895
    /// Path of the read-only executable (0o555) variant for `digest`.
896
    #[cfg(unix)]
897
4
    fn executable_variant_path(&self, digest: &DigestInfo) -> OsString {
898
4
        format!(
899
            "{}{EXECUTABLE_DIR_SUFFIX}/{DIGEST_FOLDER}/{digest}",
900
4
            self.shared_context.content_path
901
        )
902
4
        .into()
903
4
    }
904
905
    /// Returns the path to a private, read-only **executable** (0o555) copy of
906
    /// the blob for `digest`, creating it at most once. Callers **hardlink**
907
    /// the returned path into action input trees instead of copying the
908
    /// executable per action.
909
    ///
910
    /// Why this exists: a CAS blob is stored read-only **0o444** and shared
911
    /// across actions by hardlink, so it cannot carry the executable bit and
912
    /// must never be `chmod`'d (that mutates the shared inode — the #2347
913
    /// corruption class). Materializing an executable input therefore needs a
914
    /// separate 0o555 inode. Doing that copy *per action* opens a writable fd
915
    /// in the worker's hot path; under fork-heavy concurrency a child can
916
    /// inherit that fd and a concurrent `execve` of the executable then fails
917
    /// with `ETXTBSY` ("Text file busy", os error 26). Creating the 0o555 inode
918
    /// **once** — writer fd fsync'd and closed, then atomically renamed into
919
    /// place before the inode is ever hardlinked or executed — and hardlinking
920
    /// it thereafter keeps the per-action path hardlink-only.
921
    #[cfg(unix)]
922
4
    pub async fn get_executable_hardlink_source(
923
4
        &self,
924
4
        digest: &DigestInfo,
925
4
    ) -> Result<OsString, Error> {
926
4
        let variant_path = self.executable_variant_path(digest);
927
928
        // Fast path: the variant already exists, so the caller can hardlink it
929
        // with no writable fd anywhere in sight.
930
4
        if fs::metadata(&variant_path).await.is_ok() {
931
2
            return Ok(variant_path);
932
2
        }
933
934
        // Single-flight: exactly one task ever opens a writable fd for this
935
        // variant. Without this, a cold-cache burst of concurrent actions
936
        // would each open a writer for the same executable — the very window
937
        // ETXTBSY exploits.
938
2
        let lock = {
939
2
            let mut locks = self
940
2
                .executable_locks
941
2
                .lock()
942
2
                .expect("executable_locks poisoned");
943
2
            locks
944
2
                .entry(*digest)
945
2
                .or_insert_with(|| Arc::new(Mutex::new(())))
946
2
                .clone()
947
        };
948
2
        let _guard = lock.lock().await;
949
950
        // Re-check: another task may have constructed it while we waited.
951
2
        if fs::metadata(&variant_path).await.is_ok() {
952
0
            self.forget_executable_lock(digest);
953
0
            return Ok(variant_path);
954
2
        }
955
956
2
        let result = self.create_executable_variant(digest, &variant_path).await;
957
        // Drop the per-digest lock entry regardless of outcome so the map
958
        // cannot grow unbounded; a concurrent waiter already cloned the Arc.
959
2
        self.forget_executable_lock(digest);
960
2
        result.map(|()| variant_path)
961
4
    }
962
963
    /// Non-unix has no executable bit and no `ETXTBSY`, so just hardlink the
    /// CAS blob directly: this returns the path of the stored blob itself.
    ///
    /// # Errors
    /// Fails if `digest` has no entry in this store.
    #[cfg(not(unix))]
    pub async fn get_executable_hardlink_source(
        &self,
        digest: &DigestInfo,
    ) -> Result<OsString, Error> {
        let blob_entry = self.get_file_entry_for_digest(digest).await?;
        let blob_path = blob_entry
            .get_file_path_locked(|path| async move { Ok(path) })
            .await;
        blob_path
    }
975
976
    #[cfg(unix)]
977
2
    fn forget_executable_lock(&self, digest: &DigestInfo) {
978
2
        self.executable_locks
979
2
            .lock()
980
2
            .expect("executable_locks poisoned")
981
2
            .remove(digest);
982
2
    }
983
984
    /// Materializes the 0o555 executable variant for `digest`. Must be called
985
    /// under the per-digest single-flight guard.
986
    #[cfg(unix)]
987
2
    async fn create_executable_variant(
988
2
        &self,
989
2
        digest: &DigestInfo,
990
2
        variant_path: &OsStr,
991
2
    ) -> Result<(), Error> {
992
        // Resolve the on-disk CAS blob (0o444) to copy from. Must be present in
993
        // this tier; callers populate the fast store first.
994
2
        let file_entry = self
995
2
            .get_file_entry_for_digest(digest)
996
2
            .await
997
2
            .err_tip(|| "Resolving CAS blob for executable variant")
?0
;
998
2
        let src_path = file_entry
999
4
            .
get_file_path_locked2
(|p| async move
{2
Ok(p)2
})
1000
2
            .await
?0
;
1001
1002
2
        let variant_owned = variant_path.to_os_string();
1003
2
        let mut temp_owned = variant_path.to_os_string();
1004
2
        temp_owned.push(".tmp");
1005
2
        let rename_fn = self.rename_fn;
1006
1007
        // All of this is blocking std::fs; run it off the async runtime. The
1008
        // writable fd opened by `copy` is fully closed before the `rename`
1009
        // publishes the inode, so no reachable hardlink of the variant ever has
1010
        // an open writer.
1011
2
        spawn_blocking!(
1012
            "filesystem_store_executable_variant",
1013
2
            move || -> Result<(), Error> {
1014
                use std::os::unix::fs::PermissionsExt;
1015
2
                std::fs::copy(&src_path, &temp_owned).map_err(|e| 
{0
1016
0
                    make_err!(Code::Internal, "executable-variant copy failed: {e:?}")
1017
0
                })?;
1018
2
                std::fs::set_permissions(&temp_owned, std::fs::Permissions::from_mode(0o555))
1019
2
                    .map_err(|e| 
{0
1020
0
                        make_err!(
1021
0
                            Code::Internal,
1022
                            "executable-variant chmod 0o555 failed: {e:?}"
1023
                        )
1024
0
                    })?;
1025
                // Reopen read-only purely to fsync the bytes durable before publish.
1026
2
                let f = std::fs::File::open(&temp_owned)
1027
2
                    .map_err(|e| 
make_err!0
(
Code::Internal0
, "executable-variant reopen: {e:?}"))
?0
;
1028
2
                f.sync_all()
1029
2
                    .map_err(|e| 
make_err!0
(
Code::Internal0
, "executable-variant fsync: {e:?}"))
?0
;
1030
2
                drop(f);
1031
2
                rename_fn(temp_owned.as_os_str(), variant_owned.as_os_str()).map_err(|e| 
{0
1032
0
                    make_err!(Code::Internal, "executable-variant rename failed: {e:?}")
1033
0
                })?;
1034
2
                Ok(())
1035
2
            }
1036
        )
1037
2
        .await
1038
2
        .err_tip(|| "executable-variant spawn_blocking join failed")
?0
1039
2
    }
1040
1041
31
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
1042
        // Zero-digest blobs have no backing file on disk (FilesystemStore
1043
        // never persists zero-byte content). The previous implementation
1044
        // returned a synthetic FileEntry whose content_path did not exist,
1045
        // which downstream callers would then try to hard_link from,
1046
        // silently producing missing or empty output files in worker
1047
        // execution directories. Return NotFound so callers are forced to
1048
        // take the explicit zero-digest path (e.g. fs::create_file).
1049
31
        if is_zero_digest(digest) {
1050
1
            return Err(make_err!(
1051
1
                Code::NotFound,
1052
1
                "{digest} is a zero-digest; FilesystemStore does not persist zero-byte files. \
1053
1
                 Callers must materialise empty files directly rather than going through get_file_entry_for_digest."
1054
1
            ));
1055
30
        }
1056
30
        self.evicting_map
1057
30
            .get(&digest.into())
1058
30
            .await
1059
30
            .ok_or_else(|| 
make_err!0
(
Code::NotFound0
, "{digest} not found in filesystem store. This may indicate the file was evicted due to cache pressure. Consider increasing 'max_bytes' in your filesystem store's eviction_policy configuration."))
1060
31
    }
1061
1062
116
    async fn update_file(
1063
116
        self: Pin<&Self>,
1064
116
        mut entry: Fe,
1065
116
        mut temp_file: FileSlot,
1066
116
        final_key: StoreKey<'static>,
1067
116
        mut reader: DropCloserReadHalf,
1068
116
    ) -> Result<u64, Error> {
1069
116
        let mut data_size = 0;
1070
        loop {
1071
230
            let mut data = reader
1072
230
                .recv()
1073
230
                .await
1074
230
                .err_tip(|| "Failed to receive data in filesystem store")
?0
;
1075
230
            let data_len = data.len();
1076
230
            if data_len == 0 {
1077
116
                break; // EOF.
1078
114
            }
1079
114
            temp_file
1080
114
                .write_all_buf(&mut data)
1081
114
                .await
1082
114
                .err_tip(|| "Failed to write data into filesystem store")
?0
;
1083
114
            data_size += data_len as u64;
1084
        }
1085
1086
116
        let permit = if let Some(
sem0
) = &self.write_semaphore {
1087
0
            Some(sem.acquire().await.map_err(|err| {
1088
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
1089
0
            })?)
1090
        } else {
1091
116
            None
1092
        };
1093
1094
        // tokio defers write errors to the next write or flush call, so without an explicit flush
1095
        // the final write's failure is silently swallowed by `sync_all`, and a truncated file would
1096
        // be renamed into the content path.
1097
116
        temp_file
1098
116
            .flush()
1099
116
            .await
1100
116
            .err_tip(|| "Failed to flush in filesystem store")
?0
;
1101
116
        temp_file
1102
116
            .as_ref()
1103
116
            .sync_all()
1104
116
            .await
1105
116
            .err_tip(|| "Failed to sync_data in filesystem store")
?0
;
1106
1107
116
        drop(permit);
1108
1109
116
        temp_file.advise_dontneed();
1110
116
        trace!(?temp_file, "Dropping file to update_file");
1111
116
        drop(temp_file);
1112
1113
116
        *entry.data_size_mut() = data_size;
1114
116
        self.emplace_file(final_key, Arc::new(entry)).await
?1
;
1115
115
        Ok(data_size)
1116
116
    }
1117
1118
155
    /// Registers `entry` under `key` in the evicting map and moves its backing
    /// file from its current path to the final content path for `key`.
    ///
    /// The work runs inside a background spawn so that it completes even if
    /// the caller's future is dropped. It does nothing further (returning
    /// `Ok(())`) when the file duplicates the existing entry or when the new
    /// entry was evicted before the rename.
    ///
    /// # Errors
    /// Fails if the duplicate check fails, if the rename fails (in which case
    /// the entry is removed from the map again, provided it is still the one
    /// inserted here), or if the spawned task cannot be joined.
    async fn emplace_file(&self, key: StoreKey<'static>, entry: Arc<Fe>) -> Result<(), Error> {
        // This sequence of events is quite tricky to understand due to the amount of triggers that
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
        // 1. Here will hold a write lock on any file operations of this FileEntry.
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
        //    entries.
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntry
        //    during the rename.
        // 4. It should be impossible for items to be added while eviction is happening, so there
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
        //    item is not possible within the `insert()` call because the write lock inside the
        //    eviction map. If an eviction of new item happens after `insert()` but before
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
        //    will be blocked on us because we currently have the lock.
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
        //    contents until we release the lock.
        // NOTE(review): in the code below the write lock on this entry is taken *after*
        // `insert()`, not before as step 1 says — confirm the breakdown is still accurate.
        let evicting_map = self.evicting_map.clone();
        let rename_fn = self.rename_fn;

        // We need to guarantee that this will get to the end even if the parent future is dropped.
        // See: https://github.com/TraceMachina/nativelink/issues/495
        background_spawn!("filesystem_store_emplace_file", async move {
            // Sometimes we get files to emplace that are identical to the existing files
            // Due to the evict/remove/replace cycle taking some amount of time, we actually
            // want to drop these
            if check_duplicate_files(&evicting_map, &key, &entry).await? {
                return Ok(());
            }

            evicting_map
                .insert(key.borrow().into_owned().into(), entry.clone())
                .await;

            // The insert might have resulted in an eviction/unref so we need to check
            // it still exists in there. But first, get the lock...
            let mut encoded_file_path = entry.get_encoded_file_path().write().await;
            // Then check it's still in there...
            if evicting_map.get(&key).await.is_none() {
                info!(%key, "Got eviction while emplacing, dropping");
                return Ok(());
            }

            let final_path = get_file_path_raw(
                &PathType::Content,
                encoded_file_path.shared_context.as_ref(),
                &key,
            );

            let from_path = encoded_file_path.get_file_path();

            // Lock the blob down as read-only *before* it lands at its final
            // content path, so every hardlink of it (the worker directory cache
            // and `download_to_directory`) inherits an immutable, read-only
            // inode. This is what preserves the input-tree hermeticity contract
            // (actions cannot mutate their inputs) without a per-materialization
            // chmod walk, and it means callers must never `chmod` a hardlinked
            // blob — anything needing a different mode (e.g. an executable's +x
            // bit) must take a private copy. Content-addressed blobs are
            // write-once and replaced by `rename` rather than in-place writes,
            // and unlink only needs the *parent directory* to be writable, so a
            // read-only file mode is safe for both overwrite-by-rename and
            // eviction. Best-effort: a failure here (e.g. the temp file was
            // already evicted out from under us) must not abort the emplace.
            #[cfg(unix)]
            {
                use std::os::unix::fs::PermissionsExt;
                if let Err(err) =
                    fs::set_permissions(&from_path, std::fs::Permissions::from_mode(0o444)).await
                {
                    warn!(?err, ?from_path, "Failed to set CAS blob read-only");
                }
            }
            // Internally tokio spawns fs commands onto a blocking thread anyways.
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
            // an open-file permit (ensure we don't open too many files at once).
            // NOTE(review): `rename_fn` is called synchronously from inside this async block;
            // whether `background_spawn!` really runs it on a blocking thread is not visible
            // here — confirm.
            let result = (rename_fn)(&from_path, &final_path).err_tip(|| {
                format!(
                    "Failed to rename temp file to final path {}",
                    final_path.display()
                )
            });

            // In the event our move from temp file to final file fails we need to ensure we remove
            // the entry from our map.
            // Remember: At this point it is possible for another thread to have a reference to
            // `entry`, so we can't delete the file, only drop() should ever delete files.
            if let Err(err) = result {
                error!(?err, ?from_path, ?final_path, "Failed to rename file",);
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
                drop(encoded_file_path);
                // It is possible that the item in our map is no longer the item we inserted,
                // So, we need to conditionally remove it only if the pointers are the same.

                evicting_map
                    .remove_if(&key, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
                    .await;
                return Err(err);
            }
            // The file now lives at its final location; record that on the entry while the
            // write lock is still held.
            encoded_file_path.path_type = PathType::Content;
            encoded_file_path.key = key;
            Ok(())
        })
        .await
        .err_tip(|| "Failed to create spawn in filesystem store update_file")?
    }
1226
1227
0
    /// Returns the evicting map's current snapshot.
    pub fn get_eviction_snapshot(&self) -> EvictionSnapshot {
        let map = &self.evicting_map;
        map.get_snapshot()
    }
1230
1231
    // Only for tests, so we can run check_duplicate_files
1232
4
    pub fn get_evicting_map(&self) -> Arc<FsEvictingMap<'static, Fe>> {
1233
4
        self.evicting_map.clone()
1234
4
    }
1235
1236
    // Separated out so tests can use this
1237
147
    pub async fn make_temp_file(
1238
147
        &self,
1239
147
        temp_key: StoreKey<'static>,
1240
147
    ) -> Result<(Fe, FileSlot, OsString), Error> {
1241
147
        Fe::make_and_open_file(
1242
147
            self.block_size,
1243
147
            EncodedFilePath {
1244
147
                shared_context: self.shared_context.clone(),
1245
147
                path_type: PathType::Temp,
1246
147
                key: temp_key,
1247
147
            },
1248
147
        )
1249
147
        .await
1250
147
    }
1251
}
1252
1253
#[async_trait]
1254
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
1255
    async fn has_with_results(
1256
        self: Pin<&Self>,
1257
        keys: &[StoreKey<'_>],
1258
        results: &mut [Option<u64>],
1259
154
    ) -> Result<(), Error> {
1260
        let own_keys = keys
1261
            .iter()
1262
154
            .map(|sk| sk.borrow().into_owned())
1263
            .collect::<Vec<_>>();
1264
        self.evicting_map
1265
            .sizes_for_keys(own_keys.iter(), results, false /* peek */)
1266
            .await;
1267
        // We need to do a special pass to ensure our zero files exist.
1268
        // If our results failed and the result was a zero file, we need to
1269
        // create the file by spec.
1270
        for (key, result) in keys.iter().zip(results.iter_mut()) {
1271
            if result.is_some() || !is_zero_digest(key.borrow()) {
1272
                continue;
1273
            }
1274
            let (mut tx, rx) = make_buf_channel_pair();
1275
            let send_eof_result = tx.send_eof();
1276
            self.update(key.borrow(), rx, UploadSizeInfo::ExactSize(0))
1277
                .await
1278
0
                .err_tip(|| format!("Failed to create zero file for key {}", key.as_str()))
1279
                .merge(
1280
                    send_eof_result
1281
                        .err_tip(|| "Failed to send zero file EOF in filesystem store has"),
1282
                )?;
1283
1284
            *result = Some(0);
1285
        }
1286
        Ok(())
1287
154
    }
1288
1289
    async fn update(
1290
        self: Pin<&Self>,
1291
        key: StoreKey<'_>,
1292
        mut reader: DropCloserReadHalf,
1293
        _upload_size: UploadSizeInfo,
1294
202
    ) -> Result<u64, Error> {
1295
        if is_zero_digest(key.borrow()) {
1296
            // don't need to add, because zero length files are just assumed to exist.
1297
            return Ok(0);
1298
        }
1299
1300
        let temp_key = make_temp_key(&key);
1301
1302
        // There's a possibility of deadlock here where we take all of the
1303
        // file semaphores with make_and_open_file and the semaphores for
1304
        // whatever is populating reader is exhasted on the threads that
1305
        // have the FileSlots and not on those which can't.  To work around
1306
        // this we don't take the FileSlot until there's something on the
1307
        // reader available to know that the populator is active.
1308
        reader.peek().await?;
1309
1310
        let (entry, temp_file, temp_full_path) = self.make_temp_file(temp_key).await?;
1311
1312
        self.update_file(entry, temp_file, key.into_owned(), reader)
1313
            .await
1314
1
            .err_tip(|| {
1315
1
                format!(
1316
                    "While processing with temp file {}",
1317
1
                    temp_full_path.display()
1318
                )
1319
1
            })
1320
202
    }
1321
1322
168
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
1323
158
        matches!(
1324
168
            optimization,
1325
            StoreOptimizations::FileUpdates | StoreOptimizations::SubscribesToUpdateOneshot
1326
        )
1327
168
    }
1328
1329
29
    async fn update_oneshot(self: Pin<&Self>, key: StoreKey<'_>, data: Bytes) -> Result<(), Error> {
1330
        if is_zero_digest(key.borrow()) {
1331
            return Ok(());
1332
        }
1333
1334
        let temp_key = make_temp_key(&key);
1335
        let (mut entry, mut temp_file, temp_full_path) = self
1336
            .make_temp_file(temp_key)
1337
            .await
1338
            .err_tip(|| "Failed to create temp file in filesystem store update_oneshot")?;
1339
1340
        // Write directly without channel overhead
1341
        if !data.is_empty() {
1342
            temp_file
1343
                .write_all(&data)
1344
                .await
1345
0
                .err_tip(|| format!("Failed to write data to {}", temp_full_path.display()))?;
1346
        }
1347
1348
        let _permit = if let Some(sem) = &self.write_semaphore {
1349
0
            Some(sem.acquire().await.map_err(|err| {
1350
0
                Error::from_std_err(Code::Internal, &err).append("Write semaphore closed")
1351
0
            })?)
1352
        } else {
1353
            None
1354
        };
1355
1356
        // See comment in `update_file` above.
1357
        temp_file
1358
            .flush()
1359
            .await
1360
            .err_tip(|| "Failed to flush in filesystem store update_oneshot")?;
1361
        temp_file
1362
            .as_ref()
1363
            .sync_all()
1364
            .await
1365
            .err_tip(|| "Failed to sync_data in filesystem store update_oneshot")?;
1366
1367
        drop(_permit);
1368
1369
        temp_file.advise_dontneed();
1370
        drop(temp_file);
1371
1372
        *entry.data_size_mut() = data.len() as u64;
1373
        self.emplace_file(key.into_owned(), Arc::new(entry)).await
1374
29
    }
1375
1376
    async fn update_with_whole_file(
1377
        self: Pin<&Self>,
1378
        key: StoreKey<'_>,
1379
        path: OsString,
1380
        file: FileSlot,
1381
        upload_size: UploadSizeInfo,
1382
13
    ) -> Result<(u64, Option<FileSlot>), Error> {
1383
        let file_size = match upload_size {
1384
            UploadSizeInfo::ExactSize(size) => size,
1385
            UploadSizeInfo::MaxSize(_) => file
1386
                .as_ref()
1387
                .metadata()
1388
                .await
1389
0
                .err_tip(|| format!("While reading metadata for {}", path.display()))?
1390
                .len(),
1391
        };
1392
        if file_size == 0 {
1393
            // don't need to add, because zero length files are just assumed to exist
1394
            return Ok((0, None));
1395
        }
1396
        let entry = Fe::create(
1397
            file_size,
1398
            self.block_size,
1399
            RwLock::new(EncodedFilePath {
1400
                shared_context: self.shared_context.clone(),
1401
                path_type: PathType::Custom(path),
1402
                key: key.borrow().into_owned(),
1403
            }),
1404
        );
1405
        // We are done with the file, if we hold a reference to the file here, it could
1406
        // result in a deadlock if `emplace_file()` also needs file descriptors.
1407
        trace!(?file, "Dropping file to to update_with_whole_file");
1408
        file.advise_dontneed();
1409
        drop(file);
1410
        self.emplace_file(key.into_owned(), Arc::new(entry))
1411
            .await
1412
            .err_tip(|| "Could not move file into store in upload_file_to_store, maybe dest is on different volume?")?;
1413
        return Ok((file_size, None));
1414
13
    }
1415
1416
    async fn get_part(
1417
        self: Pin<&Self>,
1418
        key: StoreKey<'_>,
1419
        writer: &mut DropCloserWriteHalf,
1420
        offset: u64,
1421
        length: Option<u64>,
1422
87
    ) -> Result<(), Error> {
1423
        if is_zero_digest(key.borrow()) {
1424
            self.has(key.borrow())
1425
                .await
1426
                .err_tip(|| "Failed to check if zero digest exists in filesystem store")?;
1427
            writer
1428
                .send_eof()
1429
                .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
1430
            return Ok(());
1431
        }
1432
        let owned_key = key.into_owned();
1433
2
        let entry = self.evicting_map.get(&owned_key).await.ok_or_else(|| {
1434
2
            make_err!(
1435
2
                Code::NotFound,
1436
                "{} not found in filesystem store here",
1437
2
                owned_key.as_str()
1438
            )
1439
2
        })?;
1440
        let read_limit = length.unwrap_or(u64::MAX);
1441
2
        let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
1442
            // If the file is not found, we need to remove it from the eviction map.
1443
2
            if err.code == Code::NotFound {
1444
2
                error!(
1445
                    ?err,
1446
                    key = ?owned_key,
1447
                    "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
1448
                );
1449
2
                self.evicting_map.remove(&owned_key).await;
1450
0
            }
1451
2
            Err(err)
1452
4
        }).await?;
1453
1454
        loop {
1455
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
1456
            temp_file
1457
                .read_buf(&mut buf)
1458
                .await
1459
                .err_tip(|| "Failed to read data in filesystem store")?;
1460
            if buf.is_empty() {
1461
                break; // EOF.
1462
            }
1463
            writer
1464
                .send(buf.freeze())
1465
                .await
1466
                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
1467
        }
1468
        temp_file.get_ref().advise_dontneed();
1469
        writer
1470
            .send_eof()
1471
            .err_tip(|| "Filed to send EOF in filesystem store get_part")?;
1472
1473
        Ok(())
1474
87
    }
1475
1476
207
    /// Returns this store itself as the inner store; the `_digest` argument
    /// is ignored.
    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
        self
    }
1479
1480
49
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
1481
49
        self
1482
49
    }
1483
1484
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
1485
0
        self
1486
0
    }
1487
1488
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
1489
0
        registry.register_indicator(self);
1490
0
    }
1491
1492
0
    fn register_remove_callback(
1493
0
        self: Arc<Self>,
1494
0
        callback: Arc<dyn RemoveItemCallback>,
1495
0
    ) -> Result<(), Error> {
1496
0
        self.evicting_map
1497
0
            .add_remove_callback(RemoveItemCallbackHolder::new(callback));
1498
0
        Ok(())
1499
0
    }
1500
}
1501
1502
#[async_trait]
1503
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
1504
0
    fn get_name(&self) -> &'static str {
1505
0
        "FilesystemStore"
1506
0
    }
1507
1508
    /// Lightweight probe: `stat()` the `content_path` directory. No
1509
    /// write-semaphore / eviction-map contention with production
1510
    /// traffic, and bounded so a hung NFS / EBS mount can't wedge the
1511
    /// indicator.
1512
2
    async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
1513
        const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2);
1514
1515
        let content_path = &self.shared_context.content_path;
1516
        let stat = tokio::fs::metadata(&content_path);
1517
        match timeout(HEALTH_PROBE_TIMEOUT, stat).await {
1518
            Ok(Ok(meta)) if meta.is_dir() => {
1519
                HealthStatus::new_ok(self, "FilesystemStore::check_health: ok".into())
1520
            }
1521
            Ok(Ok(_)) => HealthStatus::new_failed(
1522
                self,
1523
                format!(
1524
                    "FilesystemStore::check_health: content_path {content_path} is not a directory"
1525
                )
1526
                .into(),
1527
            ),
1528
            Ok(Err(e)) => {
1529
                warn!(
1530
                    ?e,
1531
                    %content_path,
1532
                    "FilesystemStore::check_health: stat errored",
1533
                );
1534
                HealthStatus::new_failed(
1535
                    self,
1536
                    format!("FilesystemStore::check_health: stat errored: {e}").into(),
1537
                )
1538
            }
1539
            Err(_) => {
1540
                warn!(
1541
                    %content_path,
1542
                    timeout_secs = HEALTH_PROBE_TIMEOUT.as_secs(),
1543
                    "FilesystemStore::check_health: stat timed out",
1544
                );
1545
                HealthStatus::Timeout {
1546
                    struct_name: self.struct_name(),
1547
                }
1548
            }
1549
        }
1550
2
    }
1551
}