Coverage Report

Created: 2025-12-17 22:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-worker/src/directory_cache.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::future::Future;
16
use core::pin::Pin;
17
use std::collections::HashMap;
18
use std::path::{Path, PathBuf};
19
use std::sync::Arc;
20
use std::time::SystemTime;
21
22
use nativelink_error::{Code, Error, ResultExt, make_err};
23
use nativelink_proto::build::bazel::remote::execution::v2::{
24
    Directory as ProtoDirectory, DirectoryNode, FileNode, SymlinkNode,
25
};
26
use nativelink_store::ac_utils::get_and_decode_digest;
27
use nativelink_util::common::DigestInfo;
28
use nativelink_util::fs_util::{hardlink_directory_tree, set_readonly_recursive};
29
use nativelink_util::store_trait::{Store, StoreKey, StoreLike};
30
use tokio::fs;
31
use tokio::sync::{Mutex, RwLock};
32
use tracing::{debug, trace, warn};
33
34
/// Tuning knobs and on-disk location for a [`DirectoryCache`].
#[derive(Debug, Clone)]
pub struct DirectoryCacheConfig {
    /// Upper bound on how many directories may be cached at once.
    pub max_entries: usize,
    /// Upper bound on the summed size of cached directories, in bytes; `0` disables
    /// the size limit.
    pub max_size_bytes: u64,
    /// Directory under which every cached tree is materialised.
    pub cache_root: PathBuf,
}

