Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/filesystem_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::borrow::Cow;
16
use std::ffi::{OsStr, OsString};
17
use std::fmt::{Debug, Formatter};
18
use std::pin::Pin;
19
use std::sync::atomic::{AtomicU64, Ordering};
20
use std::sync::{Arc, Weak};
21
use std::time::{Duration, SystemTime};
22
23
use async_lock::RwLock;
24
use async_trait::async_trait;
25
use bytes::BytesMut;
26
use filetime::{set_file_atime, FileTime};
27
use futures::stream::{StreamExt, TryStreamExt};
28
use futures::{Future, TryFutureExt};
29
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
30
use nativelink_metric::MetricsComponent;
31
use nativelink_util::buf_channel::{
32
    make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
33
};
34
use nativelink_util::common::{fs, DigestInfo};
35
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
36
use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
37
use nativelink_util::store_trait::{StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo};
38
use nativelink_util::{background_spawn, spawn_blocking};
39
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
40
use tokio::time::{sleep, timeout, Sleep};
41
use tokio_stream::wrappers::ReadDirStream;
42
use tracing::{event, Level};
43
44
use crate::cas_utils::is_zero_digest;
45
46
// Default size to allocate memory of the buffer when reading files.
47
const DEFAULT_BUFF_SIZE: usize = 32 * 1024;
48
// Default block size of all major filesystems is 4KB
49
const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024;
50
51
0
/// State shared (through an `Arc`) by every `EncodedFilePath`: the two configured
/// directories plus a counter of in-flight background deletions.
#[derive(Debug, MetricsComponent)]
52
pub struct SharedContext {
53
    // Used in testing to know how many active drop() spawns are running.
54
    // TODO(allada) It is probably a good idea to use a spin lock during
55
    // destruction of the store to ensure that all files are actually
56
    // deleted (similar to how it is done in tests).
57
    #[metric(help = "Number of active drop spawns")]
58
    pub active_drop_spawns: AtomicU64,
59
    // Directory that `PathType::Temp` paths are resolved against.
    #[metric(help = "Path to the configured temp path")]
60
    temp_path: String,
61
    // Directory that `PathType::Content` paths are resolved against.
    #[metric(help = "Path to the configured content path")]
62
    content_path: String,
63
}
64
65
/// Selects how an `EncodedFilePath` is turned into a path on disk.
#[derive(Eq, PartialEq, Debug)]
66
enum PathType {
67
    // Resolved as `<content_path>/<digest>`. Files of this type are not removed on drop.
    Content,
68
    // Resolved as `<temp_path>/<digest>`.
    Temp,
69
    // The contained path is used verbatim; the digest is ignored when resolving.
    Custom(OsString),
70
}
71
72
// Note: We don't store the full path of the file because it would cause
73
// a lot of needless memory bloat. There's a high chance we'll end up with a
74
// lot of small files, so to prevent storing duplicate data, we store an Arc
75
// to the path of the directory where the file is stored and the packed digest.
76
// Resulting in usize + sizeof(DigestInfo).
77
type FileNameDigest = DigestInfo;
78
/// Compact description of where a file lives on disk. The full path is rebuilt on
/// demand from the shared directories, the path type and the digest.
pub struct EncodedFilePath {
79
    // Holds the configured temp/content directories.
    shared_context: Arc<SharedContext>,
80
    // Which directory (or custom path) this file currently resolves to.
    path_type: PathType,
81
    // Used as the file name inside the selected directory.
    digest: FileNameDigest,
82
}
83
84
impl EncodedFilePath {
85
    #[inline]
86
297
    fn get_file_path(&self) -> Cow<'_, OsStr> {
87
297
        get_file_path_raw(&self.path_type, self.shared_context.as_ref(), &self.digest)
88
297
    }
89
}
90
91
#[inline]
92
375
fn get_file_path_raw<'a>(
93
375
    path_type: &'a PathType,
94
375
    shared_context: &SharedContext,
95
375
    digest: &DigestInfo,
96
375
) -> Cow<'a, OsStr> {
97
375
    let 
folder370
= match path_type {
98
206
        PathType::Content => &shared_context.content_path,
99
164
        PathType::Temp => &shared_context.temp_path,
100
5
        PathType::Custom(path) => return Cow::Borrowed(path),
101
    };
102
370
    Cow::Owned(to_full_path_from_digest(folder, digest))
103
375
}
104
105
impl Drop for EncodedFilePath {
106
81
    fn drop(&mut self) {
107
81
        // `drop()` can be called during shutdown, so we use `path_type` flag to know if the
108
81
        // file actually needs to be deleted.
109
81
        if self.path_type == PathType::Content {
  Branch (109:12): [True: 64, False: 17]
  Branch (109:12): [Folded - Ignored]
110
64
            return;
111
17
        }
112
17
113
17
        let file_path = self.get_file_path().to_os_string();
114
17
        let shared_context = self.shared_context.clone();
115
17
        shared_context
116
17
            .active_drop_spawns
117
17
            .fetch_add(1, Ordering::Relaxed);
118
17
        background_spawn!("filesystem_delete_file", async move {
119
16
            event!(Level::INFO, ?file_path, 
"File deleted"0
,);
120
16
            let 
result14
= fs::remove_file(&file_path)
121
12
                .await
122
14
                .err_tip(|| 
format!("Failed to remove file {file_path:?}")1
);
123
14
            if let Err(
err1
) = result {
  Branch (123:20): [True: 1, False: 13]
  Branch (123:20): [Folded - Ignored]
124
1
                event!(Level::ERROR, ?file_path, ?err, "Failed to delete file",);
125
13
            }
126
14
            shared_context
127
14
                .active_drop_spawns
128
14
                .fetch_sub(1, Ordering::Relaxed);
129
17
        });
130
81
    }
131
}
132
133
#[inline]
134
387
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
135
387
    format!("{folder}/{digest}").into()
