Coverage Report

Created: 2024-10-22 12:33

/build/source/nativelink-store/src/ref_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cell::UnsafeCell;
16
use std::pin::Pin;
17
use std::sync::{Arc, Mutex, Weak};
18
19
use async_trait::async_trait;
20
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
21
use nativelink_metric::MetricsComponent;
22
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
23
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
24
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
25
use tracing::{event, Level};
26
27
use crate::store_manager::StoreManager;
28
29
#[repr(C, align(8))]
30
struct AlignedStoreCell(UnsafeCell<Option<Store>>);
31
32
struct StoreReference {
33
    cell: AlignedStoreCell,
34
    mux: Mutex<()>,
35
}
36
37
unsafe impl Sync for StoreReference {}
38
39
0
#[derive(MetricsComponent)]
40
pub struct RefStore {
41
    #[metric(help = "The store we are referencing")]
42
    ref_store_name: String,
43
    store_manager: Weak<StoreManager>,
44
    ref_store: StoreReference,
45
}
46
47
impl RefStore {
48
5
    pub fn new(
49
5
        config: &nativelink_config::stores::RefStore,
50
5
        store_manager: Weak<StoreManager>,
51
5
    ) -> Arc<Self> {
52
5
        Arc::new(RefStore {
53
5
            ref_store_name: config.name.clone(),
54
5
            store_manager,
55
5
            ref_store: StoreReference {
56
5
                mux: Mutex::new(()),
57
5
                cell: AlignedStoreCell(UnsafeCell::new(None)),
58
5
            },
59
5
        })
60
5
    }
61
62
    // This will get the store or populate it if needed. It is designed to be quite fast on the
63
    // common path, but slow on the uncommon path. It does use some unsafe functions because we
64
    // wanted it to be fast. It is technically possible on some platforms for this function to
65
    // create a data race here is the reason I do not believe it is an issue:
66
    // 1. It would only happen on the very first call of the function (after first call we are safe)
67
    // 2. It should only happen on platforms that are < 64 bit address space
68
    // 3. It is likely that the internals of how Option work protect us anyway.
69
    #[inline]
70
5
    fn get_store(&self) -> Result<&Store, Error> {
71
5
        let ref_store = self.ref_store.cell.0.get();
72
        unsafe {
73
5
            if let Some(
ref store0
) = *ref_store {
  Branch (73:20): [True: 0, False: 5]
  Branch (73:20): [Folded - Ignored]
74
0
                return Ok(store);
75
5
            }
76
        }
77
        // This should protect us against multiple writers writing the same location at the same
78
        // time.
79
5
        let _lock = self.ref_store.mux.lock().map_err(|e| {
80
0
            make_err!(
81
0
                Code::Internal,
82
0
                "Failed to lock mutex in ref_store : {:?}",
83
0
                e
84
0
            )
85
5
        })
?0
;
86
5
        let store_manager = self
87
5
            .store_manager
88
5
            .upgrade()
89
5
            .err_tip(|| 
"Store manager is gone"0
)
?0
;
90
5
        if let Some(store) = store_manager.get_store(&self.ref_store_name) {
  Branch (90:16): [True: 5, False: 0]
  Branch (90:16): [Folded - Ignored]
91
            unsafe {
92
5
                *ref_store = Some(store);
93
5
                return Ok((*ref_store).as_ref().unwrap());
94
            }
95
0
        }
96
0
        Err(make_input_err!(
97
0
            "Failed to find store '{}' in StoreManager in RefStore",
98
0
            self.ref_store_name
99
0
        ))
100
5
    }
101
}
102
103
#[async_trait]
104
impl StoreDriver for RefStore {
105
    async fn has_with_results(
106
        self: Pin<&Self>,
107
        keys: &[StoreKey<'_>],
108
        results: &mut [Option<u64>],
109
1
    ) -> Result<(), Error> {
110
1
        self.get_store()
?0
.has_with_results(keys, results).
await0
111
2
    }
112
113
    async fn update(
114
        self: Pin<&Self>,
115
        key: StoreKey<'_>,
116
        reader: DropCloserReadHalf,
117
        size_info: UploadSizeInfo,
118
1
    ) -> Result<(), Error> {
119
1
        self.get_store()
?0
.update(key, reader, size_info).
await0
120
2
    }
121
122
    async fn get_part(
123
        self: Pin<&Self>,
124
        key: StoreKey<'_>,
125
        writer: &mut DropCloserWriteHalf,
126
        offset: u64,
127
        length: Option<u64>,
128
1
    ) -> Result<(), Error> {
129
1
        self.get_store()
?0
130
1
            .get_part(key, writer, offset, length)
131
0
            .await
132
2
    }
133
134
2
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
135
2
        match self.get_store() {
136
2
            Ok(store) => store.inner_store(key),
137
0
            Err(err) => {
138
0
                event!(Level::ERROR, ?key, ?err, "Failed to get store for key",);
139
0
                self
140
            }
141
        }
142
2
    }
143
144
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
145
0
        self
146
0
    }
147
148
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
149
0
        self
150
0
    }
151
}
152
153
// Presumably wires up the default `HealthStatusIndicator` implementation for
// `RefStore`; the macro body lives in `nativelink_util::health_utils` — confirm there.
default_health_status_indicator!(RefStore);