impl Default for DirectoryCacheConfig {
    /// 1000 entries, a 10 GiB size cap, rooted at
    /// `<system temp dir>/nativelink_directory_cache`.
    fn default() -> Self {
        // 10 GiB expressed as a shift: 10 * 2^30.
        const DEFAULT_MAX_SIZE_BYTES: u64 = 10 << 30;

        let cache_root = std::env::temp_dir().join("nativelink_directory_cache");
        Self {
            max_entries: 1000,
            max_size_bytes: DEFAULT_MAX_SIZE_BYTES,
            cache_root,
        }
    }
}
54
55
/// Book-keeping for one directory tree materialised under the cache root.
#[derive(Debug, Clone)]
struct CachedDirectoryMetadata {
    /// Location of the materialised tree on disk.
    path: PathBuf,
    /// Size of the tree in bytes, as measured when it was cached.
    size: u64,
    /// When the entry was last used; the oldest unused entry is evicted first.
    last_access: SystemTime,
    /// How many operations are currently using this entry; eviction only
    /// considers entries whose count is zero.
    ref_count: usize,
}
67
68
/// High-performance directory cache that uses hardlinks to avoid repeated
69
/// directory reconstruction from the CAS.
70
///
71
/// When actions need input directories, instead of fetching and reconstructing
72
/// files from the CAS each time, we:
73
/// 1. Check if we've already constructed this exact directory (by digest)
74
/// 2. If yes, hardlink the entire tree to the action's workspace
75
/// 3. If no, construct it once and cache for future use
76
///
77
/// This dramatically reduces I/O and improves action startup time.
78
#[derive(Debug)]
79
pub struct DirectoryCache {
80
    /// Configuration
81
    config: DirectoryCacheConfig,
82
    /// Cache mapping digest -> metadata
83
    cache: Arc<RwLock<HashMap<DigestInfo, CachedDirectoryMetadata>>>,
84
    /// Lock for cache construction to prevent stampedes
85
    construction_locks: Arc<Mutex<HashMap<DigestInfo, Arc<Mutex<()>>>>>,
86
    /// CAS store for fetching directories
87
    cas_store: Store,
88
}
89
90
impl DirectoryCache {
91
    /// Creates a new `DirectoryCache`
92
1
    pub async fn new(config: DirectoryCacheConfig, cas_store: Store) -> Result<Self, Error> {
93
        // Ensure cache root exists
94
1
        fs::create_dir_all(&config.cache_root).await.err_tip(|| 
{0
95
0
            format!(
96
0
                "Failed to create cache root: {}",
97
0
                config.cache_root.display()
98
            )
99
0
        })?;
100
101
1
        Ok(Self {
102
1
            config,
103
1
            cache: Arc::new(RwLock::new(HashMap::new())),
104
1
            construction_locks: Arc::new(Mutex::new(HashMap::new())),
105
1
            cas_store,
106
1
        })
107
1
    }
108
109
    /// Gets or creates a directory in the cache, then hardlinks it to the destination
110
    ///
111
    /// # Arguments
112
    /// * `digest` - Digest of the root Directory proto
113
    /// * `dest_path` - Where to hardlink/create the directory
114
    ///
115
    /// # Returns
116
    /// * `Ok(true)` - Cache hit (directory was hardlinked)
117
    /// * `Ok(false)` - Cache miss (directory was constructed)
118
    /// * `Err` - Error during construction or hardlinking
119
2
    pub async fn get_or_create(&self, digest: DigestInfo, dest_path: &Path) -> Result<bool, Error> {
120
        // Fast path: check if already in cache
121
        {
122
2
            let mut cache = self.cache.write().await;
123
2
            if let Some(
metadata1
) = cache.get_mut(&digest) {
  Branch (123:20): [Folded - Ignored]
  Branch (123:20): [True: 1, False: 1]
  Branch (123:20): [True: 0, False: 0]
124
                // Update access time and ref count
125
1
                metadata.last_access = SystemTime::now();
126
1
                metadata.ref_count += 1;
127
128
1
                debug!(
129
                    ?digest,
130
                    path = ?metadata.path,
131
0
                    "Directory cache HIT"
132
                );
133
134
                // Try to hardlink from cache
135
1
                match hardlink_directory_tree(&metadata.path, dest_path).await {
136
                    Ok(()) => {
137
1
                        metadata.ref_count -= 1;
138
1
                        return Ok(true);
139
                    }
140
0
                    Err(e) => {
141
0
                        warn!(
142
                            ?digest,
143
                            error = ?e,
144
0
                            "Failed to hardlink from cache, will reconstruct"
145
                        );
146
0
                        metadata.ref_count -= 1;
147
                        // Fall through to reconstruction
148
                    }
149
                }
150
1
            }
151
        }
152
153
1
        debug!(?digest, 
"Directory cache MISS"0
);
154
155
        // Get or create construction lock to prevent stampede
156
1
        let construction_lock = {
157
1
            let mut locks = self.construction_locks.lock().await;
158
1
            locks
159
1
                .entry(digest)
160
1
                .or_insert_with(|| Arc::new(Mutex::new(())))
161
1
                .clone()
162
        };
163
164
        // Only one task constructs at a time for this digest
165
1
        let _guard = construction_lock.lock().await;
166
167
        // Check again in case another task just constructed it
168
        {
169
1
            let cache = self.cache.read().await;
170
1
            if let Some(
metadata0
) = cache.get(&digest) {
  Branch (170:20): [Folded - Ignored]
  Branch (170:20): [True: 0, False: 1]
  Branch (170:20): [True: 0, False: 0]
171
0
                return match hardlink_directory_tree(&metadata.path, dest_path).await {
172
0
                    Ok(()) => Ok(true),
173
0
                    Err(e) => {
174
0
                        warn!(
175
                            ?digest,
176
                            error = ?e,
177
0
                            "Failed to hardlink after construction"
178
                        );
179
                        // Construct directly at dest_path
180
0
                        self.construct_directory(digest, dest_path).await?;
181
0
                        Ok(false)
182
                    }
183
                };
184
1
            }
185
        }
186
187
        // Construct the directory in cache
188
1
        let cache_path = self.get_cache_path(&digest);
189
1
        self.construct_directory(digest, &cache_path).await
?0
;
190
191
        // Make it read-only to prevent modifications
192
1
        set_readonly_recursive(&cache_path)
193
1
            .await
194
1
            .err_tip(|| "Failed to set cache directory to readonly")
?0
;
195
196
        // Calculate size
197
1
        let size = nativelink_util::fs_util::calculate_directory_size(&cache_path)
198
1
            .await
199
1
            .err_tip(|| "Failed to calculate directory size")
?0
;
200
201
        // Add to cache
202
        {
203
1
            let mut cache = self.cache.write().await;
204
205
            // Evict if necessary
206
1
            self.evict_if_needed(size, &mut cache).await
?0
;
207
208
1
            cache.insert(
209
1
                digest,
210
1
                CachedDirectoryMetadata {
211
1
                    path: cache_path.clone(),
212
1
                    size,
213
1
                    last_access: SystemTime::now(),
214
1
                    ref_count: 0,
215
1
                },
216
1
            );
217
        }
218
219
        // Hardlink to destination
220
1
        hardlink_directory_tree(&cache_path, dest_path)
221
1
            .await
222
1
            .err_tip(|| "Failed to hardlink newly cached directory")
?0
;
223
224
1
        Ok(false)
225
2
    }
226
227
    /// Constructs a directory from the CAS at the given path
228
1
    fn construct_directory<'a>(
229
1
        &'a self,
230
1
        digest: DigestInfo,
231
1
        dest_path: &'a Path,
232
1
    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
233
1
        Box::pin(async move {
234
1
            debug!(?digest, ?dest_path, 
"Constructing directory"0
);
235
236
            // Fetch the Directory proto
237
1
            let directory: ProtoDirectory = get_and_decode_digest(&self.cas_store, digest.into())
238
1
                .await
239
1
                .err_tip(|| format!(
"Failed to fetch directory digest: {digest:?}"0
))
?0
;
240
241
            // Create the destination directory
242
1
            fs::create_dir_all(dest_path)
243
1
                .await
244
1
                .err_tip(|| format!(
"Failed to create directory: {}"0
,
dest_path0
.
display0
()))
?0
;
245
246
            // Process files
247
2
            for 
file1
in &directory.files {
248
1
                self.create_file(dest_path, file).await
?0
;
249
            }
250
251
            // Process subdirectories recursively
252
1
            for 
dir_node0
in &directory.directories {
253
0
                self.create_subdirectory(dest_path, dir_node).await?;
254
            }
255
256
            // Process symlinks
257
1
            for 
symlink0
in &directory.symlinks {
258
0
                self.create_symlink(dest_path, symlink).await?;
259
            }
260
261
1
            Ok(())
262
1
        })
263
1
    }