136
387
}
137
138
/// Abstraction over a file tracked by `FilesystemStore`. `FileEntryImpl` is the
/// default implementation; the store is generic over this trait.
pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
139
    /// Responsible for creating the underlying FileEntry.
    /// `data_size` is the size of the data in bytes and `block_size` the block size used
    /// when computing `size_on_disk()`.
140
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self;
141
142
    /// Creates a (usually) temp file, opens it and returns the path to the temp file.
    /// On success the tuple holds the new entry, the opened file slot and the file path.
143
    fn make_and_open_file(
144
        block_size: u64,
145
        encoded_file_path: EncodedFilePath,
146
    ) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
147
    where
148
        Self: Sized;
149
150
    /// Returns the underlying reference to the size of the data in bytes.
151
    fn data_size_mut(&mut self) -> &mut u64;
152
153
    /// Returns the actual size of the underlying file on the disk after accounting for filesystem block size.
154
    fn size_on_disk(&self) -> u64;
155
156
    /// Gets the underlying `EncodedFilePath`.
157
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath>;
158
159
    /// Returns a reader that will read part of the underlying file.
160
    fn read_file_part(
161
        &self,
162
        offset: u64,
163
        length: u64,
164
    ) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
165
166
    /// This function is a safe way to extract the file name of the underlying file. To protect users from
167
    /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
168
    /// the filename inside this function instead of extracting the filename and doing the logic outside.
169
    /// This is because the filename is not guaranteed to exist after this function returns, however inside
170
    /// the callback the file is always guaranteed to exist and immutable.
171
    /// DO NOT USE THIS FUNCTION TO EXTRACT THE FILENAME AND STORE IT FOR LATER USE.
172
    fn get_file_path_locked<
173
        T,
174
        Fut: Future<Output = Result<T, Error>> + Send,
175
        F: FnOnce(OsString) -> Fut + Send,
176
    >(
177
        &self,
178
        handler: F,
179
    ) -> impl Future<Output = Result<T, Error>> + Send;
180
}
181
182
/// Default `FileEntry` implementation used by `FilesystemStore`.
pub struct FileEntryImpl {
183
    // Size of the file contents in bytes.
    data_size: u64,
184
    // Block size used to round `data_size` up in `size_on_disk()`.
    block_size: u64,
185
    // Location of the file; the lock guards path changes (see `unref()`).
    encoded_file_path: RwLock<EncodedFilePath>,
186
}
187
188
impl FileEntryImpl {
189
9
    pub fn get_shared_context_for_test(&mut self) -> Arc<SharedContext> {
190
9
        self.encoded_file_path.get_mut().shared_context.clone()
191
9
    }
192
}
193
194
impl FileEntry for FileEntryImpl {
195
81
    fn create(data_size: u64, block_size: u64, encoded_file_path: RwLock<EncodedFilePath>) -> Self {
196
81
        Self {
197
81
            data_size,
198
81
            block_size,
199
81
            encoded_file_path,
200
81
        }
201
81
    }
202
203
    /// This encapsolates the logic for the edge case of if the file fails to create
204
    /// the cleanup of the file is handled without creating a FileEntry, which would
205
    /// try to cleanup the file as well during drop().
206
73
    async fn make_and_open_file(
207
73
        block_size: u64,
208
73
        encoded_file_path: EncodedFilePath,
209
73
    ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
210
73
        let temp_full_path = encoded_file_path.get_file_path().to_os_string();
211
73
        let temp_file_result = fs::create_file(temp_full_path.clone())
212
73
            .or_else(|mut err| async 
{0
213
0
                let remove_result = fs::remove_file(&temp_full_path).await.err_tip(|| {
214
0
                    format!("Failed to remove file {temp_full_path:?} in filesystem store")
215
0
                });
216
0
                if let Err(remove_err) = remove_result {
  Branch (216:24): [True: 0, False: 0]
  Branch (216:24): [True: 0, False: 0]
  Branch (216:24): [True: 0, False: 0]
  Branch (216:24): [Folded - Ignored]
  Branch (216:24): [True: 0, False: 0]
217
0
                    err = err.merge(remove_err);
218
0
                }
219
0
                event!(
220
0
                    Level::WARN,
221
                    ?err,
222
                    ?block_size,
223
                    ?temp_full_path,
224
0
                    "Failed to create file",
225
                );
226
0
                Err(err)
227
0
                    .err_tip(|| format!("Failed to create {temp_full_path:?} in filesystem store"))
228
73
            
}0
)
229
97
            .await
?0
;
230
231
73
        Ok((
232
73
            <FileEntryImpl as FileEntry>::create(
233
73
                0, /* Unknown yet, we will fill it in later */
234
73
                block_size,
235
73
                RwLock::new(encoded_file_path),
236
73
            ),
237
73
            temp_file_result,
238
73
            temp_full_path,
239
73
        ))
240
73
    }
241
242
73
    fn data_size_mut(&mut self) -> &mut u64 {
243
73
        &mut self.data_size
244
73
    }
245
246
154
    fn size_on_disk(&self) -> u64 {
247
154
        self.data_size.div_ceil(self.block_size) * self.block_size
248
154
    }
249
250
190
    fn get_encoded_file_path(&self) -> &RwLock<EncodedFilePath> {
251
190
        &self.encoded_file_path
252
190
    }
253
254
28
    async fn read_file_part(
255
28
        &self,
256
28
        offset: u64,
257
28
        length: u64,
258
28
    ) -> Result<fs::ResumeableFileSlot, Error> {
259
28
        let (mut file, full_content_path_for_debug_only) = self
260
28
            .get_file_path_locked(|full_content_path| async move {
261
28
                let file = fs::open_file(full_content_path.clone(), length)
262
29
                    .await
263
28
                    .err_tip(|| {
264
0
                        format!("Failed to open file in filesystem store {full_content_path:?}")
265
28
                    })
?0
;
266
28
                Ok((file, full_content_path))
267
56
            }
)28
268
29
            .await
?0
;
269
270
28
        file.as_reader()
271
0
            .await
272
28
            .err_tip(|| 
"Could not seek file in read_file_part()"0
)
?0
273
28
            .get_mut()
274
28
            .seek(SeekFrom::Start(offset))
275
29
            .await
276
28
            .err_tip(|| 
format!("Failed to seek file: {full_content_path_for_debug_only:?}")0
)
?0
;
277
28
        Ok(file)
278
28
    }
279
280
112
    async fn get_file_path_locked<
281
112
        T,
282
112
        Fut: Future<Output = Result<T, Error>> + Send,
283
112
        F: FnOnce(OsString) -> Fut + Send,
284
112
    >(
285
112
        &self,
286
112
        handler: F,
287
112
    ) -> Result<T, Error> {
288
112
        let encoded_file_path = self.get_encoded_file_path().read().
await1
;
289
124
        
handler(encoded_file_path.get_file_path().to_os_string())112
.await
290
112
    }
291
}
292
293
impl Debug for FileEntryImpl {
294
0
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
295
0
        f.debug_struct("FileEntryImpl")
296
0
            .field("data_size", &self.data_size)
297
0
            .field("encoded_file_path", &"<behind mutex>")
298
0
            .finish()
299
0
    }
