/build/source/nativelink-store/src/fast_slow_store.rs
Line | Count | Source |
1 | | // Copyright 2024-2025 The NativeLink Authors. All rights reserved. |
2 | | // |
3 | | // Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // See LICENSE file for details |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | use core::borrow::BorrowMut; |
16 | | use core::cmp::{max, min}; |
17 | | use core::ops::Range; |
18 | | use core::pin::Pin; |
19 | | use core::sync::atomic::{AtomicU64, Ordering}; |
20 | | use std::collections::HashMap; |
21 | | use std::ffi::OsString; |
22 | | use std::sync::{Arc, Weak}; |
23 | | |
24 | | use async_trait::async_trait; |
25 | | use futures::{FutureExt, join}; |
26 | | use nativelink_config::stores::{FastSlowSpec, StoreDirection}; |
27 | | use nativelink_error::{Code, Error, ResultExt, make_err}; |
28 | | use nativelink_metric::MetricsComponent; |
29 | | use nativelink_util::buf_channel::{ |
30 | | DropCloserReadHalf, DropCloserWriteHalf, make_buf_channel_pair, |
31 | | }; |
32 | | use nativelink_util::fs; |
33 | | use nativelink_util::health_utils::{HealthStatusIndicator, default_health_status_indicator}; |
34 | | use nativelink_util::store_trait::{ |
35 | | RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, StoreOptimizations, |
36 | | UploadSizeInfo, slow_update_store_with_file, |
37 | | }; |
38 | | use parking_lot::Mutex; |
39 | | use tokio::sync::OnceCell; |
40 | | use tracing::trace; |
41 | | |
42 | | // TODO(palfrey) This store needs to be evaluated for more efficient memory usage; |
43 | | // there are many copies happening internally. |
44 | | |
45 | | type Loader = Arc<OnceCell<()>>; |
46 | | |
47 | | // TODO(palfrey) We should consider copying the data in the background to allow the |
48 | | // client to hang up while the data is buffered. An alternative is to make a |
49 | | // "BufferedStore" that could be placed on the "slow" store and would hang up early |
50 | | // if data is in the buffer. |
51 | | #[derive(Debug, MetricsComponent)] |
52 | | pub struct FastSlowStore { |
53 | | #[metric(group = "fast_store")] |
54 | | fast_store: Store, |
55 | | fast_direction: StoreDirection, |
56 | | #[metric(group = "slow_store")] |
57 | | slow_store: Store, |
58 | | slow_direction: StoreDirection, |
59 | | weak_self: Weak<Self>, |
60 | | #[metric] |
61 | | metrics: FastSlowStoreMetrics, |
62 | | // De-duplicate requests for the fast store; only the first request streams and the |
63 | | // others are blocked until it completes. This may feel like it's causing a slow |
64 | | // down of tasks, but it's actually faster because we're not downloading the file |
65 | | // multiple times and doing loads of duplicate IO. |
66 | | populating_digests: Mutex<HashMap<StoreKey<'static>, Loader>>, |
67 | | } |
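
The comment above describes the core trick: one Arc<OnceCell<()>> per in-flight key. A minimal, self-contained sketch of the same de-duplication pattern (a hypothetical Deduper type, not NativeLink's API; assumes the tokio and parking_lot crates):

use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::sync::OnceCell;

/// One cell per key: the first caller runs `work`; concurrent callers
/// for the same key await that cell instead of repeating the IO.
#[derive(Default)]
struct Deduper {
    in_flight: Mutex<HashMap<String, Arc<OnceCell<()>>>>,
}

impl Deduper {
    async fn load_once<F, Fut>(&self, key: &str, work: F) -> Result<(), String>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<(), String>>,
    {
        // Clone the Arc out so the map lock is not held across an .await.
        let cell = self
            .in_flight
            .lock()
            .entry(key.to_owned())
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone();
        // On success every waiter sees Ok(()); on failure waiters retry.
        cell.get_or_try_init(work).await.map(|&()| ())
    }
}

Unlike the real store, this sketch never removes map entries; the LoaderGuard below exists precisely to do that removal, even on cancellation.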
68 | | |
69 | | // This guard ensures that populating_digests is cleared even if the future |
70 | | // is dropped, making it cancel-safe. |
71 | | struct LoaderGuard<'a> { |
72 | | weak_store: Weak<FastSlowStore>, |
73 | | key: StoreKey<'a>, |
74 | | loader: Option<Loader>, |
75 | | } |
76 | | |
77 | | impl LoaderGuard<'_> { |
78 | 14 | async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<(), E> |
79 | 14 | where |
80 | 14 | F: FnOnce() -> Fut, |
81 | 14 | Fut: Future<Output = Result<(), E>>, |
82 | 14 | { |
83 | 14 | if let Some(loader) = &self.loader { |
| | Branch (83:16): [True: 10, False: 0] |
| | Branch (83:16): [True: 4, False: 0] |
| | Branch (83:16): [Folded - Ignored] |
84 | 14 | loader.get_or_try_init(f).await.map(|&()| ()) |
85 | | } else { |
86 | | // Unreachable: `loader` is only taken in Drop, but handle it defensively. |
87 | 0 | f().await |
88 | | } |
89 | 14 | } |
90 | | } |
91 | | |
92 | | impl Drop for LoaderGuard<'_> { |
93 | 14 | fn drop(&mut self) { |
94 | 14 | let Some(store) = self.weak_store.upgrade() else { |
| | Branch (94:13): [True: 14, False: 0] |
| | Branch (94:13): [Folded - Ignored] |
95 | | // The store has already gone away, nothing to remove from. |
96 | 0 | return; |
97 | | }; |
98 | 14 | let Some(loader) = self.loader.take() else { |
| | Branch (98:13): [True: 14, False: 0] |
| | Branch (98:13): [Folded - Ignored] |
99 | | // This should never happen, but we do it to be safe. |
100 | 0 | return; |
101 | | }; |
102 | | |
103 | 14 | let mut guard = store.populating_digests.lock(); |
104 | 14 | if let std::collections::hash_map::Entry::Occupied(occupied_entry) = |
| | Branch (104:16): [True: 14, False: 0] |
| | Branch (104:16): [Folded - Ignored] |
105 | 14 | guard.entry(self.key.borrow().into_owned()) |
106 | | { |
107 | 14 | if Arc::ptr_eq(occupied_entry.get(), &loader) { |
| | Branch (107:16): [True: 14, False: 0] |
| | Branch (107:16): [Folded - Ignored] |
108 | 14 | drop(loader); |
109 | 14 | if Arc::strong_count(occupied_entry.get()) == 1 { |
| | Branch (109:20): [True: 14, False: 0] |
| | Branch (109:20): [Folded - Ignored] |
110 | 14 | // This is the last loader, so remove it. |
111 | 14 | occupied_entry.remove(); |
112 | 14 | } |
113 | 0 | } |
114 | 0 | } |
115 | 14 | } |
116 | | } |
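
The Drop impl above is what makes the de-duplication cancel-safe: async cancellation is just dropping the future, and dropping runs the destructors of its locals. A reduced sketch of the guard pattern (illustrative names, not the store's types):

use parking_lot::Mutex;

/// Removes `key` from `registry` on drop, so the cleanup cannot be
/// skipped even if the owning future is dropped at an .await point.
struct CleanupGuard<'a> {
    registry: &'a Mutex<Vec<String>>,
    key: String,
}

impl Drop for CleanupGuard<'_> {
    fn drop(&mut self) {
        self.registry.lock().retain(|k| k != &self.key);
    }
}

async fn do_work(registry: &Mutex<Vec<String>>, key: String) {
    registry.lock().push(key.clone());
    let _guard = CleanupGuard { registry, key };
    // ... .await points here; if the caller drops this future mid-await,
    // _guard's destructor still removes the entry.
}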
117 | | |
118 | | impl FastSlowStore { |
119 | 41 | pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> { |
120 | 41 | Arc::new_cyclic(|weak_self| Self { |
121 | 41 | fast_store, |
122 | 41 | fast_direction: spec.fast_direction, |
123 | 41 | slow_store, |
124 | 41 | slow_direction: spec.slow_direction, |
125 | 41 | weak_self: weak_self.clone(), |
126 | 41 | metrics: FastSlowStoreMetrics::default(), |
127 | 41 | populating_digests: Mutex::new(HashMap::new()), |
128 | 41 | }) |
129 | 41 | } |
130 | | |
131 | 0 | pub const fn fast_store(&self) -> &Store { |
132 | 0 | &self.fast_store |
133 | 0 | } |
134 | | |
135 | 0 | pub const fn slow_store(&self) -> &Store { |
136 | 0 | &self.slow_store |
137 | 0 | } |
138 | | |
139 | 2 | pub fn get_arc(&self) -> Option<Arc<Self>> { |
140 | 2 | self.weak_self.upgrade() |
141 | 2 | } |
142 | | |
143 | 14 | fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> { |
144 | | // Get a single loader instance that's used to populate the fast store |
145 | | // for this digest. If another request comes in then it's de-duplicated. |
146 | 14 | let loader = match self |
147 | 14 | .populating_digests |
148 | 14 | .lock() |
149 | 14 | .entry(key.borrow().into_owned()) |
150 | | { |
151 | 0 | std::collections::hash_map::Entry::Occupied(occupied_entry) => { |
152 | 0 | occupied_entry.get().clone() |
153 | | } |
154 | 14 | std::collections::hash_map::Entry::Vacant(vacant_entry) => { |
155 | 14 | vacant_entry.insert(Arc::new(OnceCell::new())).clone() |
156 | | } |
157 | | }; |
158 | 14 | LoaderGuard { |
159 | 14 | weak_store: self.weak_self.clone(), |
160 | 14 | key, |
161 | 14 | loader: Some(loader), |
162 | 14 | } |
163 | 14 | } |
164 | | |
165 | 14 | async fn populate_and_maybe_stream( |
166 | 14 | self: Pin<&Self>, |
167 | 14 | key: StoreKey<'_>, |
168 | 14 | maybe_writer: Option<&mut DropCloserWriteHalf>, |
169 | 14 | offset: u64, |
170 | 14 | length: Option<u64>, |
171 | 14 | ) -> Result<(), Error> { |
172 | 14 | let reader_stream_size = if self |
| | Branch (172:37): [True: 2, False: 8] |
| | Branch (172:37): [True: 0, False: 4] |
| | Branch (172:37): [Folded - Ignored] |
173 | 14 | .slow_store |
174 | 14 | .inner_store(Some(key.borrow())) |
175 | 14 | .optimized_for(StoreOptimizations::LazyExistenceOnSync) |
176 | | { |
177 | 2 | trace!( |
178 | | %key, |
179 | 2 | store_name = %self.slow_store.inner_store(Some(key.borrow())).get_name(), |
180 | 2 | "Skipping .has() check due to LazyExistenceOnSync optimization" |
181 | | ); |
182 | 2 | UploadSizeInfo::MaxSize(u64::MAX) |
183 | | } else { |
184 | 12 | UploadSizeInfo::ExactSize(self |
185 | 12 | .slow_store |
186 | 12 | .has(key.borrow()) |
187 | 12 | .await |
188 | 12 | .err_tip(|| "Failed to run has() on slow store")? |
189 | 12 | .ok_or_else(|| { |
190 | 0 | make_err!( |
191 | 0 | Code::NotFound, |
192 | | "Object {} not found in either fast or slow store. \ |
193 | | If using multiple workers, ensure all workers share the same CAS storage path.", |
194 | 0 | key.as_str() |
195 | | ) |
196 | 0 | })? |
197 | | ) |
198 | | }; |
199 | | |
200 | 14 | let send_range = offset..length.map_or(u64::MAX, |length| length + offset); |
201 | 14 | let mut bytes_received: u64 = 0; |
202 | 14 | let mut counted_hit = false; |
203 | | |
204 | 14 | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
205 | 14 | let (slow_tx, mut slow_rx) = make_buf_channel_pair(); |
206 | 14 | let data_stream_fut = async move { |
207 | 14 | let mut maybe_writer_pin = maybe_writer.map(Pin::new); |
208 | | loop { |
209 | 26 | let output_buf = slow_rx |
210 | 26 | .recv() |
211 | 26 | .await |
212 | 26 | .err_tip(|| "Failed to read data buffer from slow store")?; |
213 | 25 | if output_buf.is_empty() { |
| | Branch (213:20): [True: 9, False: 8] |
| | Branch (213:20): [True: 4, False: 4] |
| | Branch (213:20): [Folded - Ignored] |
214 | | // Write out our EOF. |
215 | | // We are dropped as soon as we send_eof to writer_pin, so |
216 | | // we wait until we've finished all of our joins to do that. |
217 | 13 | let fast_res = fast_tx.send_eof(); |
218 | 13 | return Ok::<_, Error>((fast_res, maybe_writer_pin)); |
219 | 12 | } |
220 | | |
221 | 12 | if !counted_hit { |
| | Branch (221:20): [True: 8, False: 0] |
| | Branch (221:20): [True: 4, False: 0] |
| | Branch (221:20): [Folded - Ignored] |
222 | 12 | self.metrics |
223 | 12 | .slow_store_hit_count |
224 | 12 | .fetch_add(1, Ordering::Acquire); |
225 | 12 | counted_hit = true; |
226 | 12 | } |
227 | | |
228 | 12 | let output_buf_len = u64::try_from(output_buf.len()) |
229 | 12 | .err_tip(|| "Could not convert output_buf.len() to u64")?; |
230 | 12 | self.metrics |
231 | 12 | .slow_store_downloaded_bytes |
232 | 12 | .fetch_add(output_buf_len, Ordering::Acquire); |
233 | | |
234 | 12 | let writer_fut = Self::calculate_range( |
235 | 12 | &(bytes_received..bytes_received + output_buf_len), |
236 | 12 | &send_range, |
237 | 0 | )? |
238 | 12 | .zip(maybe_writer_pin.as_mut()) |
239 | 12 | .map_or_else( |
240 | 4 | || futures::future::ready(Ok(())).left_future(), |
241 | 8 | |(range, writer_pin)| writer_pin.send(output_buf.slice(range)).right_future(), |
242 | | ); |
243 | | |
244 | 12 | bytes_received += output_buf_len; |
245 | | |
246 | 12 | let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); |
247 | 12 | fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?; |
248 | 12 | writer_res.err_tip(|| "Failed to write result to writer in fast_slow store")?; |
249 | | } |
250 | 14 | }; |
251 | | |
252 | 14 | let slow_store_fut = self.slow_store.get(key.borrow(), slow_tx); |
253 | 14 | let fast_store_fut = self |
254 | 14 | .fast_store |
255 | 14 | .update(key.borrow(), fast_rx, reader_stream_size); |
256 | | |
257 | 14 | let (data_stream_res, slow_res, fast_res) = |
258 | 14 | join!(data_stream_fut, slow_store_fut, fast_store_fut); |
259 | 14 | match data_stream_res { |
260 | 13 | Ok((fast_eof_res, maybe_writer_pin)) => |
261 | | // Sending the EOF will drop us almost immediately in bytestream_server |
262 | | // so we perform it as the very last action in this method. |
263 | | { |
264 | 13 | fast_eof_res.merge(fast_res).merge(slow_res).merge( |
265 | 13 | if let Some(mut writer_pin) = maybe_writer_pin { |
| | Branch (265:28): [True: 9, False: 0] |
| | Branch (265:28): [True: 0, False: 4] |
| | Branch (265:28): [Folded - Ignored] |
266 | 9 | writer_pin.send_eof() |
267 | | } else { |
268 | 4 | Ok(()) |
269 | | }, |
270 | | ) |
271 | | } |
272 | 1 | Err(err) => match slow_res { |
273 | 1 | Err(slow_err) if slow_err.code == Code::NotFound => Err(slow_err), |
| | Branch (273:34): [True: 1, False: 0] |
| | Branch (273:34): [True: 0, False: 0] |
| | Branch (273:34): [Folded - Ignored] |
274 | 0 | _ => fast_res.merge(slow_res).merge(Err(err)), |
275 | | }, |
276 | | } |
277 | 14 | } |
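
populate_and_maybe_stream is a tee: each chunk is pulled from the slow store once, then pushed into the fast store and (optionally) the client's writer in parallel via join!. A standalone sketch of that shape with plain tokio mpsc channels (the real code uses NativeLink's buf_channel pair and explicit EOF signaling):

use tokio::sync::mpsc;

/// Forward every chunk from `source` to `fast_sink` and, if present,
/// to `client`, sending to both sinks concurrently.
async fn tee(
    mut source: mpsc::Receiver<Vec<u8>>,
    fast_sink: mpsc::Sender<Vec<u8>>,
    client: Option<mpsc::Sender<Vec<u8>>>,
) -> Result<(), &'static str> {
    while let Some(chunk) = source.recv().await {
        // Clone only when a client writer is attached.
        let client_copy = client.as_ref().map(|tx| (tx, chunk.clone()));
        let to_client = async {
            if let Some((tx, data)) = client_copy {
                tx.send(data).await.map_err(|_| "client hung up")?;
            }
            Ok::<(), &'static str>(())
        };
        let to_fast = async { fast_sink.send(chunk).await.map_err(|_| "fast sink closed") };
        // One failing leg fails the whole transfer, as in the store.
        let (fast_res, client_res) = tokio::join!(to_fast, to_client);
        fast_res?;
        client_res?;
    }
    Ok(()) // source closed == EOF
}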
278 | | |
279 | | /// Ensures our fast store is populated. This should be kept a low-cost |
280 | | /// function. Since the data itself is shared and not copied, it is fairly |
281 | | /// cheap to just discard the data, though it does cost a few mutex locks while |
282 | | /// streaming. |
283 | 4 | pub async fn populate_fast_store(&self, key: StoreKey<'_>) -> Result<(), Error> { |
284 | 4 | let maybe_size_info = self |
285 | 4 | .fast_store |
286 | 4 | .has(key.borrow()) |
287 | 4 | .await |
288 | 4 | .err_tip(|| "While querying in populate_fast_store")?; |
289 | 4 | if maybe_size_info.is_some() { |
| | Branch (289:12): [Folded - Ignored] |
| | Branch (289:12): [True: 0, False: 4] |
| | Branch (289:12): [Folded - Ignored] |
290 | 0 | return Ok(()); |
291 | 4 | } |
292 | | |
293 | | // If the fast store is noop or read only or update only then this is an error. |
294 | 4 | if self |
| | Branch (294:12): [Folded - Ignored] |
| | Branch (294:12): [True: 0, False: 4] |
| | Branch (294:12): [Folded - Ignored] |
295 | 4 | .fast_store |
296 | 4 | .inner_store(Some(key.borrow())) |
297 | 4 | .optimized_for(StoreOptimizations::NoopUpdates) |
298 | 4 | || self.fast_direction == StoreDirection::ReadOnly |
| | Branch (298:16): [Folded - Ignored] |
| | Branch (298:16): [True: 0, False: 4] |
| | Branch (298:16): [Folded - Ignored] |
299 | 4 | || self.fast_direction == StoreDirection::Update |
| | Branch (299:16): [Folded - Ignored] |
| | Branch (299:16): [True: 0, False: 4] |
| | Branch (299:16): [Folded - Ignored] |
300 | | { |
301 | 0 | return Err(make_err!( |
302 | 0 | Code::Internal, |
303 | 0 | "Attempt to populate fast store that is read only or noop" |
304 | 0 | )); |
305 | 4 | } |
306 | | |
307 | 4 | self.get_loader(key.borrow()) |
308 | 4 | .get_or_try_init(|| { |
309 | 4 | Pin::new(self).populate_and_maybe_stream(key.borrow(), None, 0, None) |
310 | 4 | }) |
311 | 4 | .await |
312 | 4 | .err_tip(|| "Failed to populate()") |
313 | 4 | } |
314 | | |
315 | | /// Returns the range of bytes that should be sent given a slice's bounds, |
316 | | /// offset so that the output range maps `received_range.start` to 0. |
317 | | // TODO(palfrey) This should be put into utils, as this logic is used |
318 | | // elsewhere in the code. |
319 | 22 | pub fn calculate_range( |
320 | 22 | received_range: &Range<u64>, |
321 | 22 | send_range: &Range<u64>, |
322 | 22 | ) -> Result<Option<Range<usize>>, Error> { |
323 | | // Protect against subtraction overflow. |
324 | 22 | if received_range.start >= received_range.end { |
| | Branch (324:12): [True: 0, False: 22] |
| | Branch (324:12): [Folded - Ignored] |
325 | 0 | return Ok(None); |
326 | 22 | } |
327 | | |
328 | 22 | let start = max(received_range.start, send_range.start); |
329 | 22 | let end = min(received_range.end, send_range.end); |
330 | 22 | if received_range.contains(&start) && received_range.contains(&(end - 1)) { |
| | Branch (330:12): [True: 20, False: 2] |
| | Branch (330:47): [True: 18, False: 2] |
| | Branch (330:12): [Folded - Ignored] |
| | Branch (330:47): [Folded - Ignored] |
331 | | // Offset both to the start of the received_range. |
332 | 18 | let calculated_range_start = usize::try_from(start - received_range.start) |
333 | 18 | .err_tip(|| "Could not convert (start - received_range.start) to usize")?; |
334 | 18 | let calculated_range_end = usize::try_from(end - received_range.start) |
335 | 18 | .err_tip(|| "Could not convert (end - received_range.start) to usize")?; |
336 | 18 | Ok(Some(calculated_range_start..calculated_range_end)) |
337 | | } else { |
338 | 4 | Ok(None) |
339 | | } |
340 | 22 | } |
341 | | } |
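
A worked example of calculate_range, written as a test sketch against the signature above; the expected values follow directly from the max/min clamping and the rebase to received_range.start:

#[cfg(test)]
mod calculate_range_examples {
    use super::FastSlowStore;

    #[test]
    fn overlap_is_clamped_and_rebased() {
        // Bytes 100..150 of the object are in hand; the client wants 120..200.
        // start = max(100, 120) = 120, end = min(150, 200) = 150, and the
        // result is rebased to the buffer: forward buffer[20..50].
        let got = FastSlowStore::calculate_range(&(100..150), &(120..200)).unwrap();
        assert_eq!(got, Some(20..50));

        // Disjoint ranges: nothing from this chunk should be sent.
        let got = FastSlowStore::calculate_range(&(0..10), &(50..60)).unwrap();
        assert_eq!(got, None);
    }
}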
342 | | |
343 | | #[async_trait] |
344 | | impl StoreDriver for FastSlowStore { |
345 | | async fn has_with_results( |
346 | | self: Pin<&Self>, |
347 | | key: &[StoreKey<'_>], |
348 | | results: &mut [Option<u64>], |
349 | 12 | ) -> Result<(), Error> { |
350 | | // If our slow store is a noop store, it'll always return a 404, |
351 | | // so only check the fast store in that case. |
352 | | let slow_store = self.slow_store.inner_store::<StoreKey<'_>>(None); |
353 | | if slow_store.optimized_for(StoreOptimizations::NoopDownloads) { |
354 | | return self.fast_store.has_with_results(key, results).await; |
355 | | } |
356 | | // Only check the slow store because if it's not there, then something |
357 | | // downstream might be unable to get it. This should not affect |
358 | | // workers as they only use get() and a CAS can use an |
359 | | // ExistenceCacheStore to avoid the bottleneck. |
360 | | self.slow_store.has_with_results(key, results).await |
361 | 12 | } |
362 | | |
363 | | async fn update( |
364 | | self: Pin<&Self>, |
365 | | key: StoreKey<'_>, |
366 | | mut reader: DropCloserReadHalf, |
367 | | size_info: UploadSizeInfo, |
368 | 88 | ) -> Result<(), Error> { |
369 | | // If either one of our stores is a noop store, bypass the multiplexing |
370 | | // and just use the store that is not a noop store. |
371 | | let ignore_slow = self |
372 | | .slow_store |
373 | | .inner_store(Some(key.borrow())) |
374 | | .optimized_for(StoreOptimizations::NoopUpdates) |
375 | | || self.slow_direction == StoreDirection::ReadOnly |
376 | | || self.slow_direction == StoreDirection::Get; |
377 | | let ignore_fast = self |
378 | | .fast_store |
379 | | .inner_store(Some(key.borrow())) |
380 | | .optimized_for(StoreOptimizations::NoopUpdates) |
381 | | || self.fast_direction == StoreDirection::ReadOnly |
382 | | || self.fast_direction == StoreDirection::Get; |
383 | | if ignore_slow && ignore_fast { |
384 | | // We need to drain the reader to avoid the writer complaining that we dropped |
385 | | // the connection prematurely. |
386 | | reader |
387 | | .drain() |
388 | | .await |
389 | | .err_tip(|| "In FastSlowStore::update")?; |
390 | | return Ok(()); |
391 | | } |
392 | | if ignore_slow { |
393 | | return self.fast_store.update(key, reader, size_info).await; |
394 | | } |
395 | | if ignore_fast { |
396 | | return self.slow_store.update(key, reader, size_info).await; |
397 | | } |
398 | | |
399 | | let (mut fast_tx, fast_rx) = make_buf_channel_pair(); |
400 | | let (mut slow_tx, slow_rx) = make_buf_channel_pair(); |
401 | | |
402 | 84 | let data_stream_fut = async move { |
403 | | loop { |
404 | 136 | let buffer = reader |
405 | 136 | .recv() |
406 | 136 | .await |
407 | 136 | .err_tip(|| "Failed to read buffer in fast_slow store")?; |
408 | 136 | if buffer.is_empty() { |
| | Branch (408:20): [True: 84, False: 52] |
| | Branch (408:20): [Folded - Ignored] |
409 | | // EOF received. |
410 | 84 | fast_tx.send_eof().err_tip( |
411 | | || "Failed to write eof to fast store in fast_slow store update", |
412 | 0 | )?; |
413 | 84 | slow_tx |
414 | 84 | .send_eof() |
415 | 84 | .err_tip(|| "Failed to write eof to slow store in fast_slow store update")?; |
416 | 84 | return Result::<(), Error>::Ok(()); |
417 | 52 | } |
418 | | |
419 | 52 | let (fast_result, slow_result) = |
420 | 52 | join!(fast_tx.send(buffer.clone()), slow_tx.send(buffer)); |
421 | 52 | fast_result |
422 | 52 | .map_err(|e| { |
423 | 0 | make_err!( |
424 | 0 | Code::Internal, |
425 | | "Failed to send message to fast_store in fast_slow_store {:?}", |
426 | | e |
427 | | ) |
428 | 0 | }) |
429 | 52 | .merge(slow_result.map_err(|e| { |
430 | 0 | make_err!( |
431 | 0 | Code::Internal, |
432 | | "Failed to send message to slow_store in fast_slow store {:?}", |
433 | | e |
434 | | ) |
435 | 0 | }))?; |
436 | | } |
437 | 84 | }; |
438 | | |
439 | | let fast_store_fut = self.fast_store.update(key.borrow(), fast_rx, size_info); |
440 | | let slow_store_fut = self.slow_store.update(key.borrow(), slow_rx, size_info); |
441 | | |
442 | | let (data_stream_res, fast_res, slow_res) = |
443 | | join!(data_stream_fut, fast_store_fut, slow_store_fut); |
444 | | data_stream_res.merge(fast_res).merge(slow_res)?; |
445 | | Ok(()) |
446 | 88 | } |
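
update duplicates each uploaded chunk into both stores with join!, but first gates each side on its configuration. The gating predicate, extracted here as a hedged helper for illustration (no such function exists in the store):

use nativelink_config::stores::StoreDirection;

/// A store is skipped on upload when its inner store ignores updates
/// (NoopUpdates) or its direction never accepts writes from this path.
fn skip_upload(is_noop_updates: bool, direction: StoreDirection) -> bool {
    is_noop_updates
        || direction == StoreDirection::ReadOnly
        || direction == StoreDirection::Get
}

When both sides are skipped, the reader is drained instead of dropped so the uploading client does not see a prematurely closed connection.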
447 | | |
448 | | /// `FastSlowStore` has optimizations for dealing with files. |
449 | 0 | fn optimized_for(&self, optimization: StoreOptimizations) -> bool { |
450 | 0 | optimization == StoreOptimizations::FileUpdates |
451 | 0 | } |
452 | | |
453 | | /// Optimized variation to consume the file if one of the stores is a |
454 | | /// filesystem store. This makes the operation a move instead of a copy |
455 | | /// dramatically increasing performance for large files. |
456 | | async fn update_with_whole_file( |
457 | | self: Pin<&Self>, |
458 | | key: StoreKey<'_>, |
459 | | path: OsString, |
460 | | mut file: fs::FileSlot, |
461 | | upload_size: UploadSizeInfo, |
462 | 5 | ) -> Result<Option<fs::FileSlot>, Error> { |
463 | | if self |
464 | | .fast_store |
465 | | .optimized_for(StoreOptimizations::FileUpdates) |
466 | | { |
467 | | if !self |
468 | | .slow_store |
469 | | .inner_store(Some(key.borrow())) |
470 | | .optimized_for(StoreOptimizations::NoopUpdates) |
471 | | && self.slow_direction != StoreDirection::ReadOnly |
472 | | && self.slow_direction != StoreDirection::Get |
473 | | { |
474 | | slow_update_store_with_file( |
475 | | self.slow_store.as_store_driver_pin(), |
476 | | key.borrow(), |
477 | | &mut file, |
478 | | upload_size, |
479 | | ) |
480 | | .await |
481 | | .err_tip(|| "In FastSlowStore::update_with_whole_file slow_store")?; |
482 | | } |
483 | | if self.fast_direction == StoreDirection::ReadOnly |
484 | | || self.fast_direction == StoreDirection::Get |
485 | | { |
486 | | return Ok(Some(file)); |
487 | | } |
488 | | return self |
489 | | .fast_store |
490 | | .update_with_whole_file(key, path, file, upload_size) |
491 | | .await; |
492 | | } |
493 | | |
494 | | if self |
495 | | .slow_store |
496 | | .optimized_for(StoreOptimizations::FileUpdates) |
497 | | { |
498 | | let ignore_fast = self |
499 | | .fast_store |
500 | | .inner_store(Some(key.borrow())) |
501 | | .optimized_for(StoreOptimizations::NoopUpdates) |
502 | | || self.fast_direction == StoreDirection::ReadOnly |
503 | | || self.fast_direction == StoreDirection::Get; |
504 | | if !ignore_fast { |
505 | | slow_update_store_with_file( |
506 | | self.fast_store.as_store_driver_pin(), |
507 | | key.borrow(), |
508 | | &mut file, |
509 | | upload_size, |
510 | | ) |
511 | | .await |
512 | | .err_tip(|| "In FastSlowStore::update_with_whole_file fast_store")?; |
513 | | } |
514 | | let ignore_slow = self.slow_direction == StoreDirection::ReadOnly |
515 | | || self.slow_direction == StoreDirection::Get; |
516 | | if ignore_slow { |
517 | | return Ok(Some(file)); |
518 | | } |
519 | | return self |
520 | | .slow_store |
521 | | .update_with_whole_file(key, path, file, upload_size) |
522 | | .await; |
523 | | } |
524 | | |
525 | | slow_update_store_with_file(self, key, &mut file, upload_size) |
526 | | .await |
527 | | .err_tip(|| "In FastSlowStore::update_with_whole_file")?; |
528 | | Ok(Some(file)) |
529 | 5 | } |
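
The FileUpdates fast path above exists because handing the whole file to a filesystem-backed store lets the upload be a rename rather than a stream copy. A generic sketch of the idea (illustrative paths and layout, not NativeLink's filesystem store):

use std::path::Path;

/// Ingest `src` into a content-addressed directory. On the same
/// filesystem this is a metadata-only rename; otherwise fall back to
/// copy-then-delete, which is what the streaming path amounts to.
async fn ingest_by_move(src: &Path, store_dir: &Path, digest: &str) -> std::io::Result<()> {
    let dst = store_dir.join(digest);
    match tokio::fs::rename(src, &dst).await {
        Ok(()) => Ok(()),
        Err(_) => {
            tokio::fs::copy(src, &dst).await?;
            tokio::fs::remove_file(src).await
        }
    }
}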
530 | | |
531 | | async fn get_part( |
532 | | self: Pin<&Self>, |
533 | | key: StoreKey<'_>, |
534 | | writer: &mut DropCloserWriteHalf, |
535 | | offset: u64, |
536 | | length: Option<u64>, |
537 | 70 | ) -> Result<(), Error> { |
538 | | // TODO(palfrey) Investigate if we should maybe ignore errors here instead of |
539 | | // forwarding them up. |
540 | | if self.fast_store.has(key.borrow()).await?.is_some() { |
541 | | self.metrics |
542 | | .fast_store_hit_count |
543 | | .fetch_add(1, Ordering::Acquire); |
544 | | self.fast_store |
545 | | .get_part(key, writer.borrow_mut(), offset, length) |
546 | | .await?; |
547 | | self.metrics |
548 | | .fast_store_downloaded_bytes |
549 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
550 | | return Ok(()); |
551 | | } |
552 | | |
553 | | // If the fast store is noop or read only or update only then bypass it. |
554 | | if self |
555 | | .fast_store |
556 | | .inner_store(Some(key.borrow())) |
557 | | .optimized_for(StoreOptimizations::NoopUpdates) |
558 | | || self.fast_direction == StoreDirection::ReadOnly |
559 | | || self.fast_direction == StoreDirection::Update |
560 | | { |
561 | | self.metrics |
562 | | .slow_store_hit_count |
563 | | .fetch_add(1, Ordering::Acquire); |
564 | | self.slow_store |
565 | | .get_part(key, writer.borrow_mut(), offset, length) |
566 | | .await?; |
567 | | self.metrics |
568 | | .slow_store_downloaded_bytes |
569 | | .fetch_add(writer.get_bytes_written(), Ordering::Acquire); |
570 | | return Ok(()); |
571 | | } |
572 | | |
573 | | let mut writer = Some(writer); |
574 | | self.get_loader(key.borrow()) |
575 | 10 | .get_or_try_init(|| { |
576 | 10 | self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length) |
577 | 10 | }) |
578 | | .await?; |
579 | | |
580 | | // If we didn't stream then re-enter, which will stream from the fast |
581 | | // store, or retry the download. We should not loop here because |
582 | | // OnceCell reruns the initializer for all waiting callers on failure, |
583 | | // so to get here the fast store must have been populated. There's |
584 | | // an outside chance it was evicted since, but that's slim. |
585 | | if let Some(writer) = writer.take() { |
586 | | self.get_part(key, writer, offset, length).await |
587 | | } else { |
588 | | // This was the thread that did the streaming already, lucky duck. |
589 | | Ok(()) |
590 | | } |
591 | 70 | } |
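
The Option::take dance at the end of get_part decides whether this caller streamed directly or was de-duplicated behind another caller. A runnable miniature of the handoff (println! stands in for streaming; names are illustrative):

use std::sync::Arc;
use tokio::sync::OnceCell;

async fn populate(maybe_writer: Option<&str>) {
    // Only the caller whose closure actually runs receives the writer.
    if let Some(w) = maybe_writer {
        println!("filling the fast store while streaming straight to {w}");
    }
}

#[tokio::main]
async fn main() {
    let cell = Arc::new(OnceCell::new());
    let mut writer = Some("client");

    // If another task already initialized the cell, this closure never
    // runs and `writer` stays Some.
    cell.get_or_init(|| populate(writer.take())).await;

    if let Some(w) = writer.take() {
        // De-duplicated path: the fast store is populated now, so the
        // real code re-enters get_part and serves `w` from it.
        println!("serving {w} from the fast store");
    }
}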
592 | | |
593 | 2 | fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver { |
594 | 2 | self |
595 | 2 | } |
596 | | |
597 | 2 | fn as_any<'a>(&'a self) -> &'a (dyn core::any::Any + Sync + Send + 'static) { |
598 | 2 | self |
599 | 2 | } |
600 | | |
601 | 0 | fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> { |
602 | 0 | self |
603 | 0 | } |
604 | | |
605 | 0 | fn register_remove_callback( |
606 | 0 | self: Arc<Self>, |
607 | 0 | callback: Arc<dyn RemoveItemCallback>, |
608 | 0 | ) -> Result<(), Error> { |
609 | 0 | self.fast_store.register_remove_callback(callback.clone())?; |
610 | 0 | self.slow_store.register_remove_callback(callback)?; |
611 | 0 | Ok(()) |
612 | 0 | } |
613 | | } |
614 | | |
615 | | #[derive(Debug, Default, MetricsComponent)] |
616 | | struct FastSlowStoreMetrics { |
617 | | #[metric(help = "Hit count for the fast store")] |
618 | | fast_store_hit_count: AtomicU64, |
619 | | #[metric(help = "Downloaded bytes from the fast store")] |
620 | | fast_store_downloaded_bytes: AtomicU64, |
621 | | #[metric(help = "Hit count for the slow store")] |
622 | | slow_store_hit_count: AtomicU64, |
623 | | #[metric(help = "Downloaded bytes from the slow store")] |
624 | | slow_store_downloaded_bytes: AtomicU64, |
625 | | } |
626 | | |
627 | | default_health_status_indicator!(FastSlowStore); |