/build/source/nativelink-store/src/fast_slow_store.rs
Line | Count | Source |
1 | | // Copyright 2024 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use std::borrow::BorrowMut; |
16 | | use std::cmp::{max, min}; |
17 | | use std::ops::Range; |
18 | | use std::pin::Pin; |
19 | | use std::sync::atomic::{AtomicU64, Ordering}; |
20 | | use std::sync::{Arc, Weak}; |
21 | | |
22 | | use async_trait::async_trait; |
23 | | use futures::{join, FutureExt}; |
24 | | use nativelink_error::{make_err, Code, Error, ResultExt}; |
25 | | use nativelink_metric::MetricsComponent; |
26 | | use nativelink_util::buf_channel::{ |
27 | | make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf, |
28 | | }; |
29 | | use nativelink_util::fs; |
30 | | use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator}; |
31 | | use nativelink_util::store_trait::{ |
32 | | slow_update_store_with_file, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, |
33 | | UploadSizeInfo, |
34 | | }; |
35 | | |
36 | | // TODO(blaise.bruer) This store needs to be evaluated for more efficient memory usage; |
37 | | // there are many copies happening internally. |
38 | | |
39 | | // TODO(blaise.bruer) We should consider copying the data in the background to allow the |
40 | | // client to hang up while the data is buffered. An alternative is to make a |
41 | | // "BufferedStore" that could be placed in front of the "slow" store and would hang up |
42 | | // early if the data is in the buffer. |
43 | 0 | #[derive(MetricsComponent)] |
44 | | pub struct FastSlowStore { |
45 | | #[metric(group = "fast_store")] |
46 | | fast_store: Store, |
47 | | #[metric(group = "slow_store")] |
48 | | slow_store: Store, |
49 | | weak_self: Weak<Self>, |
50 | | #[metric] |
51 | | metrics: FastSlowStoreMetrics, |
52 | | } |
53 | | |
54 | | impl FastSlowStore { |
55 | 25 | pub fn new( |
56 | 25 | _config: &nativelink_config::stores::FastSlowStore, |
57 | 25 | fast_store: Store, |
58 | 25 | slow_store: Store, |
59 | 25 | ) -> Arc<Self> { |
60 | 25 | Arc::new_cyclic(|weak_self| Self { |
61 | 25 | fast_store, |
62 | 25 | slow_store, |
63 | 25 | weak_self: weak_self.clone(), |
64 | 25 | metrics: FastSlowStoreMetrics::default(), |
65 | 25 | }) |
66 | 25 | } |
67 | | |
68 | 0 | pub fn fast_store(&self) -> &Store { |
69 | 0 | &self.fast_store |
70 | 0 | } |
71 | | |
72 | 0 | pub fn slow_store(&self) -> &Store { |
73 | 0 | &self.slow_store |
74 | 0 | } |
75 | | |
76 | 2 | pub fn get_arc(&self) -> Option<Arc<Self>> { |
77 | 2 | self.weak_self.upgrade() |
78 | 2 | } |
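    // Why `get_arc()` can work at all: `new()` uses `Arc::new_cyclic`, which
    // hands the closure a `Weak` pointing at the `Arc` being constructed, so
    // the store can hold a non-owning handle to itself without creating a
    // reference cycle. A minimal sketch of the same pattern (illustrative
    // names, not part of this file):
    //
    //     use std::sync::{Arc, Weak};
    //
    //     struct Node {
    //         weak_self: Weak<Node>,
    //     }
    //
    //     impl Node {
    //         fn new() -> Arc<Self> {
    //             // The Weak may be stored here, but upgrading it inside the
    //             // closure returns None: the Arc does not exist yet.
    //             Arc::new_cyclic(|weak_self| Self { weak_self: weak_self.clone() })
    //         }
    //         fn get_arc(&self) -> Option<Arc<Self>> {
    //             self.weak_self.upgrade()
    //         }
    //     }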
79 | | |
80 | | /// Ensures our fast store is populated. This should be kept as a |
81 | | /// low-cost function. Since the data itself is shared and not copied, it should |
82 | | /// be fairly cheap to just discard the data, but it does cost a few mutex locks |
83 | | /// while streaming. |
84 | 4 | pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> { |
85 | 4 | let maybe_size_info = self |
86 | 4 | .fast_store |
87 | 4 | .has(key.borrow()) |
88 | 0 | .await |
89 | 4 | .err_tip(|| "While querying in populate_fast_store"0 )?0 ; |
90 | 4 | if maybe_size_info.is_some() { Branch (90:12): [Folded - Ignored]
Branch (90:12): [True: 0, False: 4]
Branch (90:12): [Folded - Ignored]
|
91 | 0 | return Ok(()); |
92 | 4 | } |
93 | 4 | // TODO(blaise.bruer) This is extremely inefficient, since we are just trying |
94 | 4 | // to send the stream to /dev/null. Maybe we could instead make a version of |
95 | 4 | // the stream that can send to the drain more efficiently? |
96 | 4 | let (tx, mut rx) = make_buf_channel_pair(); |
97 | 4 | let drain_fut = async move { |
98 | 28 | while !rx.recv().await?.is_empty() {} |
  Branch (98:19): [True: 4, False: 4]
  Branch (98:19): [Folded - Ignored]
99 | 4 | Ok(()) |
100 | 4 | }; |
101 | 4 | let (drain_res, get_res) = join!(drain_fut, StoreDriver::get(Pin::new(self), key, tx)); |
102 | 4 | get_res.err_tip(|| "Failed to populate()").merge(drain_res) |
103 | 4 | } |
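    // Usage sketch (hypothetical caller, not part of this file): a component
    // that wants a blob staged in the fast store before use could call:
    //
    //     fast_slow_store.populate_fast_store(digest.into()).await?;
    //
    // Internally this runs StoreDriver::get on itself; as get_part below
    // shows, a fast-store miss tees the slow-store stream into the fast
    // store, while the drain future here discards the bytes that would
    // otherwise go to a client.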
104 | | |
105 | | /// Returns the range of bytes that should be sent given the bounds of a |
106 | | /// received slice, offset so that the output range maps received_range.start to 0. |
107 | | // TODO(allada) This should be put into utils, as this logic is used |
108 | | // elsewhere in the code. |
109 | 21 | pub fn calculate_range( |
110 | 21 | received_range: &Range<u64>, |
111 | 21 | send_range: &Range<u64>, |
112 | 21 | ) -> Result<Option<Range<usize>>, Error> { |
113 | 21 | // Protect against subtraction overflow. |
114 | 21 | if received_range.start >= received_range.end { |
  Branch (114:12): [True: 0, False: 21]
  Branch (114:12): [Folded - Ignored]
115 | 0 | return Ok(None); |
116 | 21 | } |
117 | 21 | |
118 | 21 | let start = max(received_range.start, send_range.start); |
119 | 21 | let end = min(received_range.end, send_range.end); |
120 | 21 | if received_range.contains(&start) && received_range.contains(&(end - 1)) { |
  Branch (120:12): [True: 19, False: 2]
  Branch (120:47): [True: 17, False: 2]
  Branch (120:12): [Folded - Ignored]
  Branch (120:47): [Folded - Ignored]
121 | | // Offset both to the start of the received_range. |
122 | 17 | let calculated_range_start = usize::try_from(start - received_range.start) |
123 | 17 | .err_tip(|| "Could not convert (start - received_range.start) to usize")?; |
124 | 17 | let calculated_range_end = usize::try_from(end - received_range.start) |
125 | 17 | .err_tip(|| "Could not convert (end - received_range.start) to usize")?; |
126 | 17 | Ok(Some(calculated_range_start..calculated_range_end)) |
127 | | } else { |
128 | 4 | Ok(None) |
129 | | } |
130 | 21 | } |
131 | | } |
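// Worked example for `calculate_range` above (an illustrative test sketch,
// not part of the measured source): the result is rebased so that
// `received_range.start` maps to index 0 of the received buffer.
#[cfg(test)]
mod calculate_range_examples {
    use super::FastSlowStore;

    #[test]
    fn rebases_overlap_to_received_start() {
        // Received bytes 100..200 of an object; the client asked for 150..170.
        // The overlap is rebased to buffer indices 50..70.
        let range = FastSlowStore::calculate_range(&(100..200), &(150..170)).unwrap();
        assert_eq!(range, Some(50..70));

        // Disjoint ranges produce None: nothing from this buffer is sent.
        let range = FastSlowStore::calculate_range(&(100..200), &(0..50)).unwrap();
        assert_eq!(range, None);
    }
}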
132 | | |
133 | | #[async_trait] |
134 | | impl StoreDriver for FastSlowStore { |
135 | | async fn has_with_results( |
136 | | self: Pin<&Self>, |
137 | | key: &[StoreKey<'_>], |
138 | | results: &mut [Option<u64>], |
139 | 5 | ) -> Result<(), Error> { |
140 | | // If our slow store is a noop store, it'll always return a 404, |
141 | | // so only check the fast store in that case. |
142 | 5 | let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None); |
143 | 5 | if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { |
  Branch (143:12): [True: 2, False: 3]
  Branch (143:12): [Folded - Ignored]
144 | 2 | return self.fast_store.has_with_results(key, results).await; |
145 | 3 | } |
146 | 3 | // Only check the slow store because if it's not there, then something |
147 | 3 | // downstream might be unable to get it. This should not affect |
148 | 3 | // workers, as they only use get(), and a CAS can use an |
149 | 3 | // ExistenceCacheStore to avoid the bottleneck. |
150 | 3 | self.slow_store.has_with_results(key, results).await |
151 | 10 | } |
152 | | |
153 | | async fn update( |
154 | | self: Pin<&Self>, |
155 | | key: StoreKey<'_>, |
156 | | mut reader: DropCloserReadHalf, |
157 | | size_info: UploadSizeInfo, |
158 | 40 | ) -> Result<(), Error> { |
159 | | // If either one of our stores is a noop store, bypass the multiplexing |
160 | | // and just use the store that is not a noop store. |
161 | 40 | let slow_store = self.slow_store.inner_store(Some(key.borrow())); |
162 | 40 | if slow_store.optimized_for(StoreOptimizations::NoopUpdates) { |
  Branch (162:12): [True: 0, False: 40]
  Branch (162:12): [Folded - Ignored]
163 | 0 | return self.fast_store.update(key, reader, size_info).await; |
164 | 40 | } |
165 | 40 | let fast_store = self.fast_store.inner_store(Some(key.borrow())); |
166 | 40 | if fast_store.optimized_for(StoreOptimizations::NoopUpdates) { |
  Branch (166:12): [True: 0, False: 40]
  Branch (166:12): [Folded - Ignored]
167 | 0 | return self.slow_store.update(key, reader, size_info).await; |
168 | 40 | } |
169 | 40 | |
170 | 40 | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
171 | 40 | let (mut slow_tx, slow_rx) = make_buf_channel_pair(); |
172 | 40 | |
173 | 40 | let data_stream_fut = async move { |
174 | | loop { |
175 | 67 | let buffer = reader |
176 | 67 | .recv() |
177 | 0 | .await |
178 | 67 | .err_tip(|| "Failed to read buffer in fast_slow store")?; |
179 | 67 | if buffer.is_empty() { |
  Branch (179:20): [True: 40, False: 27]
  Branch (179:20): [Folded - Ignored]
180 | | // EOF received. |
181 | 40 | fast_tx.send_eof().err_tip(|| { |
182 | 0 | "Failed to write eof to fast store in fast_slow store update" |
183 | 40 | })?; |
184 | 40 | slow_tx |
185 | 40 | .send_eof() |
186 | 40 | .err_tip(|| "Failed to write eof to writer in fast_slow store update")?; |
187 | 40 | return Result::<(), Error>::Ok(()); |
188 | 27 | } |
189 | | |
190 | 27 | let (fast_result, slow_result) = |
191 | 27 | join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); |
192 | 27 | fast_result |
193 | 27 | .map_err(|e| { |
194 | 0 | make_err!( |
195 | 0 | Code::Internal, |
196 | 0 | "Failed to send message to fast_store in fast_slow_store {:?}", |
197 | 0 | e |
198 | 0 | ) |
199 | 27 | }) |
200 | 27 | .merge(slow_result.map_err(|e| { |
201 | 0 | make_err!( |
202 | 0 | Code::Internal, |
203 | 0 | "Failed to send message to slow_store in fast_slow store {:?}", |
204 | 0 | e |
205 | 0 | ) |
206 | 27 | }))?; |
207 | | } |
208 | 40 | }; |
209 | | |
210 | 40 | let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info); |
211 | 40 | let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info); |
212 | | |
213 | 40 | let (data_stream_res, fast_res, slow_res) = |
214 | 40 | join!(data_stream_fut, fast_store_fut, slow_store_fut); |
215 | 40 | data_stream_res.merge(fast_res).merge(slow_res)?; |
216 | 40 | Ok(()) |
217 | 80 | } |
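    // The update path above is a fan-out "tee": each received chunk is sent
    // to both stores concurrently, and `buffer.clone()` is cheap because the
    // channel carries reference-counted `Bytes`, not deep copies. Stripped to
    // its core shape (illustrative sketch, error handling compressed):
    //
    //     loop {
    //         let chunk = reader.recv().await?;
    //         if chunk.is_empty() {
    //             fast_tx.send_eof()?;
    //             slow_tx.send_eof()?;
    //             break;
    //         }
    //         let (fast_res, slow_res) =
    //             join!(fast_tx.send(chunk.clone()), slow_tx.send(chunk));
    //         fast_res.merge(slow_res)?;
    //     }
    //
    // Driving this pump and both `update` futures in a single `join!` keeps
    // them on one task, and awaiting each `send` couples the pump to the
    // slower of the two consumers.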
218 | | |
219 | | /// FastSlowStore has optimizations for dealing with files. |
220 | 0 | fn optimized_for(&self, optimization: StoreOptimizations) -> bool { |
221 | 0 | optimization == StoreOptimizations::FileUpdates |
222 | 0 | } |
223 | | |
224 | | /// Optimized variation to consume the file if one of the stores is a |
225 | | /// filesystem store. This makes the operation a move instead of a copy, |
226 | | /// dramatically increasing performance for large files. |
227 | | async fn update_with_whole_file( |
228 | | self: Pin<&Self>, |
229 | | key: StoreKey<'_>, |
230 | | mut file: fs::ResumeableFileSlot, |
231 | | upload_size: UploadSizeInfo, |
232 | 3 | ) -> Result<Option<fs::ResumeableFileSlot>, Error> { |
233 | 3 | if self |
  Branch (233:12): [True: 3, False: 0]
  Branch (233:12): [Folded - Ignored]
234 | 3 | .fast_store |
235 | 3 | .optimized_for(StoreOptimizations::FileUpdates) |
236 | | { |
237 | 3 | if !self |
  Branch (237:16): [True: 3, False: 0]
  Branch (237:16): [Folded - Ignored]
238 | 3 | .slow_store |
239 | 3 | .optimized_for(StoreOptimizations::NoopUpdates) |
240 | | { |
241 | 3 | slow_update_store_with_file( |
242 | 3 | self.slow_store.as_store_driver_pin(), |
243 | 3 | key.borrow(), |
244 | 3 | &mut file, |
245 | 3 | upload_size, |
246 | 3 | ) |
247 | 18 | .await |
248 | 3 | .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; |
249 | 0 | } |
250 | 3 | return self |
251 | 3 | .fast_store |
252 | 3 | .update_with_whole_file(key, file, upload_size) |
253 | 3 | .await; |
254 | 0 | } |
255 | 0 | |
256 | 0 | if self |
  Branch (256:12): [True: 0, False: 0]
  Branch (256:12): [Folded - Ignored]
257 | 0 | .slow_store |
258 | 0 | .optimized_for(StoreOptimizations::FileUpdates) |
259 | | { |
260 | 0 | if !self |
  Branch (260:16): [True: 0, False: 0]
  Branch (260:16): [Folded - Ignored]
261 | 0 | .fast_store |
262 | 0 | .optimized_for(StoreOptimizations::NoopUpdates) |
263 | | { |
264 | 0 | slow_update_store_with_file( |
265 | 0 | self.fast_store.as_store_driver_pin(), |
266 | 0 | key.borrow(), |
267 | 0 | &mut file, |
268 | 0 | upload_size, |
269 | 0 | ) |
270 | 0 | .await |
271 | 0 | .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; |
272 | 0 | } |
273 | 0 | return self |
274 | 0 | .slow_store |
275 | 0 | .update_with_whole_file(key, file, upload_size) |
276 | 0 | .await; |
277 | 0 | } |
278 | 0 | |
279 | 0 | slow_update_store_with_file(self, key, &mut file, upload_size) |
280 | 0 | .await |
281 | 0 | .err_tip(|| "In FastSlowStore::update_with_whole_file")?; |
282 | 0 | Ok(Some(file)) |
283 | 6 | } |
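    // Dispatch summary for the whole-file path above:
    //   1. Fast store supports FileUpdates: stream the file into the slow
    //      store first, then hand the file to the fast store, which may
    //      consume (move) it instead of copying.
    //   2. Otherwise, slow store supports FileUpdates: the mirror image of
    //      case 1, with the fast store written first.
    //   3. Neither: fall back to a regular streamed update through this
    //      store and hand the file slot back to the caller.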
284 | | |
285 | | async fn get_part( |
286 | | self: Pin<&Self>, |
287 | | key: StoreKey<'_>, |
288 | | writer: &mut DropCloserWriteHalf, |
289 | | offset: u64, |
290 | | length: Option<u64>, |
291 | 41 | ) -> Result<(), Error> { |
292 | | // TODO(blaise.bruer) Investigate if we should maybe ignore errors here instead of |
293 | | // forwarding them up. |
294 | 41 | if self.fast_store.has(key.borrow()).await?.is_some() { |
  Branch (294:12): [True: 29, False: 12]
  Branch (294:12): [Folded - Ignored]
295 | 29 | self.metrics |
296 | 29 | .fast_store_hit_count |
297 | 29 | .fetch_add(1, Ordering::Acquire); |
298 | 29 | self.fast_store |
299 | 29 | .get_part(key, writer.borrow_mut(), offset, length) |
300 | 130 | .await?; |
301 | 29 | self.metrics |
302 | 29 | .fast_store_downloaded_bytes |
303 | 29 | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
304 | 29 | return Ok(()); |
305 | 12 | } |
306 | | |
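    // Fast-store miss: fall through to a read-through fill. The slow store
    // is streamed exactly once; each chunk is written back into the fast
    // store and forwarded to the client at the same time, so a subsequent
    // read of this key takes the fast path above.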
307 | 12 | let sz = self |
308 | 12 | .slow_store |
309 | 12 | .has(key.borrow()) |
310 | 0 | .await |
311 | 12 | .err_tip(|| "Failed to run has() on slow store")? |
312 | 12 | .ok_or_else(|| { |
313 | 0 | make_err!( |
314 | 0 | Code::NotFound, |
315 | 0 | "Object {} not found in either fast or slow store", |
316 | 0 | key.as_str() |
317 | 0 | ) |
318 | 12 | })?; |
319 | 12 | self.metrics |
320 | 12 | .slow_store_hit_count |
321 | 12 | .fetch_add(1, Ordering::Acquire); |
322 | 12 | |
323 | 12 | let send_range = offset..length.map_or(u64::MAX, |length| length + offset); |
324 | 12 | let mut bytes_received: u64 = 0; |
325 | 12 | |
326 | 12 | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
327 | 12 | let (slow_tx, mut slow_rx) = make_buf_channel_pair(); |
328 | 12 | let data_stream_fut = async move { |
329 | 12 | let mut writer_pin = Pin::new(writer); |
330 | | loop { |
331 | 23 | let output_buf = slow_rx |
332 | 23 | .recv() |
333 | 12 | .await |
334 | 23 | .err_tip(|| "Failed to read data buffer from slow store")?; |
335 | 23 | if output_buf.is_empty() { |
  Branch (335:20): [True: 12, False: 11]
  Branch (335:20): [Folded - Ignored]
336 | | // Write out our EOF. |
337 | | // We are dropped as soon as we send_eof to writer_pin, so |
338 | | // we wait until we've finished all of our joins to do that. |
339 | 12 | let fast_res = fast_tx.send_eof(); |
340 | 12 | return Ok::<_, Error>((fast_res, writer_pin)); |
341 | 11 | } |
342 | 11 | let output_buf_len = u64::try_from(output_buf.len()) |
343 | 11 | .err_tip(|| "Could not convert output_buf.len() to u64")?; |
344 | 11 | self.metrics |
345 | 11 | .slow_store_downloaded_bytes |
346 | 11 | .fetch_add(output_buf_len, Ordering::Acquire); |
347 | | |
348 | 11 | let writer_fut = if let Some(range) = Self::calculate_range( |
  Branch (348:41): [True: 11, False: 0]
  Branch (348:41): [Folded - Ignored]
349 | 11 | &(bytes_received..bytes_received + output_buf_len), |
350 | 11 | &send_range, |
351 | 11 | )? { |
352 | 11 | writer_pin.send(output_buf.slice(range)).right_future() |
353 | | } else { |
354 | 0 | futures::future::ready(Ok(())).left_future() |
355 | | }; |
356 | 11 | bytes_received += output_buf_len; |
357 | | |
358 | 11 | let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); |
359 | 11 | fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?; |
360 | 11 | writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?; |
361 | | } |
362 | 12 | }; |
363 | | |
364 | 12 | let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx); |
365 | 12 | let fast_store_fut = |
366 | 12 | self.fast_store |
367 | 12 | .update(key.borrow(), fast_rx, UploadSizeInfo::ExactSize(sz)); |
368 | | |
369 | 12 | let (data_stream_res, slow_res, fast_res) = |
370 | 12 | join!(data_stream_fut, slow_store_fut, fast_store_fut); |
371 | 12 | match data_stream_res { |
372 | 12 | Ok((fast_eof_res, mut writer_pin)) => |
373 | 12 | // Sending the EOF will drop us almost immediately in bytestream_server |
374 | 12 | // so we perform it as the very last action in this method. |
375 | 12 | { |
376 | 12 | fast_eof_res |
377 | 12 | .merge(fast_res) |
378 | 12 | .merge(slow_res) |
379 | 12 | .merge(writer_pin.send_eof()) |
380 | | } |
381 | 0 | Err(err) => fast_res.merge(slow_res).merge(Err(err)), |
382 | | } |
383 | 82 | } |
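    // Note on `left_future()` / `right_future()` in get_part above: the two
    // branches of the `if let` build futures of different concrete types, and
    // `futures::future::Either` unifies them so `writer_fut` has one type.
    // Minimal sketch (illustrative):
    //
    //     use futures::FutureExt;
    //     let fut = if have_range {
    //         writer.send(payload).right_future()
    //     } else {
    //         futures::future::ready(Ok(())).left_future()
    //     };
    //     fut.await?;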
384 | | |
385 | 2 | fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver { |
386 | 2 | self |
387 | 2 | } |
388 | | |
389 | 2 | fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) { |
390 | 2 | self |
391 | 2 | } |
392 | | |
393 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> { |
394 | 0 | self |
395 | 0 | } |
396 | | } |
397 | | |
398 | 0 | #[derive(Default, MetricsComponent)] |
399 | | struct FastSlowStoreMetrics { |
400 | | #[metric(help = "Hit count for the fast store")] |
401 | | fast_store_hit_count: AtomicU64, |
402 | | #[metric(help = "Downloaded bytes from the fast store")] |
403 | | fast_store_downloaded_bytes: AtomicU64, |
404 | | #[metric(help = "Hit count for the slow store")] |
405 | | slow_store_hit_count: AtomicU64, |
406 | | #[metric(help = "Downloaded bytes from the slow store")] |
407 | | slow_store_downloaded_bytes: AtomicU64, |
408 | | } |
409 | | |
410 | | default_health_status_indicator!(FastSlowStore); |