300
}
301
302
90
fn make_temp_digest(digest: &mut DigestInfo) {
303
    static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
304
90
    let mut hash = *digest.packed_hash();
305
90
    hash[24..].clone_from_slice(
306
90
        &DELETE_FILE_COUNTER
307
90
            .fetch_add(1, Ordering::Relaxed)
308
90
            .to_le_bytes(),
309
90
    );
310
90
    digest.set_packed_hash(*hash);
311
90
}
312
313
impl LenEntry for FileEntryImpl {
314
    #[inline]
315
152
    fn len(&self) -> u64 {
316
152
        self.size_on_disk()
317
152
    }
318
319
0
    fn is_empty(&self) -> bool {
320
0
        self.data_size == 0
321
0
    }
322
323
    #[inline]
324
74
    async fn touch(&self) -> bool {
325
74
        let result = self
326
74
            .get_file_path_locked(move |full_content_path| async move {
327
74
                let full_content_path = full_content_path.clone();
328
74
                spawn_blocking!("filesystem_touch_set_mtime", move || {
329
74
                    set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
330
1
                        format!("Failed to touch file in filesystem store {full_content_path:?}")
331
74
                    })
332
74
                })
333
87
                .await
334
74
                .map_err(|e| {
335
0
                    make_err!(
336
0
                        Code::Internal,
337
0
                        "Failed to change atime of file due to spawn failing {:?}",
338
0
                        e
339
0
                    )
340
74
                })
?0
341
148
            }
)74
342
88
            .await;
343
74
        if let Err(
err1
) = result {
  Branch (343:16): [True: 0, False: 0]
  Branch (343:16): [True: 1, False: 15]
  Branch (343:16): [True: 0, False: 0]
  Branch (343:16): [True: 0, False: 4]
  Branch (343:16): [Folded - Ignored]
  Branch (343:16): [True: 0, False: 54]
344
1
            event!(Level::ERROR, ?err, "Failed to touch file",);
345
1
            return false;
346
73
        }
347
73
        true
348
74
    }
349
350
    // unref() only triggers when an item is removed from the eviction_map. It is possible
351
    // that another place in code has a reference to `FileEntryImpl` and may later read the
352
    // file. To support this edge case, we first move the file to a temp file and point
353
    // target file location to the new temp file. `unref()` should only ever be called once.
354
    #[inline]
355
18
    async fn unref(&self) {
356
        {
357
18
            let mut encoded_file_path = self.encoded_file_path.write().
await0
;
358
18
            if encoded_file_path.path_type == PathType::Temp {
  Branch (358:16): [True: 0, False: 0]
  Branch (358:16): [True: 1, False: 11]
  Branch (358:16): [True: 0, False: 0]
  Branch (358:16): [True: 0, False: 0]
  Branch (358:16): [Folded - Ignored]
  Branch (358:16): [True: 0, False: 6]
359
                // We are already a temp file that is now marked for deletion on drop.
360
                // This is very rare, but most likely the rename into the content path failed.
361
1
                return;
362
17
            }
363
17
            let from_path = encoded_file_path.get_file_path();
364
17
            let mut new_digest = encoded_file_path.digest;
365
17
            make_temp_digest(&mut new_digest);
366
17
367
17
            let to_path =
368
17
                to_full_path_from_digest(&encoded_file_path.shared_context.temp_path, &new_digest);
369
370
18
            if let Err(
err1
) =
fs::rename(&from_path, &to_path)17
.await {
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [True: 1, False: 10]
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [True: 0, False: 0]
  Branch (370:20): [Folded - Ignored]
  Branch (370:20): [True: 0, False: 6]
371
1
                event!(
372
1
                    Level::WARN,
373
1
                    digest = ?encoded_file_path.digest,
374
1
                    ?from_path,
375
1
                    ?to_path,
376
1
                    ?err,
377
1
                    "Failed to rename file",
378
                );
379
            } else {
380
16
                event!(
381
16
                    Level::INFO,
382
0
                    digest = ?encoded_file_path.digest,
383
0
                    ?from_path,
384
0
                    ?to_path,
385
0
                    "Renamed file",
386
                );
387
16
                encoded_file_path.path_type = PathType::Temp;
388
16
                encoded_file_path.digest = new_digest;
389
            }
390
        }
391
18
    }