264
265
    /// Creates a file from a `FileNode`
266
1
    async fn create_file(&self, parent: &Path, file_node: &FileNode) -> Result<(), Error> {
267
1
        let file_path = parent.join(&file_node.name);
268
1
        let digest = DigestInfo::try_from(
269
1
            file_node
270
1
                .digest
271
1
                .clone()
272
1
                .ok_or_else(|| make_err!(
Code::InvalidArgument0
, "File node missing digest"))
?0
,
273
        )
274
1
        .err_tip(|| "Invalid file digest")
?0
;
275
276
1
        trace!(?file_path, ?digest, 
"Creating file"0
);
277
278
        // Fetch file content from CAS
279
1
        let data = self
280
1
            .cas_store
281
1
            .get_part_unchunked(StoreKey::Digest(digest), 0, None)
282
1
            .await
283
1
            .err_tip(|| format!(
"Failed to fetch file: {}"0
,
file_path.display()0
))
?0
;
284
285
        // Write to disk
286
1
        fs::write(&file_path, data.as_ref())
287
1
            .await
288
1
            .err_tip(|| format!(
"Failed to write file: {}"0
,
file_path.display()0
))
?0
;
289
290
        // Set permissions
291
        #[cfg(unix)]
292
1
        if file_node.is_executable {
  Branch (292:12): [True: 0, False: 0]
  Branch (292:12): [True: 0, False: 1]
293
            use std::os::unix::fs::PermissionsExt;
294
0
            let mut perms = fs::metadata(&file_path)
295
0
                .await
296
0
                .err_tip(|| "Failed to get file metadata")?
297
0
                .permissions();
298
0
            perms.set_mode(0o755);
299
0
            fs::set_permissions(&file_path, perms)
300
0
                .await
301
0
                .err_tip(|| "Failed to set file permissions")?;
302
1
        }
303
304
1
        Ok(())
305
1
    }
306
307
    /// Creates a subdirectory from a `DirectoryNode`
308
0
    async fn create_subdirectory(
309
0
        &self,
310
0
        parent: &Path,
311
0
        dir_node: &DirectoryNode,
312
0
    ) -> Result<(), Error> {
313
0
        let dir_path = parent.join(&dir_node.name);
314
0
        let digest =
315
0
            DigestInfo::try_from(dir_node.digest.clone().ok_or_else(|| {
316
0
                make_err!(Code::InvalidArgument, "Directory node missing digest")
317
0
            })?)
318
0
            .err_tip(|| "Invalid directory digest")?;
319
320
0
        trace!(?dir_path, ?digest, "Creating subdirectory");
321
322
        // Recursively construct subdirectory
323
0
        self.construct_directory(digest, &dir_path).await
324
0
    }
