/build/source/nativelink-store/src/memory_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::any::Any; |
16 | | use core::borrow::Borrow; |
17 | | use core::fmt::Debug; |
18 | | use core::ops::Bound; |
19 | | use core::pin::Pin; |
20 | | use std::sync::Arc; |
21 | | use std::time::SystemTime; |
22 | | |
23 | | use async_trait::async_trait; |
24 | | use bytes::{Bytes, BytesMut}; |
25 | | use nativelink_config::stores::MemorySpec; |
26 | | use nativelink_error::{Code, Error, ResultExt}; |
27 | | use nativelink_metric::MetricsComponent; |
28 | | use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; |
29 | | use nativelink_util::evicting_map::{EvictingMap, LenEntry}; |
30 | | use nativelink_util::health_utils::{ |
31 | | HealthRegistryBuilder, HealthStatusIndicator, default_health_status_indicator, |
32 | | }; |
33 | | use nativelink_util::store_trait::{ |
34 | | RemoveItemCallback, StoreDriver, StoreKey, StoreKeyBorrow, UploadSizeInfo, |
35 | | }; |
36 | | |
37 | | use crate::callback_utils::RemoveItemCallbackHolder; |
38 | | use crate::cas_utils::is_zero_digest; |
39 | | |
40 | | #[derive(Clone)] |
41 | | pub struct BytesWrapper(Bytes); |
42 | | |
43 | | impl Debug for BytesWrapper { |
44 | 0 | fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { |
45 | 0 | f.write_str("BytesWrapper { -- Binary data -- }") |
46 | 0 | } |
47 | | } |
48 | | |
49 | | impl LenEntry for BytesWrapper { |
50 | | #[inline] |
51 | 10.2k | fn len(&self) -> u64 { |
52 | 10.2k | Bytes::len(&self.0) as u64 |
53 | 10.2k | } |
54 | | |
55 | | #[inline] |
56 | 0 | fn is_empty(&self) -> bool { |
57 | 0 | Bytes::is_empty(&self.0) |
58 | 0 | } |
59 | | } |
60 | | |
61 | | #[derive(Debug, MetricsComponent)] |
62 | | pub struct MemoryStore { |
63 | | #[metric(group = "evicting_map")] |
64 | | evicting_map: EvictingMap< |
65 | | StoreKeyBorrow, |
66 | | StoreKey<'static>, |
67 | | BytesWrapper, |
68 | | SystemTime, |
69 | | RemoveItemCallbackHolder, |
70 | | >, |
71 | | } |
72 | | |
73 | | impl MemoryStore { |
74 | 324 | pub fn new(spec: &MemorySpec) -> Arc<Self> { |
75 | 324 | let empty_policy = nativelink_config::stores::EvictionPolicy::default(); |
76 | 324 | let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); |
77 | 324 | Arc::new(Self { |
78 | 324 | evicting_map: EvictingMap::new(eviction_policy, SystemTime::now()), |
79 | 324 | }) |
80 | 324 | } |
81 | | |
82 | | /// Returns the number of key-value pairs that are currently in the the cache. |
83 | | /// Function is not for production code paths. |
84 | 30 | pub fn len_for_test(&self) -> usize { |
85 | 30 | self.evicting_map.len_for_test() |
86 | 30 | } |
87 | | |
88 | 8 | pub async fn remove_entry(&self, key: StoreKey<'_>) -> bool { |
89 | 8 | self.evicting_map.remove(&key.into_owned()).await |
90 | 8 | } |
91 | | } |
92 | | |
93 | | #[async_trait] |
94 | | impl StoreDriver for MemoryStore { |
95 | | async fn has_with_results( |
96 | | self: Pin<&Self>, |
97 | | keys: &[StoreKey<'_>], |
98 | | results: &mut [Option<u64>], |
99 | 179 | ) -> Result<(), Error> { |
100 | | let own_keys = keys |
101 | | .iter() |
102 | 213 | .map(|sk| sk.borrow().into_owned()) |
103 | | .collect::<Vec<_>>(); |
104 | | self.evicting_map |
105 | | .sizes_for_keys(own_keys.iter(), results, false /* peek */) |
106 | | .await; |
107 | | // We need to do a special pass to ensure our zero digest exist. |
108 | | keys.iter() |
109 | | .zip(results.iter_mut()) |
110 | 213 | .for_each(|(key, result)| { |
111 | 213 | if is_zero_digest(key.borrow()) { Branch (111:20): [True: 3, False: 210]
Branch (111:20): [Folded - Ignored]
|
112 | 3 | *result = Some(0); |
113 | 210 | } |
114 | 213 | }); |
115 | | Ok(()) |
116 | 179 | } |
117 | | |
118 | | async fn list( |
119 | | self: Pin<&Self>, |
120 | | range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>), |
121 | | handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), |
122 | 7 | ) -> Result<u64, Error> { |
123 | | let range = ( |
124 | | range.0.map(StoreKey::into_owned), |
125 | | range.1.map(StoreKey::into_owned), |
126 | | ); |
127 | | let iterations = self |
128 | | .evicting_map |
129 | 13 | .range(range, move |key, _value| handler(key.borrow())); |
130 | | Ok(iterations) |
131 | 7 | } |
132 | | |
133 | | async fn update( |
134 | | self: Pin<&Self>, |
135 | | key: StoreKey<'_>, |
136 | | mut reader: DropCloserReadHalf, |
137 | | _size_info: UploadSizeInfo, |
138 | 5.75k | ) -> Result<(), Error> { |
139 | | // Internally Bytes might hold a reference to more data than just our data. To prevent |
140 | | // this potential case, we make a full copy of our data for long-term storage. |
141 | | let final_buffer = { |
142 | | let buffer = reader |
143 | | .consume(None) |
144 | | .await |
145 | | .err_tip(|| "Failed to collect all bytes from reader in memory_store::update")?; |
146 | | let mut new_buffer = BytesMut::with_capacity(buffer.len()); |
147 | | new_buffer.extend_from_slice(&buffer[..]); |
148 | | new_buffer.freeze() |
149 | | }; |
150 | | |
151 | | self.evicting_map |
152 | | .insert(key.into_owned().into(), BytesWrapper(final_buffer)) |
153 | | .await; |
154 | | Ok(()) |
155 | 5.75k | } |
156 | | |
157 | | async fn get_part( |
158 | | self: Pin<&Self>, |
159 | | key: StoreKey<'_>, |
160 | | writer: &mut DropCloserWriteHalf, |
161 | | offset: u64, |
162 | | length: Option<u64>, |
163 | 4.37k | ) -> Result<(), Error> { |
164 | | let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; |
165 | | let length = length |
166 | 53 | .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) |
167 | | .transpose()?; |
168 | | |
169 | | let owned_key = key.into_owned(); |
170 | | if is_zero_digest(owned_key.clone()) { |
171 | | writer |
172 | | .send_eof() |
173 | | .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?; |
174 | | return Ok(()); |
175 | | } |
176 | | |
177 | | let value = self |
178 | | .evicting_map |
179 | | .get(&owned_key) |
180 | | .await |
181 | 7 | .err_tip_with_code(|_| (Code::NotFound, format!("Key {owned_key:?} not found")))?; |
182 | | let default_len = usize::try_from(value.len()) |
183 | | .err_tip(|| "Could not convert value.len() to usize")? |
184 | | .saturating_sub(offset); |
185 | | let length = length.unwrap_or(default_len).min(default_len); |
186 | | if length > 0 { |
187 | | writer |
188 | | .send(value.0.slice(offset..(offset + length))) |
189 | | .await |
190 | | .err_tip(|| "Failed to write data in memory store")?; |
191 | | } |
192 | | writer |
193 | | .send_eof() |
194 | | .err_tip(|| "Failed to write EOF in memory store get_part")?; |
195 | | Ok(()) |
196 | 4.37k | } |
197 | | |
198 | 136 | fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver { |
199 | 136 | self |
200 | 136 | } |
201 | | |
202 | 39 | fn as_any<'a>(&'a self) -> &'a (dyn Any + Sync + Send + 'static) { |
203 | 39 | self |
204 | 39 | } |
205 | | |
206 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Sync + Send + 'static> { |
207 | 0 | self |
208 | 0 | } |
209 | | |
210 | 0 | fn register_health(self: Arc<Self>, registry: &mut HealthRegistryBuilder) { |
211 | 0 | registry.register_indicator(self); |
212 | 0 | } |
213 | | |
214 | 5 | fn register_remove_callback( |
215 | 5 | self: Arc<Self>, |
216 | 5 | callback: Arc<dyn RemoveItemCallback>, |
217 | 5 | ) -> Result<(), Error> { |
218 | 5 | self.evicting_map |
219 | 5 | .add_remove_callback(RemoveItemCallbackHolder::new(callback)); |
220 | 5 | Ok(()) |
221 | 5 | } |
222 | | } |
223 | | |
// Invokes the `default_health_status_indicator!` helper macro from `nativelink_util::health_utils` for `MemoryStore`.
// NOTE(review): presumably this generates the `HealthStatusIndicator` impl that `register_health` relies on — confirm against the macro definition.
224 | | default_health_status_indicator!(MemoryStore);