392
}
393
394
#[inline]
395
4
pub fn digest_from_filename(file_name: &str) -> Result<DigestInfo, Error> {
396
4
    let (hash, size) = file_name.split_once('-').err_tip(|| 
""0
)
?0
;
397
4
    let size = size.parse::<i64>()
?0
;
398
4
    DigestInfo::try_new(hash, size)
399
4
}
400
401
/// The number of files to read the metadata for at the same time when running
402
/// add_files_to_cache.
403
const SIMULTANEOUS_METADATA_READS: usize = 200;
404
405
39
async fn add_files_to_cache<Fe: FileEntry>(
406
39
    evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
407
39
    anchor_time: &SystemTime,
408
39
    shared_context: &Arc<SharedContext>,
409
39
    block_size: u64,
410
39
) -> Result<(), Error> {
411
3
    async fn process_entry<Fe: FileEntry>(
412
3
        evicting_map: &EvictingMap<DigestInfo, Arc<Fe>, SystemTime>,
413
3
        file_name: &str,
414
3
        atime: SystemTime,
415
3
        data_size: u64,
416
3
        block_size: u64,
417
3
        anchor_time: &SystemTime,
418
3
        shared_context: &Arc<SharedContext>,
419
3
    ) -> Result<(), Error> {
420
3
        let digest = digest_from_filename(file_name)
?0
;
421
422
3
        let file_entry = Fe::create(
423
3
            data_size,
424
3
            block_size,
425
3
            RwLock::new(EncodedFilePath {
426
3
                shared_context: shared_context.clone(),
427
3
                path_type: PathType::Content,
428
3
                digest,
429
3
            }),
430
3
        );
431
3
        let time_since_anchor = anchor_time
432
3
            .duration_since(atime)
433
3
            .map_err(|_| 
make_input_err!("File access time newer than now")0
)
?0
;
434
3
        evicting_map
435
3
            .insert_with_time(
436
3
                digest,
437
3
                Arc::new(file_entry),
438
3
                time_since_anchor.as_secs() as i32,
439
3
            )
440
1
            .await;
441
3
        Ok(())
442
3
    }
443
444
39
    let mut file_infos: Vec<(String, SystemTime, u64)> = {
445
39
        let (_permit, dir_handle) = fs::read_dir(format!("{}/", shared_context.content_path))
446
39
            .await
447
39
            .err_tip(|| 
"Failed opening content directory for iterating in filesystem store"0
)
?0
448
39
            .into_inner();
449
39
450
39
        let read_dir_stream = ReadDirStream::new(dir_handle);
451
39
        read_dir_stream
452
39
            .map(|dir_entry| async move {
453
3
                let dir_entry = dir_entry.unwrap();
454
3
                let file_name = dir_entry.file_name().into_string().unwrap();
455
3
                let metadata = dir_entry
456
3
                    .metadata()
457
3
                    .await
458
3
                    .err_tip(|| 
"Failed to get metadata in filesystem store"0
)
?0
;
459
3
                let atime = match metadata.accessed() {
460
3
                    Ok(atime) => atime,
461
0
                    Err(err) => {
462
0
                        panic!(
463
0
                            "{}{}{} : {} {:?}",
464
0
                            "It appears this filesystem does not support access time. ",
465
0
                            "Please configure this program to run on a drive that supports ",
466
0
                            "atime",
467
0
                            file_name,
468
0
                            err
469
0
                        );
470
                    }
471
                };
472
3
                Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len()))
473
39
            
}6
)
474
39
            .buffer_unordered(SIMULTANEOUS_METADATA_READS)
475
39
            .try_collect()
476
4
            .await
?0
477
    };
478
479
39
    file_infos.sort_by(|a, b| 
a.1.cmp(&b.1)1
);
480
42
    for (
file_name, atime, data_size3
) in file_infos {
481
3
        let result = process_entry(
482
3
            evicting_map,
483
3
            &file_name,
484
3
            atime,
485
3
            data_size,
486
3
            block_size,
487
3
            anchor_time,
488
3
            shared_context,
489
3
        )
490
1
        .await;
491
3
        if let Err(
err0
) = result {
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [True: 0, False: 3]
  Branch (491:16): [True: 0, False: 0]
  Branch (491:16): [Folded - Ignored]
  Branch (491:16): [True: 0, False: 0]
492
0
            event!(
493
0
                Level::WARN,
494
                ?file_name,
495
                ?err,
496
0
                "Failed to add file to eviction cache",
497
            );
498
            // Ignore result.
499
            let _ =
500
0
                fs::remove_file(format!("{}/{}", &shared_context.content_path, &file_name)).await;
501
3
        }
502
    }
503
39
    Ok(())
504
39
}
505
506
39
async fn prune_temp_path(temp_path: &str) -> Result<(), Error> {
507
39
    let (_permit, dir_handle) = fs::read_dir(temp_path)
508
39
        .await
509
39
        .err_tip(|| 
"Failed opening temp directory to prune partial downloads in filesystem store"0
)
?0
510
39
        .into_inner();
511
39
512
39
    let mut read_dir_stream = ReadDirStream::new(dir_handle);
513
39
    while let Some(
dir_entry0
) = read_dir_stream.next().
await0
{
  Branch (513:15): [True: 0, False: 0]
  Branch (513:15): [True: 0, False: 21]
  Branch (513:15): [True: 0, False: 2]
  Branch (513:15): [Folded - Ignored]
  Branch (513:15): [True: 0, False: 16]
514
0
        let path = dir_entry?.path();
515
0
        if let Err(err) = fs::remove_file(&path).await {
  Branch (515:16): [True: 0, False: 0]
  Branch (515:16): [True: 0, False: 0]
  Branch (515:16): [True: 0, False: 0]
  Branch (515:16): [Folded - Ignored]
  Branch (515:16): [True: 0, False: 0]
516
0
            event!(Level::WARN, ?path, ?err, "Failed to delete file",);
517
0
        }
518
    }
519
39
    Ok(())
520
39
}
521
522
0
/// Store that keeps each entry as a file on disk and tracks the entries in an
/// `EvictingMap` keyed by digest.
#[derive(MetricsComponent)]
523
pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
524
    // Temp/content directories and the active-drop counter, shared with file entries.
    #[metric]
