Coverage Report

Created: 2024-11-20 10:13

/build/source/nativelink-store/src/ref_store.rs
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2024 The NativeLink Authors. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//    http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
use std::cell::UnsafeCell;
16
use std::pin::Pin;
17
use std::sync::{Arc, Mutex, Weak};
18
19
use async_trait::async_trait;
20
use nativelink_config::stores::RefSpec;
21
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
22
use nativelink_metric::MetricsComponent;
23
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
24
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
25
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo};
26
use tracing::{event, Level};
27
28
use crate::store_manager::StoreManager;
29
30
#[repr(C, align(8))]
31
struct AlignedStoreCell(UnsafeCell<Option<Store>>);
32
33
struct StoreReference {
34
    cell: AlignedStoreCell,
35
    mux: Mutex<()>,
36
}
37
38
unsafe impl Sync for StoreReference {}
39
40
0
#[derive(MetricsComponent)]
41
pub struct RefStore {
42
    #[metric(help = "The store we are referencing")]
43
    ref_store_name: String,
44
    store_manager: Weak<StoreManager>,
45
    ref_store: StoreReference,
46
}
47
48
impl RefStore {
49
5
    pub fn new(spec: &RefSpec, store_manager: Weak<StoreManager>) -> Arc<Self> {
50
5
        Arc::new(RefStore {
51
5
            ref_store_name: spec.name.clone(),
52
5
            store_manager,
53
5
            ref_store: StoreReference {
54
5
                mux: Mutex::new(()),
55
5
                cell: AlignedStoreCell(UnsafeCell::new(None)),
56
5
            },
57
5
        })
58
5
    }
59
60
    // This will get the store or populate it if needed. It is designed to be quite fast on the
61
    // common path, but slow on the uncommon path. It does use some unsafe functions because we
62
    // wanted it to be fast. It is technically possible on some platforms for this function to
63
    // create a data race here is the reason I do not believe it is an issue:
64
    // 1. It would only happen on the very first call of the function (after first call we are safe)
65
    // 2. It should only happen on platforms that are < 64 bit address space
66
    // 3. It is likely that the internals of how Option work protect us anyway.
67
    #[inline]
68
5
    fn get_store(&self) -> Result<&Store, Error> {
69
5
        let ref_store = self.ref_store.cell.0.get();
70
        unsafe {
71
5
            if let Some(
ref store0
) = *ref_store {
  Branch (71:20): [True: 0, False: 5]
  Branch (71:20): [Folded - Ignored]
72
0
                return Ok(store);
73
5
            }
74
        }
75
        // This should protect us against multiple writers writing the same location at the same
76
        // time.
77
5
        let _lock = self.ref_store.mux.lock().map_err(|e| {
78
0
            make_err!(
79
0
                Code::Internal,
80
0
                "Failed to lock mutex in ref_store : {:?}",
81
0
                e
82
0
            )
83
5
        })
?0
;
84
5
        let store_manager = self
85
5
            .store_manager
86
5
            .upgrade()
87
5
            .err_tip(|| 
"Store manager is gone"0
)
?0
;
88
5
        if let Some(store) = store_manager.get_store(&self.ref_store_name) {
  Branch (88:16): [True: 5, False: 0]
  Branch (88:16): [Folded - Ignored]
89
            unsafe {
90
5
                *ref_store = Some(store);
91
5
                return Ok((*ref_store).as_ref().unwrap());
92
            }
93
0
        }
94
0
        Err(make_input_err!(
95
0
            "Failed to find store '{}' in StoreManager in RefStore",
96
0
            self.ref_store_name
97
0
        ))
98
5
    }
99
}
100
101
#[async_trait]
102
impl StoreDriver for RefStore {
103
    async fn has_with_results(
104
        self: Pin<&Self>,
105
        keys: &[StoreKey<'_>],
106
        results: &mut [Option<u64>],
107
1
    ) -> Result<(), Error> {
108
1
        self.get_store()
?0
.has_with_results(keys, results).
await0
109
2
    }
110
111
    async fn update(
112
        self: Pin<&Self>,
113
        key: StoreKey<'_>,
114
        reader: DropCloserReadHalf,
115
        size_info: UploadSizeInfo,
116
1
    ) -> Result<(), Error> {
117
1
        self.get_store()
?0
.update(key, reader, size_info).
await0
118
2
    }
119
120
    async fn get_part(
121
        self: Pin<&Self>,
122
        key: StoreKey<'_>,
123
        writer: &mut DropCloserWriteHalf,
124
        offset: u64,
125
        length: Option<u64>,
126
1
    ) -> Result<(), Error> {
127
1
        self.get_store()
?0
128
1
            .get_part(key, writer, offset, length)
129
0
            .await
130
2
    }
131
132
2
    fn inner_store(&self, key: Option<StoreKey>) -> &'_ dyn StoreDriver {
133
2
        match self.get_store() {
134
2
            Ok(store) => store.inner_store(key),
135
0
            Err(err) => {
136
0
                event!(Level::ERROR, ?key, ?err, "Failed to get store for key",);
137
0
                self
138
            }
139
        }
140
2
    }
141
142
0
    fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
143
0
        self
144
0
    }
145
146
0
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
147
0
        self
148
0
    }
149
}
150
151
default_health_status_indicator!(RefStore);