325
326
    /// Creates a symlink from a `SymlinkNode`
327
0
    async fn create_symlink(&self, parent: &Path, symlink: &SymlinkNode) -> Result<(), Error> {
328
0
        let link_path = parent.join(&symlink.name);
329
0
        let target = Path::new(&symlink.target);
330
331
0
        trace!(?link_path, ?target, "Creating symlink");
332
333
        #[cfg(unix)]
334
0
        fs::symlink(&target, &link_path)
335
0
            .await
336
0
            .err_tip(|| format!("Failed to create symlink: {}", link_path.display()))?;
337
338
        #[cfg(windows)]
339
        {
340
            // On Windows, we need to know if target is a directory
341
            // For now, assume files (can be improved later)
342
            fs::symlink_file(&target, &link_path)
343
                .await
344
                .err_tip(|| format!("Failed to create symlink: {}", link_path.display()))?;
345
        }
346
347
0
        Ok(())
348
0
    }
349
350
    /// Evicts entries if cache is too full
351
1
    async fn evict_if_needed(
352
1
        &self,
353
1
        incoming_size: u64,
354
1
        cache: &mut HashMap<DigestInfo, CachedDirectoryMetadata>,
355
1
    ) -> Result<(), Error> {
356
        // Check entry count
357
1
        while cache.len() >= self.config.max_entries {
  Branch (357:15): [Folded - Ignored]
  Branch (357:15): [True: 0, False: 1]
  Branch (357:15): [True: 0, False: 0]
358
0
            self.evict_lru(cache).await?;
359
        }
360
361
        // Check total size
362
1
        if self.config.max_size_bytes > 0 {
  Branch (362:12): [Folded - Ignored]
  Branch (362:12): [True: 1, False: 0]
  Branch (362:12): [True: 0, False: 0]
363
1
            let current_size: u64 = cache.values().map(|m| m.size).sum();
364
1
            let mut size_after = current_size + incoming_size;
365
366
1
            while size_after > self.config.max_size_bytes {
  Branch (366:19): [Folded - Ignored]
  Branch (366:19): [True: 0, False: 1]
  Branch (366:19): [True: 0, False: 0]
367
0
                let evicted_size = self.evict_lru(cache).await?;
368
0
                size_after -= evicted_size;
369
            }
370
0
        }
371
372
1
        Ok(())
373
1
    }
374
375
    /// Evicts the least recently used entry
376
0
    async fn evict_lru(
377
0
        &self,
378
0
        cache: &mut HashMap<DigestInfo, CachedDirectoryMetadata>,
379
0
    ) -> Result<u64, Error> {
380
        // Find LRU entry that isn't currently in use
381
0
        let to_evict = cache
382
0
            .iter()
383
0
            .filter(|(_, m)| m.ref_count == 0)
384
0
            .min_by_key(|(_, m)| m.last_access)
385
0
            .map(|(digest, _)| *digest);
386
387
0
        if let Some(digest) = to_evict {
  Branch (387:16): [Folded - Ignored]
  Branch (387:16): [True: 0, False: 0]
  Branch (387:16): [True: 0, False: 0]
388
0
            if let Some(metadata) = cache.remove(&digest) {
  Branch (388:20): [Folded - Ignored]
  Branch (388:20): [True: 0, False: 0]
  Branch (388:20): [True: 0, False: 0]
389
0
                debug!(?digest, size = metadata.size, "Evicting cached directory");
390
391
                // Remove from disk
392
0
                if let Err(e) = fs::remove_dir_all(&metadata.path).await {
  Branch (392:24): [Folded - Ignored]
  Branch (392:24): [True: 0, False: 0]
  Branch (392:24): [True: 0, False: 0]
393
0
                    warn!(
394
                        ?digest,
395
                        path = ?metadata.path,
396
                        error = ?e,
397
0
                        "Failed to remove evicted directory from disk"
398
                    );
399
0
                }
400
401
0
                return Ok(metadata.size);
402
0
            }
403
0
        }
404
405
0
        Ok(0)
406
0
    }