525
    shared_context: Arc<SharedContext>,
526
    #[metric(group = "evicting_map")]
527
    evicting_map: Arc<EvictingMap<DigestInfo, Arc<Fe>, SystemTime>>,
528
    #[metric(help = "Block size of the configured filesystem")]
529
    block_size: u64,
530
    #[metric(help = "Size of the configured read buffer size")]
531
    read_buffer_size: usize,
532
    // Weak handle to this store; upgraded by `get_arc()`.
    weak_self: Weak<Self>,
533
    // Injectable sleep function; `new()` passes `tokio::time::sleep`.
    sleep_fn: fn(Duration) -> Sleep,
534
    // Injectable rename function; `new()` passes a wrapper around `std::fs::rename`.
    rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
535
}
536
537
impl<Fe: FileEntry> FilesystemStore<Fe> {
538
32
    pub async fn new(
539
32
        config: &nativelink_config::stores::FilesystemStore,
540
32
    ) -> Result<Arc<Self>, Error> {
541
69
        Self::new_with_timeout_and_rename_fn(config, sleep, |from, to| std::fs::rename(from, to)
)32
542
133
            .await
543
32
    }
544
545
39
    pub async fn new_with_timeout_and_rename_fn(
546
39
        config: &nativelink_config::stores::FilesystemStore,
547
39
        sleep_fn: fn(Duration) -> Sleep,
548
39
        rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
549
39
    ) -> Result<Arc<Self>, Error> {
550
39
        let now = SystemTime::now();
551
39
552
39
        let empty_policy = nativelink_config::stores::EvictionPolicy::default();
553
39
        let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
554
39
        let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now));
555
39
556
39
        fs::create_dir_all(&config.temp_path)
557
39
            .await
558
39
            .err_tip(|| 
format!("Failed to temp directory {:?}", &config.temp_path)0
)
?0
;
559
39
        fs::create_dir_all(&config.content_path)
560
39
            .await
561
39
            .err_tip(|| 
format!("Failed to content directory {:?}", &config.content_path)0
)
?0
;
562
563
39
        let shared_context = Arc::new(SharedContext {
564
39
            active_drop_spawns: AtomicU64::new(0),
565
39
            temp_path: config.temp_path.clone(),
566
39
            content_path: config.content_path.clone(),
567
39
        });
568
569
39
        let block_size = if config.block_size == 0 {
  Branch (569:29): [True: 0, False: 0]
  Branch (569:29): [True: 0, False: 1]
  Branch (569:29): [True: 0, False: 1]
  Branch (569:29): [True: 1, False: 0]
  Branch (569:29): [True: 0, False: 1]
  Branch (569:29): [True: 1, False: 0]
  Branch (569:29): [True: 15, False: 1]
  Branch (569:29): [True: 2, False: 0]
  Branch (569:29): [Folded - Ignored]
  Branch (569:29): [True: 16, False: 0]
570
35
            DEFAULT_BLOCK_SIZE
571
        } else {
572
4
            config.block_size
573
        };
574
44
        
add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size)39
.await
?0
;
575
39
        prune_temp_path(&shared_context.temp_path).await
?0
;
576
577
39
        let read_buffer_size = if config.read_buffer_size == 0 {
  Branch (577:35): [True: 0, False: 0]
  Branch (577:35): [True: 1, False: 0]
  Branch (577:35): [True: 0, False: 1]
  Branch (577:35): [True: 1, False: 0]
  Branch (577:35): [True: 0, False: 1]
  Branch (577:35): [True: 1, False: 0]
  Branch (577:35): [True: 7, False: 9]
  Branch (577:35): [True: 2, False: 0]
  Branch (577:35): [Folded - Ignored]
  Branch (577:35): [True: 16, False: 0]
578
28
            DEFAULT_BUFF_SIZE
579
        } else {
580
11
            config.read_buffer_size as usize
581
        };
582
39
        Ok(Arc::new_cyclic(|weak_self| Self {
583
39
            shared_context,
584
39
            evicting_map,
585
39
            block_size,
586
39
            read_buffer_size,
587
39
            weak_self: weak_self.clone(),
588
39
            sleep_fn,
589
39
            rename_fn,
590
39
        }))
591
39
    }
592
593
15
    pub fn get_arc(&self) -> Option<Arc<Self>> {
594
15
        self.weak_self.upgrade()
595
15
    }
596
597
13
    pub async fn get_file_entry_for_digest(&self, digest: &DigestInfo) -> Result<Arc<Fe>, Error> {
598
13
        self.evicting_map
599
13
            .get(digest)
600
14
            .await
601
13
            .ok_or_else(|| 
make_err!(Code::NotFound, "{digest} not found in filesystem store")1
)
602
13
    }
603
604
73
    async fn update_file<'a>(
605
73
        self: Pin<&'a Self>,
606
73
        mut entry: Fe,
607
73
        mut resumeable_temp_file: fs::ResumeableFileSlot,
608
73
        final_digest: DigestInfo,
609
73
        mut reader: DropCloserReadHalf,
