Coverage Report

Created: 2025-12-15 21:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ref_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cell::UnsafeCell;
16
use core::pin::Pin;
17
use std::sync::{Arc, Weak};
18
19
use async_trait::async_trait;
20
use nativelink_config::stores::RefSpec;
21
use nativelink_error::{Error, ResultExt, make_input_err};
22
use nativelink_metric::MetricsComponent;
23
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
24
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
25
use nativelink_util::store_trait::{
26
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
27
};
28
use parking_lot::Mutex;
29
use tracing::error;
30
31
use crate::store_manager::StoreManager;
32
33
#[repr(C, align(8))]
34
#[derive(Debug)]
35
struct AlignedStoreCell(UnsafeCell<Option<Store>>);
36
37
#[derive(Debug)]
38
struct StoreReference {
39
    cell: AlignedStoreCell,
40
    mux: Mutex<()>,
41
}
42
43
unsafe impl Sync for StoreReference {}
44
45
#[derive(Debug, MetricsComponent)]
46
pub struct RefStore {
47
    #[metric(help = "The store we are referencing")]
48
    name: String,
49
    store_manager: Weak<StoreManager>,
50
    inner: StoreReference,
51
    remove_callbacks: Mutex<Vec<Arc<dyn RemoveItemCallback>>>,
52
}
53
54
impl RefStore {
55
5
    pub fn new(spec: &RefSpec, store_manager: Weak<StoreManager>) -> Arc<Self> {
56
5
        Arc::new(Self {
57
5
            name: spec.name.clone(),
58
5
            store_manager,
59
5
            inner: StoreReference {
60
5
                mux: Mutex::new(()),
61
5
                cell: AlignedStoreCell(UnsafeCell::new(None)),
62
5
            },
63
5
            remove_callbacks: Mutex::new(vec![]),
64
5
        })
65
5
    }
66
67
    // This will get the store or populate it if needed. It is designed to be quite fast on the
68
    // common path, but slow on the uncommon path. It does use some unsafe functions because we
69
    // wanted it to be fast. It is technically possible on some platforms for this function to
70
    // create a data race here is the reason I do not believe it is an issue:
71
    // 1. It would only happen on the very first call of the function (after first call we are safe)
72
    // 2. It should only happen on platforms that are < 64 bit address space
73
    // 3. It is likely that the internals of how Option work protect us anyway.
74
    #[inline]
75
5
    fn get_store(&self) -> Result<&Store, Error> {
76
5
        let ref_store = self.inner.cell.0.get();
77
        unsafe {
78
5
            if let Some(
ref store0
) = *ref_store {
  Branch (78:20): [True: 0, False: 5]
  Branch (78:20): [Folded - Ignored]
79
0
                return Ok(store);
80
5
            }
81
        }
82
        // This should protect us against multiple writers writing the same location at the same
83
        // time.
84
5
        let _lock = self.inner.mux.lock();
85
5
        let store_manager = self
86
5
            .store_manager
87
5
            .upgrade()
88
5
            .err_tip(|| "Store manager is gone")
?0
;
89
5
        if let Some(store) = store_manager.get_store(&self.name) {
  Branch (89:16): [True: 5, False: 0]
  Branch (89:16): [Folded - Ignored]
90
5
            let remove_callbacks = self.remove_callbacks.lock().clone();
91
5
            for 
callback0
in remove_callbacks {
92
0
                store.register_remove_callback(callback)?;
93
            }
94
            unsafe {
95
5
                *ref_store = Some(store);
96
5
                return Ok((*ref_store).as_ref().unwrap());
97
            }
98
0
        }
99
0
        Err(make_input_err!(
100
0
            "Failed to find store '{}' in StoreManager in RefStore",
101
0
            self.name
102
0
        ))
103
5
    }
104
}
105
106
#[async_trait]
107
impl StoreDriver for RefStore {
108
    async fn has_with_results(
109
        self: Pin<&Self>,
110
        keys: &[StoreKey<'_>],
111
        results: &mut [Option<u64>],
112
1
    ) -> Result<(), Error> {
113
        self.get_store()?.has_with_results(keys, results).await
114
1
    }
115
116
    async fn update(
117
        self: Pin<&Self>,
118
        key: StoreKey<'_>,
119
        reader: DropCloserReadHalf,
120
        size_info: UploadSizeInfo,
121
1
    ) -> Result<(), Error> {
122
        self.get_store()?.update(key, reader, size_info).await
123
1
    }
124
125
    async fn get_part(
126
        self: Pin<&Self>,
127
        key: StoreKey<'_>,
128
        writer: &mut DropCloserWriteHalf,
129
        offset: u64,
130
        length: Option<u64>,
131
1
    ) -> Result<(), Error> {
132
        self.get_store()?
133
            .get_part(key, writer, offset, length)
134
            .await
135
1
    }
136
137
2
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
138
2
        match self.get_store() {
139
2
            Ok(store) => store.inner_store(key),
140
0
            Err(err) => {
141
0
                error!(?key, ?err, "Failed to get store for key",);
142
0
                self
143
            }
144
        }
145
2
    }
146
147
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
148
0
        self
149
0
    }
150
151
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
152
0
        self
153
0
    }
154
155
0
    fn register_remove_callback(
156
0
        self: Arc<Self>,
157
0
        callback: Arc<dyn RemoveItemCallback>,
158
0
    ) -> Result<(), Error> {
159
0
        self.remove_callbacks.lock().push(callback.clone());
160
0
        let ref_store = self.inner.cell.0.get();
161
        unsafe {
162
0
            if let Some(ref store) = *ref_store {
  Branch (162:20): [True: 0, False: 0]
  Branch (162:20): [Folded - Ignored]
163
0
                store.register_remove_callback(callback)?;
164
0
            }
165
        }
166
0
        Ok(())
167
0
    }
168
}
169
170
default_health_status_indicator!(RefStore);