407
408
    /// Gets the cache path for a digest
409
1
    fn get_cache_path(&self, digest: &DigestInfo) -> PathBuf {
410
1
        self.config.cache_root.join(format!("{digest}"))
411
1
    }
412
413
    /// Returns cache statistics
414
1
    pub async fn stats(&self) -> CacheStats {
415
1
        let cache = self.cache.read().await;
416
1
        let total_size: u64 = cache.values().map(|m| m.size).sum();
417
1
        let in_use = cache.values().filter(|m| m.ref_count > 0).count();
418
419
1
        CacheStats {
420
1
            entries: cache.len(),
421
1
            total_size_bytes: total_size,
422
1
            in_use_entries: in_use,
423
1
        }
424
1
    }
425
}
426
427
/// Point-in-time snapshot of directory cache occupancy.
#[derive(Debug, Clone, Copy)]
pub struct CacheStats {
    /// Number of cached directories.
    pub entries: usize,
    /// Sum of the recorded sizes of all cached directories, in bytes.
    pub total_size_bytes: u64,
    /// Number of cached directories with a non-zero reference count.
    pub in_use_entries: usize,
}
434
435
#[cfg(test)]
mod tests {
    use nativelink_store::memory_store::MemoryStore;
    use nativelink_util::common::DigestInfo;
    use nativelink_util::store_trait::StoreLike;
    use prost::Message;
    use tempfile::TempDir;

    use super::*;

    /// Builds an in-memory CAS containing one file (`test.txt` = "Hello, World!")
    /// and a root `Directory` proto referencing it. Returns the store and the digest
    /// under which the root proto was uploaded.
    async fn setup_test_store() -> (Store, DigestInfo) {
        let store = Store::new(MemoryStore::new(&Default::default()));

        // Upload the single file referenced by the root directory.
        let contents = b"Hello, World!";
        // SHA256 hash of "Hello, World!"
        let file_digest = DigestInfo::try_new(
            "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f",
            13,
        )
        .unwrap();
        store
            .as_store_driver_pin()
            .update_oneshot(file_digest.into(), contents.to_vec().into())
            .await
            .unwrap();

        // Describe a root directory that contains just that file.
        let file_node = FileNode {
            name: "test.txt".to_string(),
            digest: Some(file_digest.into()),
            is_executable: false,
            ..Default::default()
        };
        let root = ProtoDirectory {
            files: vec![file_node],
            directories: vec![],
            symlinks: vec![],
            ..Default::default()
        };

        // Serialize the root proto and store it under a fixed (not content-derived)
        // hash; the test relies on the in-memory store accepting it.
        let mut encoded = Vec::new();
        root.encode(&mut encoded).unwrap();
        let root_digest = DigestInfo::try_new(
            "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef",
            encoded.len() as i64,
        )
        .unwrap();
        store
            .as_store_driver_pin()
            .update_oneshot(root_digest.into(), encoded.into())
            .await
            .unwrap();

        (store, root_digest)
    }

    #[tokio::test]
    async fn test_directory_cache_basic() -> Result<(), Error> {
        let workspace = TempDir::new().unwrap();
        let (store, root_digest) = setup_test_store().await;

        let cache = DirectoryCache::new(
            DirectoryCacheConfig {
                max_entries: 10,
                max_size_bytes: 1024 * 1024,
                cache_root: workspace.path().join("cache"),
            },
            store,
        )
        .await?;

        // The first request has to build the tree from the CAS.
        let first_dest = workspace.path().join("dest1");
        let was_hit = cache.get_or_create(root_digest, &first_dest).await?;
        assert!(!was_hit, "First access should be cache miss");
        assert!(first_dest.join("test.txt").exists());

        // The second request for the same digest is served from the cache.
        let second_dest = workspace.path().join("dest2");
        let was_hit = cache.get_or_create(root_digest, &second_dest).await?;
        assert!(was_hit, "Second access should be cache hit");
        assert!(second_dest.join("test.txt").exists());

        // Exactly one tree should have been cached.
        assert_eq!(cache.stats().await.entries, 1);

        Ok(())
    }
}