610
73
    ) -> Result<(), Error> {
611
73
        let mut data_size = 0;
612
        loop {
613
130
            let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).
await2
  Branch (613:17): [True: 0, False: 0]
  Branch (613:17): [True: 4, False: 0]
  Branch (613:17): [True: 4, False: 0]
  Branch (613:17): [True: 4, False: 0]
  Branch (613:17): [True: 4, False: 0]
  Branch (613:17): [True: 2, False: 0]
  Branch (613:17): [True: 32, False: 0]
  Branch (613:17): [True: 0, False: 0]
  Branch (613:17): [Folded - Ignored]
  Branch (613:17): [True: 80, False: 0]
614
            else {
615
                // In the event we timeout, we want to close the writing file, to prevent
616
                // the file descriptor left open for long periods of time.
617
                // This is needed because we wrap `fs` so only a fixed number of file
618
                // descriptors may be open at any given time. If we are streaming from
619
                // File -> File, it can cause a deadlock if the Write file is not sending
620
                // data because it is waiting for a file descriotor to open before sending data.
621
0
                resumeable_temp_file.close_file().await.err_tip(|| {
622
0
                    "Could not close file due to timeout in FileSystemStore::update_file"
623
0
                })?;
624
0
                continue;
625
            };
626
130
            let mut data = data_result.err_tip(|| 
"Failed to receive data in filesystem store"0
)
?0
;
627
130
            let data_len = data.len();
628
130
            if data_len == 0 {
  Branch (628:16): [True: 0, False: 0]
  Branch (628:16): [True: 2, False: 2]
  Branch (628:16): [True: 2, False: 2]
  Branch (628:16): [True: 2, False: 2]
  Branch (628:16): [True: 2, False: 2]
  Branch (628:16): [True: 1, False: 1]
  Branch (628:16): [True: 17, False: 15]
  Branch (628:16): [True: 0, False: 0]
  Branch (628:16): [Folded - Ignored]
  Branch (628:16): [True: 47, False: 33]
629
73
                break; // EOF.
630
57
            }
631
57
            resumeable_temp_file
632
57
                .as_writer()
633
0
                .await
634
57
                .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
635
57
                .write_all_buf(&mut data)
636
0
                .await
637
57
                .err_tip(|| 
"Failed to write data into filesystem store"0
)
?0
;
638
57
            data_size += data_len as u64;
639
        }
640
641
73
        resumeable_temp_file
642
73
            .as_writer()
643
0
            .await
644
73
            .err_tip(|| 
"in filesystem_store::update_file"0
)
?0
645
73
            .as_ref()
646
73
            .sync_all()
647
185
            .await
648
73
            .err_tip(|| 
"Failed to sync_data in filesystem store"0
)
?0
;
649
650
73
        drop(resumeable_temp_file);
651
73
652
73
        *entry.data_size_mut() = data_size;
653
77
        self.emplace_file(final_digest, Arc::new(entry)).await
654
72
    }
655
656
78
    async fn emplace_file(&self, digest: DigestInfo, entry: Arc<Fe>) -> Result<(), Error> {
657
78
        // This sequence of events is quite tricky to understand due to the amount of triggers that
658
78
        // happen, async'ness of it and the locking. So here is a breakdown of what happens:
659
78
        // 1. Here will hold a write lock on any file operations of this FileEntry.
660
78
        // 2. Then insert the entry into the evicting map. This may trigger an eviction of other
661
78
        //    entries.
662
78
        // 3. Eviction triggers `unref()`, which grabs a write lock on the evicted FileEntrys
663
78
        //    during the rename.
664
78
        // 4. It should be impossible for items to be added while eviction is happening, so there
665
78
        //    should not be a deadlock possibility. However, it is possible for the new FileEntry
666
78
        //    to be evicted before the file is moved into place. Eviction of the newly inserted
667
78
        //    item is not possible within the `insert()` call because the write lock inside the
668
78
        //    eviction map. If an eviction of new item happens after `insert()` but before
669
78
        //    `rename()` then we get to finish our operation because the `unref()` of the new item
670
78
        //    will be blocked on us because we currently have the lock.
671
78
        // 5. Move the file into place. Since we hold a write lock still anyone that gets our new
672
78
        //    FileEntry (which has not yet been placed on disk) will not be able to read the file's
673
78
        //    contents until we relese the lock.
674
78
        let evicting_map = self.evicting_map.clone();
675
78
        let rename_fn = self.rename_fn;
676
78
677
78
        // We need to guarantee that this will get to the end even if the parent future is dropped.
678
78
        // See: https://github.com/TraceMachina/nativelink/issues/495
679
78
        background_spawn!("filesystem_store_emplace_file", async move {
680
78
            let mut encoded_file_path = entry.get_encoded_file_path().write().
await0
;
681
78
            let final_path = get_file_path_raw(
682
78
                &PathType::Content,
683
78
                encoded_file_path.shared_context.as_ref(),
684
78
                &digest,
685
78
            );
686
78
687
78
            evicting_map.insert(digest, entry.clone()).
await19
;
688
689
78
            let from_path = encoded_file_path.get_file_path();
690
78
            // Internally tokio spawns fs commands onto a blocking thread anyways.
691
78
            // Since we are already on a blocking thread, we just need the `fs` wrapper to manage
692
78
            // an open-file permit (ensure we don't open too many files at once).
693
78
            let result = (rename_fn)(&from_path, &final_path)
694
78
                .err_tip(|| 
format!("Failed to rename temp file to final path {final_path:?}")1
);
695
696
            // In the event our move from temp file to final file fails we need to ensure we remove
697
            // the entry from our map.
698
            // Remember: At this point it is possible for another thread to have a reference to
699
            // `entry`, so we can't delete the file, only drop() should ever delete files.
700
78
            if let Err(
err1
) = result {
  Branch (700:20): [True: 0, False: 0]
  Branch (700:20): [True: 0, False: 2]
  Branch (700:20): [True: 0, False: 2]
  Branch (700:20): [True: 0, False: 2]
  Branch (700:20): [True: 0, False: 2]
  Branch (700:20): [True: 1, False: 0]
  Branch (700:20): [True: 0, False: 20]
  Branch (700:20): [True: 0, False: 0]
  Branch (700:20): [Folded - Ignored]
  Branch (700:20): [True: 0, False: 49]
701
1
                event!(
702
1
                    Level::ERROR,
703
                    ?err,
704
                    ?from_path,
705
                    ?final_path,
706
1
                    "Failed to rename file",
707
                );
708
                // Warning: To prevent deadlock we need to release our lock or during `remove_if()`
709
                // it will call `unref()`, which triggers a write-lock on `encoded_file_path`.
710
1
                drop(encoded_file_path);
711
1
                // It is possible that the item in our map is no longer the item we inserted,
712
1
                // So, we need to conditionally remove it only if the pointers are the same.
713
1
                evicting_map
714
1
                    .remove_if(&digest, |map_entry| Arc::<Fe>::ptr_eq(map_entry, &entry))
715
0
                    .await;
716
1
                return Err(err);
717
77
            }
718
77
            encoded_file_path.path_type = PathType::Content;
719
77
            encoded_file_path.digest = digest;
720
77
            Ok(())
721
78
        })
722
82
        .await
723
77
        .err_tip(|| 
"Failed to create spawn in filesystem store update_file"0
)
?0
724
77
    }
