Coverage Report

Created: 2025-10-15 15:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/build/source/nativelink-store/src/ref_store.rs
Line
Count
Source
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    See LICENSE file for details
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use core::cell::UnsafeCell;
16
use core::pin::Pin;
17
use std::sync::{Arc, Mutex, Weak};
18
19
use async_trait::async_trait;
20
use nativelink_config::stores::RefSpec;
21
use nativelink_error::{Code, Error, ResultExt, make_err, make_input_err};
22
use nativelink_metric::MetricsComponent;
23
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
24
use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator};
25
use nativelink_util::store_trait::{
26
    RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
27
};
28
use tracing::error;
29
30
use crate::store_manager::StoreManager;
31
32
#[repr(C, align(8))]
33
#[derive(Debug)]
34
struct AlignedStoreCell(UnsafeCell<Option<Store>>);
35
36
#[derive(Debug)]
37
struct StoreReference {
38
    cell: AlignedStoreCell,
39
    mux: Mutex<()>,
40
}
41
42
unsafe impl Sync for StoreReference {}
43
44
#[derive(Debug, MetricsComponent)]
45
pub struct RefStore {
46
    #[metric(help = "The store we are referencing")]
47
    name: String,
48
    store_manager: Weak<StoreManager>,
49
    inner: StoreReference,
50
    remove_callbacks: Mutex<Vec<Arc<Box<dyn RemoveItemCallback>>>>,
51
}
52
53
impl RefStore {
54
5
    pub fn new(spec: &RefSpec, store_manager: Weak<StoreManager>) -> Arc<Self> {
55
5
        Arc::new(Self {
56
5
            name: spec.name.clone(),
57
5
            store_manager,
58
5
            inner: StoreReference {
59
5
                mux: Mutex::new(()),
60
5
                cell: AlignedStoreCell(UnsafeCell::new(None)),
61
5
            },
62
5
            remove_callbacks: Mutex::new(vec![]),
63
5
        })
64
5
    }
65
66
    // This will get the store or populate it if needed. It is designed to be quite fast on the
67
    // common path, but slow on the uncommon path. It does use some unsafe functions because we
68
    // wanted it to be fast. It is technically possible on some platforms for this function to
69
    // create a data race here is the reason I do not believe it is an issue:
70
    // 1. It would only happen on the very first call of the function (after first call we are safe)
71
    // 2. It should only happen on platforms that are < 64 bit address space
72
    // 3. It is likely that the internals of how Option work protect us anyway.
73
    #[inline]
74
5
    fn get_store(&self) -> Result<&Store, Error> {
75
5
        let ref_store = self.inner.cell.0.get();
76
        unsafe {
77
5
            if let Some(
ref store0
) = *ref_store {
  Branch (77:20): [True: 0, False: 5]
  Branch (77:20): [Folded - Ignored]
78
0
                return Ok(store);
79
5
            }
80
        }
81
        // This should protect us against multiple writers writing the same location at the same
82
        // time.
83
5
        let _lock = self.inner.mux.lock().map_err(|e| 
{0
84
0
            make_err!(
85
0
                Code::Internal,
86
                "Failed to lock mutex in ref_store : {:?}",
87
                e
88
            )
89
0
        })?;
90
5
        let store_manager = self
91
5
            .store_manager
92
5
            .upgrade()
93
5
            .err_tip(|| "Store manager is gone")
?0
;
94
5
        if let Some(store) = store_manager.get_store(&self.name) {
  Branch (94:16): [True: 5, False: 0]
  Branch (94:16): [Folded - Ignored]
95
5
            for 
callback0
in self.remove_callbacks.lock().unwrap().iter() {
96
0
                store.register_remove_callback(callback)?;
97
            }
98
            unsafe {
99
5
                *ref_store = Some(store);
100
5
                return Ok((*ref_store).as_ref().unwrap());
101
            }
102
0
        }
103
0
        Err(make_input_err!(
104
0
            "Failed to find store '{}' in StoreManager in RefStore",
105
0
            self.name
106
0
        ))
107
5
    }
108
}
109
110
#[async_trait]
111
impl StoreDriver for RefStore {
112
    async fn has_with_results(
113
        self: Pin<&Self>,
114
        keys: &[StoreKey<'_>],
115
        results: &mut [Option<u64>],
116
1
    ) -> Result<(), Error> {
117
        self.get_store()?.has_with_results(keys, results).await
118
1
    }
119
120
    async fn update(
121
        self: Pin<&Self>,
122
        key: StoreKey<'_>,
123
        reader: DropCloserReadHalf,
124
        size_info: UploadSizeInfo,
125
1
    ) -> Result<(), Error> {
126
        self.get_store()?.update(key, reader, size_info).await
127
1
    }
128
129
    async fn get_part(
130
        self: Pin<&Self>,
131
        key: StoreKey<'_>,
132
        writer: &mut DropCloserWriteHalf,
133
        offset: u64,
134
        length: Option<u64>,
135
1
    ) -> Result<(), Error> {
136
        self.get_store()?
137
            .get_part(key, writer, offset, length)
138
            .await
139
1
    }
140
141
2
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
142
2
        match self.get_store() {
143
2
            Ok(store) => store.inner_store(key),
144
0
            Err(err) => {
145
0
                error!(?key, ?err, "Failed to get store for key",);
146
0
                self
147
            }
148
        }
149
2
    }
150
151
0
    fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) {
152
0
        self
153
0
    }
154
155
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
156
0
        self
157
0
    }
158
159
0
    fn register_remove_callback(
160
0
        self: Arc<Self>,
161
0
        callback: &Arc<Box<dyn RemoveItemCallback>>,
162
0
    ) -> Result<(), Error> {
163
0
        self.remove_callbacks.lock()?.push(callback.clone());
164
0
        let ref_store = self.inner.cell.0.get();
165
        unsafe {
166
0
            if let Some(ref store) = *ref_store {
  Branch (166:20): [True: 0, False: 0]
  Branch (166:20): [Folded - Ignored]
167
0
                store.register_remove_callback(callback)?;
168
0
            }
169
        };
170
0
        Ok(())
171
0
    }
172
}
173
174
default_health_status_indicator!(RefStore);