725
}
726
727
#[async_trait]
728
impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
729
    async fn has_with_results(
730
        self: Pin<&Self>,
731
        keys: &[StoreKey<'_>],
732
        results: &mut [Option<u64>],
733
52
    ) -> Result<(), Error> {
734
        // TODO(allada) This is a bit of a hack to get around the lifetime issues with the
735
        // existence_cache. We need to convert the digests to owned values to be able to
736
        // insert them into the cache. In theory it should be able to elide this conversion
737
        // but it seems to be a bit tricky to get right.
738
52
        let keys: Vec<_> = keys.iter().map(|v| v.borrow().into_digest()).collect();
739
52
        self.evicting_map
740
52
            .sizes_for_keys(&keys, results, false /* peek */)
741
50
            .await;
742
        // We need to do a special pass to ensure our zero files exist.
743
        // If our results failed and the result was a zero file, we need to
744
        // create the file by spec.
745
52
        for (digest, result) in keys.iter().zip(results.iter_mut()) {
746
52
            if result.is_some() || 
!is_zero_digest(digest)17
{
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [True: 0, False: 1]
  Branch (746:36): [True: 1, False: 0]
  Branch (746:16): [True: 1, False: 3]
  Branch (746:36): [True: 1, False: 2]
  Branch (746:16): [True: 0, False: 0]
  Branch (746:36): [True: 0, False: 0]
  Branch (746:16): [Folded - Ignored]
  Branch (746:36): [Folded - Ignored]
  Branch (746:16): [True: 34, False: 13]
  Branch (746:36): [True: 13, False: 0]
747
50
                continue;
748
2
            }
749
2
            let (mut tx, rx) = make_buf_channel_pair();
750
2
            let send_eof_result = tx.send_eof();
751
2
            self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
752
6
                .await
753
2
                .err_tip(|| 
format!("Failed to create zero file for key {digest}")0
)
754
2
                .merge(
755
2
                    send_eof_result
756
2
                        .err_tip(|| 
"Failed to send zero file EOF in filesystem store has"0
),
757
2
                )
?0
;
758
759
2
            *result = Some(0);
760
        }
761
52
        Ok(())
762
104
    }
763
764
    async fn update(
765
        self: Pin<&Self>,
766
        key: StoreKey<'_>,
767
        reader: DropCloserReadHalf,
768
        _upload_size: UploadSizeInfo,
769
73
    ) -> Result<(), Error> {
770
73
        let digest = key.into_digest();
771
73
        let mut temp_digest = digest;
772
73
        make_temp_digest(&mut temp_digest);
773
774
73
        let (entry, temp_file, temp_full_path) = Fe::make_and_open_file(
775
73
            self.block_size,
776
73
            EncodedFilePath {
777
73
                shared_context: self.shared_context.clone(),
778
73
                path_type: PathType::Temp,
779
73
                digest: temp_digest,
780
73
            },
781
73
        )
782
97
        .await
?0
;
783
784
73
        self.update_file(entry, temp_file, digest, reader)
785
264
            .await
786
72
            .err_tip(|| 
format!("While processing with temp file {temp_full_path:?}")1
)
787
145
    }
788
789
44
    fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
790
44
        optimization == StoreOptimizations::FileUpdates
791
44
    }
792
793
    async fn update_with_whole_file(
794
        self: Pin<&Self>,
795
        key: StoreKey<'_>,
796
        mut file: fs::ResumeableFileSlot,
797
        upload_size: UploadSizeInfo,
798
5
    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
799
5
        let digest = key.into_digest();
800
5
        let path = file.get_path().as_os_str().to_os_string();
801
5
        let file_size = match upload_size {
802
5
            UploadSizeInfo::ExactSize(size) => size,
803
0
            UploadSizeInfo::MaxSize(_) => file
804
0
                .as_reader()
805
0
                .await
806
0
                .err_tip(|| {
807
0
                    format!("While getting metadata for {path:?} in update_with_whole_file")
808
0
                })?
809
0
                .get_ref()
810
0
                .as_ref()
811
0
                .metadata()
812
0
                .await
813
0
                .err_tip(|| format!("While reading metadata for {path:?}"))?
814
0
                .len(),
815
        };
816
5
        let entry = Fe::create(
817
5
            file_size,
818
5
            self.block_size,
819
5
            RwLock::new(EncodedFilePath {
820
5
                shared_context: self.shared_context.clone(),
821
5
                path_type: PathType::Custom(path),
822
5
                digest,
823
5
            }),
824
5
        );
825
5
        // We are done with the file, if we hold a reference to the file here, it could
826
5
        // result in a deadlock if `emplace_file()` also needs file descriptors.
827
5
        drop(file);
828
5
        self.emplace_file(digest, Arc::new(entry))
829
5
            .await
830
5
            .err_tip(|| 
"Could not move file into store in upload_file_to_store, maybe dest is on different volume?"0
)
?0
;
831
5
        return Ok(None);
832
10
    }
833
834
    async fn get_part(
835
        self: Pin<&Self>,
836
        key: StoreKey<'_>,
837
        writer: &mut DropCloserWriteHalf,
838
        offset: u64,
839
        length: Option<u64>,
840
34
    ) -> Result<(), Error> {
841
34
        let digest = key.into_digest();
842
34
        if is_zero_digest(digest) {
  Branch (842:12): [True: 0, False: 0]
  Branch (842:12): [True: 0, False: 0]
  Branch (842:12): [True: 0, False: 1]
  Branch (842:12): [True: 0, False: 0]
  Branch (842:12): [True: 0, False: 1]
  Branch (842:12): [True: 0, False: 0]
  Branch (842:12): [True: 1, False: 4]
  Branch (842:12): [True: 0, False: 0]
  Branch (842:12): [Folded - Ignored]
  Branch (842:12): [True: 7, False: 20]
843
8
            self.has(digest.into())
844
10
                .await
845
8
                .err_tip(|| 
"Failed to check if zero digest exists in filesystem store"0
)
?0
;
846
8
            writer
847
8
                .send_eof()
848
8
                .err_tip(|| 
"Failed to send zero EOF in filesystem store get_part"0
)
?0
;
849
8
            return Ok(());
850
26
        }
851
852
26
        let entry =
853
29
            
self.evicting_map.get(&digest)26
.await.
ok_or_else(26
|| {
854
0
                make_err!(Code::NotFound, "{digest} not found in filesystem store")
855
26
            })
?0
;
856
26
        let read_limit = length.unwrap_or(u64::MAX);
857
54
        let 
mut resumeable_temp_file26
=
entry.read_file_part(offset, read_limit)26
.await
?0
;
858
859
        loop {
860
1.09k
            let mut buf = BytesMut::with_capacity(self.read_buffer_size);
861
1.09k
            resumeable_temp_file
862
1.09k
                .as_reader()
863
0
                .await
864
1.09k
                .err_tip(|| 
"In FileSystemStore::get_part()"0
)
?0
865
1.09k
                .read_buf(&mut buf)
866
1.11k
                .await
867
1.09k
                .err_tip(|| 
"Failed to read data in filesystem store"0
)
?0
;
868
1.09k
            if buf.is_empty() {
  Branch (868:16): [True: 0, False: 0]
  Branch (868:16): [True: 0, False: 0]
  Branch (868:16): [True: 1, False: 10]
  Branch (868:16): [True: 0, False: 0]
  Branch (868:16): [True: 1, False: 10]
  Branch (868:16): [True: 0, False: 0]
  Branch (868:16): [True: 3, False: 1.02k]
  Branch (868:16): [True: 0, False: 0]
  Branch (868:16): [Folded - Ignored]
  Branch (868:16): [True: 20, False: 20]
869
25
                break; // EOF.
870
1.06k
            }
871
1.06k
            // In the event it takes a while to send the data to the client, we want to close the
872
1.06k
            // reading file, to prevent the file descriptor left open for long periods of time.
873
1.06k
            // Failing to do so might cause deadlocks if the receiver is unable to receive data
874
1.06k
            // because it is waiting for a file descriptor to open before receiving data.
875
1.06k
            // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
876
1.06k
            // next iteration.
877
1.06k
            let buf_content = buf.freeze();
878
            loop {
879
1.06k
                let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
880
1.06k
                tokio::pin!(sleep_fn);
881
1.06k
                tokio::select! {
882
1.06k
                    _ = & mut (sleep_fn) => {
883
0
                        resumeable_temp_file
884
0
                            .close_file()
885
0
                            .await
886
0
                            .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
887
0
                        continue;
888
                    }
889
1.06k
                    res = writer.send(buf_content.clone()) => {
890
1.06k
                        match res {
891
1.06k
                            Ok(()) => break,
892
0
                            Err(err) => {
893
0
                                return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
894
                            }
895
                        }
896
                    }
897
                }
898
            }
899
        }
900
25
        writer
901
25
            .send_eof()
902
25
            .err_tip(|| 
"Filed to send EOF in filesystem store get_part"0
)
?0
;
903
904
25
        Ok(())
905
67
    }
906
907
55
    /// Returns this store itself as the driver, regardless of the digest passed in.
    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver {
        self
    }
910
911
15
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
912
15
        self
913
15
    }
914
915
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
916
0
        self
917
0
    }
918
919
0
    fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) {
920
0
        registry.register_indicator(self);
921
0
    }
922
}
923
924
#[async_trait]
925
impl<Fe: FileEntry> HealthStatusIndicator for FilesystemStore<Fe> {
926
0
    fn get_name(&self) -> &'static str {
927
0
        "FilesystemStore"
928
0
    }
929
930
0
    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
931
0
        StoreDriver::check_health(Pin::new(self), namespace).await
932
0
    }
933
}